1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 import sys
17 import select
18 import socket
19 import string
20 import SocketServer
21 from random import choice
22 from rhn.connections import idn_ascii_to_puny
23 from spacewalk.common.rhnLog import initLOG, log_debug, log_error
24 from spacewalk.common.rhnConfig import initCFG, CFG
25 from spacewalk.server import rhnSQL
26
27 try:
28 from osad import jabber_lib, dispatcher_client
29 except ImportError:
30 import jabber_lib, dispatcher_client
31
32
33 jabber_lib.log_debug = log_debug
34 jabber_lib.log_error = log_error
35
37 return Runner().main()
38
39 -class Runner(jabber_lib.Runner):
40 client_factory = dispatcher_client.Client
41
42
43
44 _min_sleep = 10
45 _max_sleep = 10
46
55
57 ret = {
58 'jabber_server' : CFG.jabber_server,
59 }
60 return ret
61
62 _query_get_dispatcher_password = """
63 select id, password
64 from rhnPushDispatcher
65 where jabber_id like :jabber_id
66 """
67
68 _update_dispatcher_password = """
69 update rhnPushDispatcher
70 set password = :password_in
71 where id = :id_in
72 """
73
91
93 chars = string.ascii_letters + string.digits
94 return "".join(choice(chars) for x in range(length))
95
124
156
177
179 log_debug(3)
180
181
182 client.retrieve_roster()
183 self.reap_pinged_clients()
184 need_pinging = self._fetch_clients_to_be_pinged()
185 log_debug(4, "Clients to be pinged:", need_pinging)
186 if need_pinging:
187 client.ping_clients(need_pinging)
188 npi = self._next_poll_interval
189
190 rfds, wfds, efds = select.select([client], [client], [], npi)
191
192 npi = self._next_poll_interval = self._poll_interval
193 if client in rfds:
194 log_debug(5, "before process")
195 client.process(timeout=None)
196 log_debug(5, "after process")
197 if wfds:
198
199 log_debug(5,"Notifying jabber nodes")
200 self._notifier.notify_jabber_nodes()
201 else:
202 log_debug(5,"Not notifying jabber nodes")
203
204
205 _query_reap_pinged_clients = rhnSQL.Statement("""
206 update rhnPushClient
207 set state_id = :offline_id
208 where state_id = :online_id
209 and last_ping_time is not null
210 and current_timestamp > next_action_time
211 """)
222
223 _query_fetch_clients_to_be_pinged = rhnSQL.Statement("""
224 select id, name, shared_key, jabber_id
225 from rhnPushClient
226 where state_id = :online_id
227 and last_ping_time is not null
228 and next_action_time is null
229 and jabber_id is not null
230 """)
231 _query_update_clients_to_be_pinged = rhnSQL.Statement("""
232 update rhnPushClient
233 set next_action_time = current_timestamp + numtodsinterval(:delta, 'second')
234 where id = :client_id
235 """)
255
257 if state in self._state_ids:
258 return self._state_ids[state]
259
260 t = rhnSQL.Table('rhnPushClientState', 'label')
261 row = t[state]
262 assert row is not None
263 self._state_ids[state] = row['id']
264 return row['id']
265
266
267 _query_update_register_dispatcher = rhnSQL.Statement("""
268 update rhnPushDispatcher
269 set last_checkin = current_timestamp,
270 hostname = :hostname_in
271 where jabber_id = :jabber_id_in
272 """)
273 _query_insert_register_dispatcher = rhnSQL.Statement("""
274 insert into rhnPushDispatcher
275 (id, jabber_id, last_checkin, hostname, password)
276 values (sequence_nextval('rhn_pushdispatch_id_seq'), :jabber_id_in, current_timestamp,
277 :hostname_in, :password_in)
278 """)
279
287
288 _query_get_client_jids = rhnSQL.Statement("""
289 select jabber_id, TO_CHAR(modified, 'YYYY-MM-DD HH24:MI:SS') modified
290 from rhnPushClient
291 where jabber_id is not null
292 """)
305
306
309 self._next_poll_interval = None
310 self._notify_threshold = CFG.get('notify_threshold')
311
313 return self._next_poll_interval
314
316 self.jabber_connection = jabber_connection
317
324
326 log_debug(3)
327 running_clients = self.get_running_clients()
328 free_slots = 0
329 if self._notify_threshold:
330 free_slots = self._notify_threshold - running_clients
331 log_debug(4, "notify_threshold: %s running_clients: %s free_slots: %s" %
332 (self._notify_threshold, running_clients, free_slots))
333
334 h = rhnSQL.prepare(self._query_get_pending_clients)
335 h.execute()
336 self._next_poll_interval = None
337 notified = []
338
339 while 1:
340 if self._notify_threshold and free_slots <= 0:
341
342 log_debug(4, "max running clients reached; stop notifying")
343 break
344
345 row = h.fetchone_dict()
346 if not row:
347
348 break
349
350 delta = row['delta']
351 if delta > 0:
352
353
354
355
356 npi = self._next_poll_interval or 86400
357 self._next_poll_interval = min(delta, npi)
358 log_debug(4, "Next poll interval", delta)
359 continue
360
361 jabber_id = row['jabber_id']
362 if jabber_id is None:
363
364 continue
365 server_id = row['server_id']
366 if server_id and reboot_in_progress(server_id):
367
368 continue
369
370 if not self.jabber_connection.jid_available(jabber_id):
371 log_debug(4, "Node %s not available for notifications" %
372 jabber_id)
373
374
375 continue
376
377 log_debug(4, "Notifying", jabber_id, row['server_id'])
378 self.jabber_connection.send_message(jabber_id,
379 jabber_lib.NS_RHN_MESSAGE_REQUEST_CHECKIN)
380 if jabber_id not in notified:
381 free_slots -= 1
382 notified.append(jabber_id)
383 rhnSQL.commit()
384
385
386
387
388
389
390
391 _query_get_pending_clients = rhnSQL.Statement("""
392 select a.id, sa.server_id, pc.jabber_id,
393 date_diff_in_days(current_timestamp, earliest_action) * 86400 delta
394 from
395 rhnServerAction sa,
396 rhnAction a,
397 rhnPushClient pc
398 where pc.server_id = sa.server_id
399 and sa.action_id = a.id
400 and sa.status in (0, 1) -- Queued or picked up
401 and not exists (
402 -- This is like saying 'this action has no
403 -- prerequisite or has a prerequisite that has completed
404 -- (status = 2)
405 select 1
406 from rhnServerAction sap
407 where sap.server_id = sa.server_id
408 and sap.action_id = a.prerequisite
409 and sap.status != 2
410 )
411 order by sa.status, earliest_action, sa.server_id
412 """)
413
414 _query_get_running_clients = rhnSQL.Statement("""
415 select count(distinct server_id) clients
416 from rhnServerAction
417 where status = 1 -- picked up
418 """)
419
421 """check for a reboot action for this server in status Picked Up"""
422 h = rhnSQL.prepare("""
423 select 1
424 from rhnServerAction sa
425 join rhnAction a on sa.action_id = a.id
426 join rhnActionType at on a.action_type = at.id
427 where sa.server_id = :server_id
428 and at.label = 'reboot.reboot'
429 and sa.status = 1 -- Picked Up
430 """)
431 h.execute(server_id = server_id)
432 ret = h.fetchone_dict() or None
433 if ret:
434 return True
435 return False
436
437
438 if __name__ == '__main__':
439 sys.exit(main() or 0)
440