Package src :: Module osa_dispatcher
[hide private]
[frames] | no frames]

Source Code for Module src.osa_dispatcher

  1  # 
  2  # Copyright (c) 2008--2017 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU General Public License, 
  5  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  6  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  8  # along with this software; if not, see 
  9  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 10  # 
 11  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 12  # granted to use or replicate Red Hat trademarks that are incorporated 
 13  # in this software or its documentation. 
 14  # 
 15   
 16  import sys 
 17  import select 
 18  import socket 
 19  import string 
 20  import SocketServer 
 21  from random import choice 
 22  from rhn.connections import idn_ascii_to_puny 
 23  from spacewalk.common.rhnLog import initLOG, log_debug, log_error 
 24  from spacewalk.common.rhnConfig import initCFG, CFG 
 25  from spacewalk.server import rhnSQL 
 26   
 27  try: # python 3 
 28      from osad import jabber_lib, dispatcher_client 
 29  except ImportError: # python 2 
 30      import jabber_lib, dispatcher_client 
 31   
 32  # Override the log functions 
 33  jabber_lib.log_debug = log_debug 
 34  jabber_lib.log_error = log_error 
 35   
36 -def main():
37 return Runner().main()
38
39 -class Runner(jabber_lib.Runner):
40 client_factory = dispatcher_client.Client 41 42 # We want the dispatcher to check in quite often in case the jabberd 43 # connection drops 44 _min_sleep = 10 45 _max_sleep = 10 46
47 - def __init__(self):
48 jabber_lib.Runner.__init__(self) 49 initCFG("osa-dispatcher") 50 self._notifier = Notifier() 51 self._poll_interval = None 52 self._next_poll_interval = None 53 # Cache states 54 self._state_ids = {}
55
56 - def read_config(self):
57 ret = { 58 'jabber_server' : CFG.jabber_server, 59 } 60 return ret
61 62 _query_get_dispatcher_password = """ 63 select id, password 64 from rhnPushDispatcher 65 where jabber_id like :jabber_id 66 """ 67 68 _update_dispatcher_password = """ 69 update rhnPushDispatcher 70 set password = :password_in 71 where id = :id_in 72 """ 73
74 - def get_dispatcher_password(self, username):
75 h = rhnSQL.prepare(self._query_get_dispatcher_password) 76 h.execute(jabber_id = username + "%") 77 ret = h.fetchall_dict() 78 79 if ret and len(ret) == 1: 80 if ret[0]['password']: 81 return ret[0]['password'] 82 else: 83 # Upgrade Spacewalk 1.5 -> 1.6: the dispatcher row exists, 84 # we just need to generate and save the password. 85 self._password = self.create_dispatcher_password(32) 86 u = rhnSQL.prepare(self._update_dispatcher_password) 87 u.execute(password_in = self._password, id_in = ret[0]['id']) 88 return self._password 89 else: 90 return None
91
92 - def create_dispatcher_password(self, length):
93 chars = string.ascii_letters + string.digits 94 return "".join(choice(chars) for x in range(length))
95
96 - def setup_config(self, config):
97 # Figure out the log level 98 debug_level = self.options.verbose 99 if debug_level is None: 100 debug_level = CFG.debug 101 self.debug_level = debug_level 102 initLOG(level=debug_level, log_file=CFG.log_file) 103 104 # Get the ssl cert 105 ssl_cert = CFG.osa_ssl_cert 106 try: 107 self.check_cert(ssl_cert) 108 except jabber_lib.InvalidCertError: 109 e = sys.exc_info()[1] 110 log_error("Invalid SSL certificate:", e) 111 return 1 112 113 self.ssl_cert = ssl_cert 114 115 rhnSQL.initDB() 116 117 self._username = 'rhn-dispatcher-sat' 118 self._password = self.get_dispatcher_password(self._username) 119 if not self._password: 120 self._password = self.create_dispatcher_password(32) 121 self._resource = 'superclient' 122 js = config.get('jabber_server') 123 self._jabber_servers = [ idn_ascii_to_puny(js) ]
124
125 - def fix_connection(self, c):
126 "After setting up the connection, do whatever else is necessary" 127 self._notifier.set_jabber_connection(c) 128 129 self._poll_interval = CFG.poll_interval 130 self._next_poll_interval = self._poll_interval 131 132 if self._jabber_servers and self._jabber_servers[0]: 133 hostname = self._jabber_servers[0] 134 else: 135 hostname = socket.gethostname() 136 137 self._register_dispatcher(c.jid, hostname) 138 139 c.retrieve_roster() 140 log_debug(4, "Subscribed to", c._roster.get_subscribed_to()) 141 log_debug(4, "Subscribed from", c._roster.get_subscribed_from()) 142 log_debug(4, "Subscribed both", c._roster.get_subscribed_both()) 143 144 client_jids = self._get_client_jids() 145 client_jids = [x[0] for x in client_jids] 146 147 # self-healing no longer works correctly since we blow away jabberd's 148 # db on restart. Instead try to resubscribe jabberd to active jids manually. 149 c.subscribe_to_presence(client_jids) 150 151 # Unsubscribe the dispatcher from any client jid that no longer exists 152 self.cleanup_roster(c, client_jids) 153 154 c.send_presence() 155 return c
156
157 - def cleanup_roster(self, client, active_jids):
158 roster = client._roster 159 active_stripped_jids = {} 160 for jid in active_jids: 161 stripped_jid = jabber_lib.strip_resource(jid) 162 stripped_jid = str(stripped_jid) 163 active_stripped_jids[stripped_jid] = None 164 165 roster_jids = roster.get_subscribed_to() 166 roster_jids.update(roster.get_subscribed_from()) 167 roster_jids.update(roster.get_subscribed_both()) 168 169 to_remove = [] 170 for jid in roster_jids.keys(): 171 stripped_jid = jabber_lib.strip_resource(jid) 172 stripped_jid = str(stripped_jid) 173 if stripped_jid not in active_stripped_jids: 174 to_remove.append(stripped_jid) 175 176 client.cancel_subscription(to_remove)
177
178 - def process_once(self, client):
179 log_debug(3) 180 # First, clean up the nodes that have been pinged and have not 181 # responded 182 client.retrieve_roster() 183 self.reap_pinged_clients() 184 need_pinging = self._fetch_clients_to_be_pinged() 185 log_debug(4, "Clients to be pinged:", need_pinging) 186 if need_pinging: 187 client.ping_clients(need_pinging) 188 npi = self._next_poll_interval 189 190 rfds, wfds, efds = select.select([client], [client], [], npi) 191 # Reset the next poll interval 192 npi = self._next_poll_interval = self._poll_interval 193 if client in rfds: 194 log_debug(5, "before process") 195 client.process(timeout=None) 196 log_debug(5, "after process") 197 if wfds: 198 # Timeout 199 log_debug(5,"Notifying jabber nodes") 200 self._notifier.notify_jabber_nodes() 201 else: 202 log_debug(5,"Not notifying jabber nodes")
203 204 205 _query_reap_pinged_clients = rhnSQL.Statement(""" 206 update rhnPushClient 207 set state_id = :offline_id 208 where state_id = :online_id 209 and last_ping_time is not null 210 and current_timestamp > next_action_time 211 """)
212 - def reap_pinged_clients(self):
213 # Get the online and offline ids 214 online_id = self._get_push_state_id('online') 215 offline_id = self._get_push_state_id('offline') 216 217 h = rhnSQL.prepare(self._query_reap_pinged_clients) 218 ret = h.execute(online_id=online_id, offline_id=offline_id) 219 if ret: 220 # We have changed something 221 rhnSQL.commit()
222 223 _query_fetch_clients_to_be_pinged = rhnSQL.Statement(""" 224 select id, name, shared_key, jabber_id 225 from rhnPushClient 226 where state_id = :online_id 227 and last_ping_time is not null 228 and next_action_time is null 229 and jabber_id is not null 230 """) 231 _query_update_clients_to_be_pinged = rhnSQL.Statement(""" 232 update rhnPushClient 233 set next_action_time = current_timestamp + numtodsinterval(:delta, 'second') 234 where id = :client_id 235 """)
237 online_id = self._get_push_state_id('online') 238 h = rhnSQL.prepare(self._query_fetch_clients_to_be_pinged) 239 h.execute(online_id=online_id) 240 clients = h.fetchall_dict() or [] 241 rhnSQL.commit() 242 if not clients: 243 # Nothing to do 244 return 245 246 # XXX Need config option 247 delta = 20 248 249 client_ids = [x['id'] for x in clients] 250 deltas = [ delta ] * len(client_ids) 251 h = rhnSQL.prepare(self._query_update_clients_to_be_pinged) 252 h.executemany(client_id=client_ids, delta=deltas) 253 rhnSQL.commit() 254 return clients
255
256 - def _get_push_state_id(self, state):
257 if state in self._state_ids: 258 return self._state_ids[state] 259 260 t = rhnSQL.Table('rhnPushClientState', 'label') 261 row = t[state] 262 assert row is not None 263 self._state_ids[state] = row['id'] 264 return row['id']
265 266 267 _query_update_register_dispatcher = rhnSQL.Statement(""" 268 update rhnPushDispatcher 269 set last_checkin = current_timestamp, 270 hostname = :hostname_in 271 where jabber_id = :jabber_id_in 272 """) 273 _query_insert_register_dispatcher = rhnSQL.Statement(""" 274 insert into rhnPushDispatcher 275 (id, jabber_id, last_checkin, hostname, password) 276 values (sequence_nextval('rhn_pushdispatch_id_seq'), :jabber_id_in, current_timestamp, 277 :hostname_in, :password_in) 278 """) 279
280 - def _register_dispatcher(self, jabber_id, hostname):
281 h = rhnSQL.prepare(self._query_update_register_dispatcher) 282 rowcount = h.execute(jabber_id_in=jabber_id, hostname_in=hostname, password_in=self._password) 283 if not rowcount: 284 h = rhnSQL.prepare(self._query_insert_register_dispatcher) 285 h.execute(jabber_id_in=jabber_id, hostname_in=hostname, password_in=self._password) 286 rhnSQL.commit()
287 288 _query_get_client_jids = rhnSQL.Statement(""" 289 select jabber_id, TO_CHAR(modified, 'YYYY-MM-DD HH24:MI:SS') modified 290 from rhnPushClient 291 where jabber_id is not null 292 """)
293 - def _get_client_jids(self):
294 h = rhnSQL.prepare(self._query_get_client_jids) 295 h.execute() 296 ret = [] 297 while 1: 298 row = h.fetchone_dict() 299 if not row: 300 break 301 # Save the modified time too - we don't want to mark as offline 302 # clients that just checked in 303 ret.append((row['jabber_id'], row['modified'])) 304 return ret
305 306
307 -class Notifier:
308 - def __init__(self):
309 self._next_poll_interval = None 310 self._notify_threshold = CFG.get('notify_threshold')
311
312 - def get_next_poll_interval(self):
313 return self._next_poll_interval
314
315 - def set_jabber_connection(self, jabber_connection):
316 self.jabber_connection = jabber_connection
317
318 - def get_running_clients(self):
319 log_debug(3) 320 h = rhnSQL.prepare(self._query_get_running_clients) 321 h.execute() 322 row = h.fetchone_dict() or {} 323 return int(row.get("clients", 0))
324
325 - def notify_jabber_nodes(self):
326 log_debug(3) 327 running_clients = self.get_running_clients() 328 free_slots = 0 329 if self._notify_threshold: 330 free_slots = self._notify_threshold - running_clients 331 log_debug(4, "notify_threshold: %s running_clients: %s free_slots: %s" % 332 (self._notify_threshold, running_clients, free_slots)) 333 334 h = rhnSQL.prepare(self._query_get_pending_clients) 335 h.execute() 336 self._next_poll_interval = None 337 notified = [] 338 339 while 1: 340 if self._notify_threshold and free_slots <= 0: 341 # End of loop 342 log_debug(4, "max running clients reached; stop notifying") 343 break 344 345 row = h.fetchone_dict() 346 if not row: 347 # End of loop 348 break 349 350 delta = row['delta'] 351 if delta > 0: 352 # Set the next poll interval to something large if it was not 353 # previously set before; this way min() will pick up this 354 # delta, but we don't have to special-case the first delta we 355 # find 356 npi = self._next_poll_interval or 86400 357 self._next_poll_interval = min(delta, npi) 358 log_debug(4, "Next poll interval", delta) 359 continue 360 361 jabber_id = row['jabber_id'] 362 if jabber_id is None: 363 # Not even online 364 continue 365 server_id = row['server_id'] 366 if server_id and reboot_in_progress(server_id): 367 # don't call when a reboot is in progress 368 continue 369 370 if not self.jabber_connection.jid_available(jabber_id): 371 log_debug(4, "Node %s not available for notifications" % 372 jabber_id) 373 # iterate further, in case there are other clients that 374 # CAN be notified. 375 continue 376 377 log_debug(4, "Notifying", jabber_id, row['server_id']) 378 self.jabber_connection.send_message(jabber_id, 379 jabber_lib.NS_RHN_MESSAGE_REQUEST_CHECKIN) 380 if jabber_id not in notified: 381 free_slots -= 1 382 notified.append(jabber_id) 383 rhnSQL.commit()
384 385 # We need to drive this query by rhnPushClient since it's substantially 386 # smaller than rhnAction 387 # order by status, earliest_action, server_id to get 388 # "Queued" first with earliest_action first. If multiple clients have the 389 # same values, finally order by server_id to get a defined order 390 # important for notify_threshold 391 _query_get_pending_clients = rhnSQL.Statement(""" 392 select a.id, sa.server_id, pc.jabber_id, 393 date_diff_in_days(current_timestamp, earliest_action) * 86400 delta 394 from 395 rhnServerAction sa, 396 rhnAction a, 397 rhnPushClient pc 398 where pc.server_id = sa.server_id 399 and sa.action_id = a.id 400 and sa.status in (0, 1) -- Queued or picked up 401 and not exists ( 402 -- This is like saying 'this action has no 403 -- prerequisite or has a prerequisite that has completed 404 -- (status = 2) 405 select 1 406 from rhnServerAction sap 407 where sap.server_id = sa.server_id 408 and sap.action_id = a.prerequisite 409 and sap.status != 2 410 ) 411 order by sa.status, earliest_action, sa.server_id 412 """) 413 414 _query_get_running_clients = rhnSQL.Statement(""" 415 select count(distinct server_id) clients 416 from rhnServerAction 417 where status = 1 -- picked up 418 """)
419
420 -def reboot_in_progress(server_id):
421 """check for a reboot action for this server in status Picked Up""" 422 h = rhnSQL.prepare(""" 423 select 1 424 from rhnServerAction sa 425 join rhnAction a on sa.action_id = a.id 426 join rhnActionType at on a.action_type = at.id 427 where sa.server_id = :server_id 428 and at.label = 'reboot.reboot' 429 and sa.status = 1 -- Picked Up 430 """) 431 h.execute(server_id = server_id) 432 ret = h.fetchone_dict() or None 433 if ret: 434 return True 435 return False
436 437 438 if __name__ == '__main__': 439 sys.exit(main() or 0) 440