1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 import sys
18 import time
19 try:
20
21 import xmlrpclib
22 except ImportError:
23
24 import xmlrpc.client as xmlrpclib
25
26 from spacewalk.common.usix import IntType, TupleType, UnicodeType
27
28
29 from spacewalk.common.usix import raise_with_tb
30 from spacewalk.common import rhnFlags
31 from spacewalk.common.rhnLog import log_debug, log_error
32 from spacewalk.common.rhnConfig import CFG
33 from spacewalk.common.rhnException import rhnFault
34 from spacewalk.common.rhnTranslate import _
35 from spacewalk.common.rhnTB import Traceback
36 from spacewalk.server.rhnHandler import rhnHandler
37 from spacewalk.server import rhnSQL, rhnCapability, rhnAction
38 from spacewalk.server.rhnLib import InvalidAction, EmptyAction, ShadowAction
39 from spacewalk.server.rhnServer import server_kickstart
40
41 import getMethod
42
43
45
46 """ XMLRPC queue functions that we will provide for the outside world. """
47
49 """ Add a list of functions we are willing to server out. """
50 rhnHandler.__init__(self)
51 self.functions.append('get')
52 self.functions.append('get_future_actions')
53 self.functions.append('length')
54 self.functions.append('submit')
55
56
57
58 self.action_type_completed_codes = {
59 'errata.update': {
60 39: None,
61 },
62 }
63
65 """ Fetches old queued actions for the client version 1. """
66 log_debug(3, self.server_id)
67 actionId = action['id']
68 method = action["method"]
69 if method == 'packages.update':
70 xml = self.__packageUpdate(actionId)
71 elif method == 'errata.update':
72 xml = self.__errataUpdate(actionId)
73 elif method == 'hardware.refresh_list':
74 xml = xmlrpclib.dumps(("hardware",), methodname="client.refresh")
75 elif method == 'packages.refresh_list':
76 xml = xmlrpclib.dumps(("rpmlist",), methodname="client.refresh")
77 else:
78 raise InvalidAction("Action method %s unsupported by "
79 "Update Agent Client" % method)
80
81 return {'id': actionId, 'version': 1, 'action': xml}
82
83 - def __getV2(self, action, dry_run=0):
84 """ Fetches queued actions for the clients version 2+. """
85 log_debug(3, self.server_id)
86
87 try:
88 method = getMethod.getMethod(action['method'],
89 'server.action')
90 except getMethod.GetMethodException:
91 Traceback("queue.get V2")
92 raise_with_tb(EmptyAction("Could not get a valid method for %s" % (
93 action['method'],)), sys.exc_info()[2])
94
95 result = method(self.server_id, action['id'], dry_run)
96 if result is None:
97
98 result = ()
99 elif not isinstance(result, TupleType):
100
101 result = (result, )
102
103 xmlblob = xmlrpclib.dumps(result, methodname=action['method'])
104 log_debug(5, "returning xmlblob for action", xmlblob)
105 return {
106 'id': action['id'],
107 'action': xmlblob,
108 'version': action['version'],
109 }
110
112 """ Update the runnng kernel and the last boot values for this
113 server from the status dictionary passed on queue checkin.
114
115 Record last running kernel and uptime. Only update
116 last_boot if it has changed by more than five seconds. We
117 don't know the timezone the server is in. or even if its
118 clock is right, but we do know it can properly track seconds
119 since it rebooted, and use our own clocks to keep proper
120 track of the actual time.
121 """
122
123 rhnSQL.set_log_auth_login('CLIENT')
124 if 'uname' in status:
125 kernelver = status['uname'][2]
126 if kernelver != self.server.server["running_kernel"]:
127 self.server.server["running_kernel"] = kernelver
128
129
130
131
132 if 'uptime' in status:
133 uptime = status['uptime']
134 if isinstance(uptime, type([])) and len(uptime):
135
136 uptime = uptime[0]
137 try:
138 uptime = float(uptime)
139 except ValueError:
140
141 pass
142 else:
143 last_boot = time.time() - uptime
144 if abs(last_boot - self.server.server["last_boot"]) > 5:
145 self.server.server["last_boot"] = last_boot
146 self.__set_reboot_action_to_succcess()
147
148
149 self.server.server.save()
150
152 h = rhnSQL.prepare("""
153 update rhnServerAction
154 set status = 2
155 where server_id = :server_id
156 and action_id in (
157 select sa.action_id
158 from rhnServerAction sa
159 join rhnAction a on sa.action_id = a.id
160 join rhnActionType at on a.action_type = at.id
161 where sa.server_id = :server_id
162 and sa.status = 1
163 and at.label = 'reboot.reboot'
164 )
165 """)
166 h.execute(server_id=self.server_id)
167
169 log_debug(4, self.server_id, "determining whether to snapshot...")
170
171 entitlements = self.server.check_entitlement()
172 if "enterprise_entitled" not in entitlements:
173 return 0
174
175
176 return 1
177
186
188 h = rhnSQL.prepare("""
189 select sa.action_id, a.prerequisite
190 from rhnServerAction sa, rhnAction a
191 where sa.server_id = :server_id
192 and sa.action_id = a.id
193 and sa.status in (0, 1) -- Queued or picked up
194 and a.prerequisite is not null
195 and exists (
196 select 1
197 from rhnServerAction
198 where server_id = sa.server_id
199 and action_id = a.prerequisite
200 and status = 3 -- failed
201 )
202 """)
203
204 h.execute(server_id=self.server_id)
205 while 1:
206 row = h.fetchone_dict()
207 if not row:
208 break
209
210 action_id, prereq_action_id = row['action_id'], row['prerequisite']
211
212 self._invalidate_child_actions(action_id)
213 _query_future_enabled = rhnSQL.Statement("""
214 select staging_content_enabled
215 from rhnOrgConfiguration oc,
216 rhnServer s
217 where s.org_id = oc.org_id
218 and s.id = :server_id
219 """)
220
228
229 _query_queue_future = rhnSQL.Statement("""
230 select sa.action_id id, a.version,
231 sa.remaining_tries, at.label as method,
232 at.unlocked_only,
233 a.prerequisite
234 from rhnServerAction sa,
235 rhnAction a,
236 rhnActionType at
237 where sa.server_id = :server_id
238 and sa.action_id = a.id
239 and a.action_type = at.id
240 and sa.status in (0, 1) -- Queued or picked up
241 and a.earliest_action <= current_timestamp + numtodsinterval(:time_window * 3600, 'second') -- Check earliest_action
242 and at.label in ('packages.update', 'errata.update',
243 'packages.runTransaction', 'packages.fullUpdate')
244 order by a.earliest_action, a.prerequisite nulls first, a.id
245 """)
246
261
262 _query_queue_get = rhnSQL.Statement("""
263 select sa.action_id id, a.version,
264 sa.remaining_tries, at.label as method,
265 at.unlocked_only,
266 a.prerequisite
267 from rhnServerAction sa,
268 rhnAction a,
269 rhnActionType at
270 where sa.server_id = :server_id
271 and sa.action_id = a.id
272 and a.action_type = at.id
273 and sa.status in (0, 1) -- Queued or picked up
274 and a.earliest_action <= current_timestamp -- Check earliest_action
275 and not exists (
276 select 1
277 from rhnServerAction sap
278 where sap.server_id = :server_id
279 and sap.action_id = a.prerequisite
280 and sap.status != 2 -- completed
281 )
282 order by a.earliest_action, a.prerequisite nulls first, a.id
283 """)
284
285
286 - def get(self, system_id, version=1, status={}):
287
288 if CFG.DISABLE_CHECKINS:
289 self.update_checkin = 0
290 else:
291 self.update_checkin = 1
292 self.auth_system(system_id)
293 log_debug(1, self.server_id, version,
294 "checkins %s" % ["disabled", "enabled"][self.update_checkin])
295 if status:
296 self.__update_status(status)
297
298
299 rhnCapability.update_client_capabilities(self.server_id)
300
301
302 self._invalidate_failed_prereq_actions()
303
304 server_locked = self.server.server_locked()
305 log_debug(3, "Server locked", server_locked)
306
307 if self.__reboot_in_progress():
308 log_debug(3, "Server reboot in progress", self.server_id)
309 rhnSQL.commit()
310 return ""
311
312 ret = {}
313
314
315
316
317
318
319 h = rhnSQL.prepare(self._query_queue_get)
320
321 should_execute = 1
322
323
324
325 while 1:
326 if should_execute:
327 h.execute(server_id=self.server_id)
328 should_execute = 0
329
330
331 action = h.fetchone_dict()
332 if not action:
333
334 ret = ""
335 break
336 action_id = action['id']
337 log_debug(4, "Checking action %s" % action_id)
338
339 if action['remaining_tries'] < 1:
340 log_debug(4, "Action %s picked up too many times" % action_id)
341
342 self.__update_action(action_id, status=3,
343 message="This action has been picked up multiple times "
344 "without a successful transaction; "
345 "this action is now failed for this system.")
346
347 self._invalidate_child_actions(action_id)
348
349 continue
350
351 if server_locked and action['unlocked_only'] == 'Y':
352
353 log_debug(4, "server id %s locked for action id %s" % (
354 self.server_id, action_id))
355 continue
356
357 try:
358 if version == 1:
359 ret = self.__getV1(action)
360 else:
361 ret = self.__getV2(action)
362 except ShadowAction:
363 e = sys.exc_info()[1]
364
365
366 should_execute = 1
367 text = e.args[0]
368 log_debug(4, "Shadow Action", text)
369 self.__update_action(action['id'], 2, 0, text)
370 continue
371 except InvalidAction:
372 e = sys.exc_info()[1]
373
374 text = e.args[0]
375 log_debug(4, "Invalid Action", text)
376 self.__update_action(action['id'], 3, -99, text)
377 continue
378 except EmptyAction:
379 e = sys.exc_info()[1]
380
381
382
383 log_error("Can not process action data", action, e.args)
384 ret = ""
385 break
386 else:
387
388 h = rhnSQL.prepare("""
389 update rhnServerAction
390 set status = 1,
391 pickup_time = current_timestamp,
392 remaining_tries = :tries - 1
393 where action_id = :action_id
394 and server_id = :server_id
395 """)
396 h.execute(action_id=action["id"], server_id=self.server_id,
397 tries=action["remaining_tries"])
398 break
399
400
401 rhnSQL.commit()
402
403 return ret
404
405 - def submit(self, system_id, action_id, result, message="", data={}):
406 """ Submit the results of a queue run.
407 Maps old and new rhn_check behavior to new database status codes
408
409 The new API uses 4 slightly different status codes than the
410 old client does. This function will "hopefully" sensibly
411 map them. Old methodology:
412 -rhn_check retrieves an action from the top of the action queue.
413 -It attempts to execute the desired action and returns either
414 (a) 0 -- presumed successful.
415 (b) rhnFault object -- presumed failed
416 (c) some other non-fault object -- *assumed* successful.
417 -Regardless of result code, action is marked as "executed"
418
419 We try to make a smarter status selection (i.e. failed||completed).
420
421 For reference:
422 New DB status codes: Old DB status codes:
423 0: Queued 0: queued
424 1: Picked Up 1: picked up
425 2: Completed 2: executed
426 3: Failed 3: completed
427 """
428 if type(action_id) is not IntType:
429
430 try:
431 action_id = int(action_id)
432 except ValueError:
433 log_error("Invalid action_id", action_id)
434 raise_with_tb(rhnFault(30, _("Invalid action value type %s (%s)") %
435 (action_id, type(action_id))), sys.exc_info()[2])
436
437 self.auth_system(system_id)
438 log_debug(1, self.server_id, action_id, result)
439
440
441 h = rhnSQL.prepare("""
442 select at.label action_type,
443 at.trigger_snapshot,
444 at.name
445 from rhnServerAction sa,
446 rhnAction a,
447 rhnActionType at
448 where sa.server_id = :server_id
449 and sa.action_id = :action_id
450 and sa.status = 1
451 and a.id = :action_id
452 and a.action_type = at.id
453 """)
454 h.execute(server_id=self.server_id, action_id=action_id)
455 row = h.fetchone_dict()
456 if not row:
457 log_error("Server %s does not own action %s" % (
458 self.server_id, action_id))
459 raise rhnFault(22, _("Action %s does not belong to server %s") % (
460 action_id, self.server_id))
461
462 action_type = row['action_type']
463 trigger_snapshot = (row['trigger_snapshot'] == 'Y')
464
465 if 'missing_packages' in data:
466 missing_packages = "Missing-Packages: %s" % str(
467 data['missing_packages'])
468 rmsg = "%s %s" % (message, missing_packages)
469 elif 'koan' in data:
470 rmsg = "%s: %s" % (message, data['koan'])
471 else:
472 rmsg = message
473
474 rcode = result
475
476
477
478 if type(rcode) == type({}):
479 if "faultCode" in result:
480 rcode = result["faultCode"]
481 if "faultString" in result:
482 rmsg = result["faultString"] + str(data)
483 if type(rcode) in [type({}), type(()), type([])] \
484 or type(rcode) is not IntType:
485 rmsg = u"%s [%s]" % (UnicodeType(message), UnicodeType(rcode))
486 rcode = -1
487
488 status = self.status_for_action_type_code(action_type, rcode)
489
490 if status == 3:
491
492 self._invalidate_child_actions(action_id)
493 elif action_type == 'reboot.reboot':
494
495 rhnSQL.commit()
496 return 0
497 elif status == 2 and trigger_snapshot and self.__should_snapshot():
498
499 self.server.take_snapshot("Scheduled action completion: %s" % row['name'])
500
501 self.__update_action(action_id, status, rcode, rmsg)
502
503
504
505 rhnFlags.set('action_id', action_id)
506 rhnFlags.set('action_status', status)
507
508 self.process_extra_data(self.server_id, action_id, data=data,
509 action_type=action_type)
510
511
512 rhnSQL.commit()
513 return 0
514
516 """ Convert whatever the client sends as a result code into a status in the
517 database format
518 This is more complicated, since some of the client's result codes have
519 to be marked as successes.
520 """
521 log_debug(4, action_type, rcode)
522 if rcode == 0:
523
524 return 2
525
526 if action_type not in self.action_type_completed_codes:
527
528 return 3
529
530 hash = self.action_type_completed_codes[action_type]
531 if rcode not in hash:
532
533 return 3
534
535
536 return 2
537
556
558 """ Return the queue length for a certain server. """
559
560 self.auth_system(system_id)
561 log_debug(1, self.server_id)
562 h = rhnSQL.prepare("""
563 select
564 count(action_id) id
565 from
566 rhnServerAction r
567 where
568 r.server_id = :server_id
569 and r.status in (0, 1)
570 """)
571 h.execute(server_id=self.server_id)
572 data = h.fetchone_dict()
573 if data is None:
574 return 0
575 return data["id"]
576
577
578
580 """check for a reboot action for this server in status Picked Up"""
581 log_debug(4, self.server_id)
582 h = rhnSQL.prepare("""
583 select 1
584 from rhnServerAction sa
585 join rhnAction a on sa.action_id = a.id
586 join rhnActionType at on a.action_type = at.id
587 where sa.server_id = :server_id
588 and at.label = 'reboot.reboot'
589 and sa.status = 1 -- Picked Up
590 """)
591 h.execute(server_id=self.server_id)
592 ret = h.fetchone_dict() or None
593 if ret:
594 return True
595 return False
596
597 - def __update_action(self, action_id, status,
598 resultCode=None, message=""):
605
607 """ Old client errata retrieval. """
608 log_debug(3, self.server_id, actionId)
609
610
611
612 sql = """
613 select
614 pn.name name,
615 pl.evr.version version,
616 pl.evr.release release
617 from (
618 select
619 p.name_id,
620 max(pe.evr) evr
621 from
622 rhnPackageEVR pe,
623 rhnChannelPackage cp,
624 rhnPackage p,
625 rhnServerChannel sc,
626 (
627 select
628 p_name.name_id id
629 from
630 rhnActionErrataUpdate aeu,
631 rhnErrataPackage ep,
632 rhnPackage p_name
633 where
634 aeu.action_id = :action_id
635 and aeu.errata_id = ep.errata_id
636 and ep.package_id = p_name.id
637 ) nids
638 where
639 nids.id = p.name_id
640 and p.evr_id = pe.id
641 and p.id = cp.package_id
642 and cp.channel_id = sc.channel_id
643 and sc.server_id = :server_id
644 group by p.name_id
645 ) pl,
646 rhnPackageName pn
647 where
648 pn.id = pl.name_id
649 """
650 h = rhnSQL.prepare(sql)
651 h.execute(action_id=actionId, server_id=self.server_id)
652
653 packages = []
654 while 1:
655 ret = h.fetchone_dict()
656 if not ret:
657 break
658
659
660 packages.append([ret["name"], ret["version"], ret["release"], ''])
661 xml = xmlrpclib.dumps((packages,), methodname='client.update_packages')
662 return xml
663
665 """ Old client package retrieval. """
666 log_debug(3, self.server_id, actionId)
667
668
669
670
671
672 statement = """
673 select distinct
674 pkglist.name name,
675 -- decode the evr object selected earlier
676 pkglist.evr.version version,
677 pkglist.evr.release release
678 from (
679 -- get the max of the two possible cases
680 select
681 pl.name name,
682 max(pl.evr) evr
683 from (
684 -- if the EVR is specifically requested...
685 select
686 pn.name name,
687 pe.evr evr
688 from
689 rhnActionPackage ap,
690 rhnPackage p,
691 rhnPackageName pn,
692 rhnPackageEVR pe,
693 rhnServerChannel sc,
694 rhnChannelPackage cp
695 where
696 ap.action_id = :action_id
697 and ap.evr_id is NOT NULL
698 and ap.evr_id = p.evr_id
699 and ap.evr_id = pe.id
700 and ap.name_id = p.name_id
701 and ap.name_id = pn.id
702 and p.id = cp.package_id
703 and cp.channel_id = sc.channel_id
704 and sc.server_id = :server_id
705 UNION
706 -- when no EVR requested, we need to compute the max available
707 -- from the channels the server is subscribed to
708 select
709 pn.name name,
710 max(pevr.evr) evr
711 from
712 rhnActionPackage ap,
713 rhnServerChannel sc,
714 rhnChannelPackage cp,
715 rhnPackage p,
716 rhnPackageEVR pevr,
717 rhnPackageName pn
718 where
719 ap.action_id = :action_id
720 and ap.evr_id is null
721 and ap.name_id = pn.id
722 and ap.name_id = p.name_id
723 and p.evr_id = pevr.id
724 and sc.server_id = :server_id
725 and sc.channel_id = cp.channel_id
726 and cp.package_id = p.id
727 group by pn.name
728 ) pl
729 group by pl.name
730 ) pkglist
731 """
732 h = rhnSQL.prepare(statement)
733 h.execute(action_id=actionId, server_id=self.server_id)
734 ret = h.fetchall_dict() or []
735 packages = []
736 for p in ret:
737
738
739 entry = [p['name'], p['version'], p['release'], '']
740 packages.append(entry)
741 xml = xmlrpclib.dumps((packages,), methodname='client.update_packages')
742 return xml
743
744
745
746 if __name__ == "__main__":
747 print("You can not run this module by itself")
748 q = Queue()
749 sys.exit(-1)
750
751