Package src :: Module jabber_lib
[hide private]
[frames | no frames]

Source Code for Module src.jabber_lib

   1  # 
   2  # Copyright (c) 2008--2018 Red Hat, Inc. 
   3  # 
   4  # This software is licensed to you under the GNU General Public License, 
   5  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
   6  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
   7  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
   8  # along with this software; if not, see 
   9  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
  10  # 
  11  # Red Hat trademarks are not licensed under GPLv2. No permission is 
  12  # granted to use or replicate Red Hat trademarks that are incorporated 
  13  # in this software or its documentation. 
  14  # 
  15   
  16  import os 
  17  import hashlib 
  18  import sys 
  19  import time 
  20  import select 
  21  import socket 
  22  import random 
  23  import fnmatch 
  24  from optparse import OptionParser, Option 
  25  import traceback 
  26  from rhn import SSL 
  27   
  28  try: # python 2 
  29      from rhn_log import log_debug, log_error 
  30  except ImportError: # python 3 
  31      from osad.rhn_log import log_debug, log_error 
  32   
  33  from spacewalk.common.usix import raise_with_tb 
  34  from rhn.i18n import bstr 
  35   
  36  import warnings 
  37  try: 
  38      warnings.filterwarnings("ignore", category=DeprecationWarning) 
  39      import jabber 
  40  finally: 
  41      warnings.resetwarnings() 
  42   
  43  NS_RHN = "http://jabber.rhn.redhat.com/jabber" 
  44  NS_RHN_SIGNED = "%s/signed" % NS_RHN 
  45  NS_RHN_PRESENCE_SUBSCRIBE = "%s/presence/subscribe" % NS_RHN 
  46  NS_RHN_PRESENCE_SUBSCRIBED = "%s/presence/subscribed" % NS_RHN 
  47  NS_RHN_PRESENCE_UNSUBSCRIBE = "%s/presence/unsubscribe" % NS_RHN 
  48  NS_RHN_MESSAGE_REQUEST_CHECKIN = "%s/message/request/checkin" % NS_RHN 
  49  NS_RHN_MESSAGE_RESPONSE_CHECKIN = "%s/message/response/checkin" % NS_RHN 
  50  NS_RHN_MESSAGE_REQUEST_PING = "%s/message/request/ping" % NS_RHN 
  51  NS_RHN_MESSAGE_RESPONSE_PING = "%s/message/response/ping" % NS_RHN 
  52   
  53  NS_STARTTLS = 'urn:ietf:params:xml:ns:xmpp-tls' 
  54  NS_STANZAS = "urn:ietf:params:xml:ns:xmpp-stanzas" 
  55   
  56  # The class that starts everything 
class Runner(object):
    """Entry point of the daemon: parse options, connect, process forever.

    ``client_factory`` must be set by a subclass to a callable that builds
    the Jabber client, and ``process_once`` must be overridden; the other
    hooks (``read_config``, ``setup_config``, ``fix_connection``,
    ``preprocess_once``) have no-op defaults.
    """
    option_parser = OptionParser
    option = Option

    client_factory = None

    # How often will we try to reconnect. We want this randomized, so not all
    # clients hit the server at the same time
    _min_sleep = 60
    _max_sleep = 90

    def __init__(self):
        """Build the command line option table and reset connection state."""
        self.options_table = [
            self.option("-v", "--verbose", action="count",
                help="Increase verbosity"),
            self.option('-N', "--nodetach", action="store_true",
                help="Suppress backgrounding and detachment of the process"),
            self.option('--pid-file', action="store",
                help="Write to this PID file"),
            self.option('--logfile', action="store",
                help="Write log information to this file"),
        ]

        self.ssl_cert = None
        self.debug_level = 0
        self._jabber_servers = []
        self._connected_jabber_server = None
        self._username = None
        self._password = None
        self._resource = None
        self._in_background = 0
        self._use_proxy = 0

    def process_cli_options(self):
        "Process command line options"
        self._parser = self.option_parser(option_list=self.options_table)
        self.options, self.args = self._parser.parse_args()

    def main(self):
        """Method that starts up everything
        - processes command line options
        - big loop to reconnect if necessary
          - read config
          - setup config
          - setup jabber connection
          - process requests

        Exits the process on KeyboardInterrupt; re-raises SystemExit and
        InvalidCertError; every other failure is logged and retried after a
        randomized sleep.
        """
        self.process_cli_options()
        force_setup = 0
        no_fork = None
        while 1:
            # First time around?
            if no_fork is None:
                # Yes, we may be forking
                no_fork = 0
            else:
                # Been here before, no need to fork anymore
                no_fork = 1

            try:
                config = self.read_config()
                if force_setup:
                    log_debug(2, "###Forcing setup")
                    self.setup_config(config, 1)
                    force_setup = 0
                else:
                    self.setup_config(config)
                c = self.setup_connection(no_fork=no_fork)
                self.fix_connection(c)
                self.process_forever(c)
            except KeyboardInterrupt:
                # Best effort only: ``c`` may not exist yet, and nothing must
                # prevent the clean exit below.
                try:
                    c.disconnect()
                except:
                    pass
                sys.exit(0)
            except SystemExit:
                raise
            except RestartRequested:
                e = sys.exc_info()[1]
                log_error("Restart requested", e)
                if not self.is_in_background():
                    self.push_to_background()
                continue
            except NeedRestart:
                log_debug(3, "Need Restart")
                force_setup = 1
                continue
            except JabberConnectionError:
                time_to_sleep = random.randint(self._min_sleep, self._max_sleep)
                log_debug(0, "Unable to connect to jabber servers, sleeping"
                    " %s seconds" % time_to_sleep)
                if not self.is_in_background():
                    self.push_to_background()
                try:
                    time.sleep(time_to_sleep)
                except KeyboardInterrupt:
                    sys.exit(0)
            except InvalidCertError:
                log_error("Invalid Cert Error:")
                raise
            except:
                # Print traceback
                log_error("Error caught:")
                log_error(extract_traceback())
                time_to_sleep = random.randint(self._min_sleep, self._max_sleep)
                log_debug(3, "Sleeping", time_to_sleep, "seconds")
                if not self.is_in_background():
                    self.push_to_background()
                try:
                    time.sleep(time_to_sleep)
                except KeyboardInterrupt:
                    sys.exit(0)

    def fix_connection(self, client):
        "After setting up the connection, do whatever else is necessary"
        return client

    def preprocess_once(self, client):
        """Hook called once by process_forever before its loop starts."""
        return client

    def process_forever(self, client):
        """Big loop to process requests

        Calls process_once() and then sleeps 6-10 seconds, forever.
        KeyboardInterrupt disconnects the client and exits; any other
        exception propagates to the caller.
        """
        log_debug(1)
        self.preprocess_once(client)
        while 1:
            try:
                self.process_once(client)
                # random sleep so we don't kill CPU performance, bz 222988
                time_to_sleep = random.randint(6, 10)
                time.sleep(time_to_sleep)
            except KeyboardInterrupt:
                # CTRL+C
                client.disconnect()
                sys.exit(0)
            # XXX to be refined later: everything else propagates unchanged

    def process_once(self, client):
        "To be overridden in a client class"
        raise NotImplementedError

    def setup_config(self, config, force=0):
        """Hook to apply the configuration; does nothing in the base class.

        ``force`` is accepted because main() calls
        ``self.setup_config(config, 1)`` after a NeedRestart; without it the
        base implementation raised TypeError on that path.
        """
        pass

    def read_config(self):
        """Return the configuration mapping; empty in the base class."""
        return {}

    def is_in_background(self):
        """Return the flag set by push_to_background()."""
        return self._in_background

    def push_to_background(self):
        """Detach (unless --nodetach was given) and write the PID file."""
        if not self.options.nodetach:
            # Detach and push to the background
            push_to_background()
        # NOTE(review): the rendering this was reconstructed from lost its
        # indentation; confirm this assignment is meant to be unconditional.
        self._in_background = 1

        pid_file = self.options.pid_file
        if pid_file:
            try:
                os.unlink(pid_file)
            except OSError:
                e = sys.exc_info()[1]
                # errno 2 is ENOENT: no previous PID file, nothing to remove
                if e.errno != 2:
                    raise
            try:
                # Make sure we don't create the file world-writable (#162619)
                fd = os.open(pid_file, os.O_WRONLY | os.O_APPEND | os.O_CREAT, int("0644", 8))
                try:
                    os.write(fd, bstr("%d" % os.getpid()))
                    os.write(fd, bstr("\n"))
                finally:
                    # Close even when a write fails so the descriptor
                    # does not leak
                    os.close(fd)
            except OSError:
                pass

    def check_cert(self, cert):
        """Validate ``cert`` with the module-level check_cert()."""
        return check_cert(cert)

    def print_message(self, js, e):
        """Log (at debug level 1) why jabber server ``js`` was unreachable."""
        log_debug(1, e)
        log_debug(1, "Could not connect to jabber server", js)

    def setup_connection(self, no_fork=0):
        """
        - initializes a Jabber connection (by instantiating a Jabber client)
        - if necessary, pushes to background
        - authentication and resource binding (by calling start())

        Returns the started client. Raises JabberConnectionError when none
        of the configured servers could be reached, for instance because:
        - jabber server is not started
        - jabber server is started but did not initialize SSL just yet

        This function will kill the process with exit code 1 if the SSL
        handshake failed (an indication of a mismatching CA cert). We do this
        so starting the program as a daemon to fail if this happens. Of
        course, if the server is down and the CA cert is bad, then the daemon
        will start but will silently fail afterwards; the error log should
        have a traceback though.
        """
        for js in self._jabber_servers:

            log_debug(3, "Connecting to", js)

            try:
                c = self._get_jabber_client(js)
                log_debug(1, "Connected to jabber server", js)
                self._connected_jabber_server = js
                break
            except SSLHandshakeError:
                # Error doing the handshake - this is a permanent error
                sys.exit(1)
            except socket.error:
                self.print_message(js, "socket error")
                log_error(extract_traceback())
                continue
            except JabberError:
                self.print_message(js, "JabberError")
                log_error(extract_traceback())
                continue
            except SSLError:
                self.print_message(js, "SSLError")
                log_error(extract_traceback())
                continue
        else:
            # Ran out of Jabber servers to try
            # Could not connect to any servers
            log_debug(1, "Could not connect to any jabber server")
            # Make sure we push to background at this point, we don't want the
            # service to block at startup
            if not no_fork:
                self.push_to_background()
            raise JabberConnectionError

        # If we got to this point, we have a connection set up
        if not no_fork:
            self.push_to_background()

        # Authentication and resource binding
        c.start(username=self._username, password=self._password,
            resource=self._resource)

        # Register callbacks
        c.custom_handler.register_callback(c._presence_callback, 'presence')
        c.custom_handler.register_callback(c._message_callback, 'message')
        c.custom_handler.register_callback(self._error_callback, 'error')
        return c

    def _get_jabber_client(self, jabber_server):
        """Returns a connected Jabber client, or raises an exception if it was
        unable to connect

        ``jabber_server`` is ``host`` or ``host:port``.
        """
        log_debug(3)
        arr = jabber_server.split(':', 1)
        jabber_server = arr[0]
        cf = self.read_config()

        jabberpy_proxy_dict = None
        if self._use_proxy and 'proxy_url' in cf:
            proxy_parts = cf['proxy_url'].split(':')
            jabberpy_proxy_dict = {'host': proxy_parts[0],
                                   'port': int(proxy_parts[1])}
            if cf['enable_proxy_auth']:
                jabberpy_proxy_dict['user'] = cf['proxy_user']
                jabberpy_proxy_dict['password'] = cf['proxy_password']

        if len(arr) == 2:
            jabber_port = int(arr[1])
            log_debug(2, "Connecting to", jabber_server, jabber_port)
            c = self.client_factory(jabber_server, jabber_port, proxy=jabberpy_proxy_dict)
        else:
            log_debug(2, "Connecting to", jabber_server)
            c = self.client_factory(jabber_server, proxy=jabberpy_proxy_dict)

        c.debug_level = self.debug_level
        c.add_trusted_cert(self.ssl_cert)
        c.connect()
        return c

    def _error_callback(self, client, stanza):
        """Logs error stanza messages for diagnostic purposes

        Raises NeedRestart when the stanza carries a ``conflict`` child.
        """
        log_error("Received an error stanza: ", stanza)
        for kid in stanza.kids:
            if kid.getName() == "conflict":
                log_error("Received an conflict. Restarting with new credentials.")
                raise NeedRestart
346
class InvalidCertError(SSL.SSL.Error):
    """SSL error raised when a certificate path or file is unusable."""

    def __str__(self):
        # The constructor arguments are joined with single spaces
        parts = self.args
        return " ".join(parts)

    __repr__ = __str__
351
def check_cert(cert_path):
    """Check that ``cert_path`` is a readable, unexpired PEM certificate.

    Returns None on success. Raises InvalidCertError when the path is None,
    the file cannot be read, the certificate cannot be loaded, or it has
    expired.
    """
    if cert_path is None:
        raise InvalidCertError("Cannot pass None as a certificate path")
    try:
        # Context manager so the file is closed even if read() fails
        with open(cert_path) as cert_file:
            cert = cert_file.read()
    except IOError:
        raise_with_tb(InvalidCertError("Unable to read file", cert_path), sys.exc_info()[2])
    try:
        x509 = SSL.crypto.load_certificate(SSL.crypto.FILETYPE_PEM, cert)
    except SSL.crypto.Error:
        raise_with_tb(InvalidCertError("Unable to open certificate", cert_path), sys.exc_info()[2])
    log_debug(4, "Loading cert", x509.get_subject())
    if x509.has_expired():
        raise InvalidCertError("Expired certificate", cert_path)
366
def sign(secret_key, *values):
    """Return a chained SHA-1 hex digest of ``secret_key`` and ``values``.

    Starts from sha1(secret_key); each value is then appended to the
    previous hex digest and the result is hashed again.
    """
    digest = hashlib.new('sha1', bstr(secret_key)).hexdigest()
    for value in values:
        chained = "%s%s" % (digest, value)
        digest = hashlib.new('sha1', bstr(chained)).hexdigest()
    return digest
372 373 # getAttr is braindead, rewrite it
class JabberProtocolNode(jabber.Protocol):
    """jabber.Protocol with a getAttr() that returns None for missing keys."""

    def getAttr(self, key):
        """Return attribute ``key`` of this node, or None when it is absent."""
        attributes = self.attrs
        return attributes.get(key)
377
class JabberIqNode(jabber.Iq, JabberProtocolNode):
    """Iq stanza that uses JabberProtocolNode's getAttr()."""

    def getAttr(self, key):
        # jabber.Iq comes first in the bases, so the lookup is pinned
        # explicitly to the JabberProtocolNode implementation.
        return JabberProtocolNode.getAttr(self, key)
380
class JabberMessageNode(jabber.Message, JabberProtocolNode):
    """Message stanza that uses JabberProtocolNode's getAttr()."""

    def getAttr(self, key):
        # jabber.Message comes first in the bases, so the lookup is pinned
        # explicitly to the JabberProtocolNode implementation.
        return JabberProtocolNode.getAttr(self, key)
383
class JabberPresenceNode(jabber.Presence, JabberProtocolNode):
    """Presence stanza that uses JabberProtocolNode's getAttr()."""

    def getAttr(self, key):
        # jabber.Presence comes first in the bases, so the lookup is pinned
        # explicitly to the JabberProtocolNode implementation.
        return JabberProtocolNode.getAttr(self, key)
386
class Handlers:
    """Registry of stanza callbacks with optional expiry and usage limits.

    ``_handlers`` maps a stanza name to a 4-tuple
    ``(by_id_and_ns, by_id, by_ns, defaults)``: three dicts of callback
    lists plus a list of default callbacks. Each callback entry is a list
    ``[callback, expiry, usage_count]``, where ``expiry`` is an absolute
    time.time() value or None, and ``usage_count`` is the number of
    dispatches left or None for unlimited.
    """

    def __init__(self):
        log_debug(3)
        self._handlers = {}

    def dispatch(self, client, stanza):
        """Drop expired callbacks, then call every callback matching ``stanza``."""
        log_debug(5, stanza)

        self.cleanup_expired_callbacks()

        callbacks = self._get_callbacks(stanza)
        if not callbacks:
            log_debug(4, "Unhandled stanza", stanza)
            return
        for callback in callbacks:
            log_debug(6, "Calling callback", callback, stanza)
            callback(client, stanza)

    def _get_callbacks(self, stanza):
        """Return the distinct callbacks registered for ``stanza``.

        Looks up, in turn, the (id, namespace), id, namespace and default
        registrations for the stanza's name.
        """
        log_debug(5, stanza)
        stanza_name = stanza.getName()
        if stanza_name not in self._handlers:
            return []
        stanza_id = stanza.getID()
        stanza_ns = stanza.getNamespace()
        # Used as a set: a callback registered several ways is called once
        result = {}
        (h_idns, h_id, h_ns, l_def) = self._handlers[stanza_name]

        if stanza_id is not None and stanza_ns:
            cbs = h_idns.get((stanza_id, stanza_ns), [])
            self._get_callbacks_from_list(cbs, result)
        if stanza_id is not None:
            cbs = h_id.get(stanza_id, [])
            self._get_callbacks_from_list(cbs, result)
        if stanza_ns:
            cbs = h_ns.get(stanza_ns, [])
            self._get_callbacks_from_list(cbs, result)
        self._get_callbacks_from_list(l_def, result)
        return list(result.keys())

    def _get_callbacks_from_list(self, l, result_hash):
        """Add the usable callbacks of ``l`` to ``result_hash``.

        Consumes one use from every entry with a usage count and removes
        entries from ``l`` once they are used up.
        """
        # Iterate over a copy: entries are removed from ``l`` below, and
        # removing while iterating the same list skips the next entry.
        for ent in list(l):
            (callback, expiry, usage_count) = ent[:3]
            if usage_count is None:
                # Unlimited use
                result_hash[callback] = None
                continue
            if usage_count >= 1:
                result_hash[callback] = None

            remaining = usage_count - 1
            if remaining <= 0:
                # Expired
                l.remove(ent)
                continue
            # Update the usage count; exactly one use is consumed per call
            ent[2] = remaining

    def register_callback(self, callback, stanza_name, stanza_id=None,
            stanza_ns=None, timeout=None, usage_count=None):
        """Register ``callback`` for stanzas named ``stanza_name``.

        The registration is narrowed by ``stanza_id`` and/or ``stanza_ns``
        when given; otherwise it is a default callback for the name.
        ``timeout`` (seconds from now) and ``usage_count`` limit how long
        and how often the callback is used.
        """
        log_debug(3, callback, stanza_name, stanza_id, stanza_ns, timeout,
            usage_count)
        if timeout:
            expiry = time.time() + timeout
        else:
            expiry = None
        callback_entry = [callback, expiry, usage_count]
        h_idns, h_id, h_ns, l_def = self._get_from_hash(self._handlers,
            stanza_name, default_value=({}, {}, {}, []))
        # h_id is for all the callbacks we should call for a particular stanza
        # id; h_ns is for namespaces
        if stanza_id is not None and stanza_ns:
            l = self._get_from_hash(h_idns, (stanza_id, stanza_ns), [])
            l.append(callback_entry)
            return

        if stanza_id is not None:
            l = self._get_from_hash(h_id, stanza_id, [])
            l.append(callback_entry)
            return

        if stanza_ns:
            l = self._get_from_hash(h_ns, stanza_ns, [])
            l.append(callback_entry)
            return

        # Default callback
        l_def.append(callback_entry)

    def _get_from_hash(self, h, key, default_value):
        """Return ``h[key]``, storing ``default_value`` there first if absent."""
        return h.setdefault(key, default_value)

    def cleanup_expired_callbacks(self):
        """Remove every callback entry whose expiry time has passed."""
        log_debug(5)
        now = time.time()
        for vals in self._handlers.values():
            h_idns, h_id, h_ns, l_def = vals
            for h in (h_idns, h_id, h_ns):
                self._expire_callbacks_hash(h, now)
            self._expire_callbacks_list(l_def, now)

    def _expire_callbacks_hash(self, h, now):
        """Expire stale entries in every callback list stored in ``h``."""
        log_debug(6, now)
        for vals in h.values():
            self._expire_callbacks_list(vals, now)

    def _expire_callbacks_list(self, vals, now):
        """Remove from ``vals``, in place, the entries that expired before ``now``."""
        log_debug(7, vals, now)
        # Rebuild in place rather than remove() during iteration, which
        # skipped the entry following each removed one.
        vals[:] = [val for val in vals if not val[1] or now <= val[1]]
508
def my_debug(*args):
    """Print the positional arguments, as a tuple, after a "Debugging:" label."""
    label = "Debugging:"
    print(label, args)
511
class RestartRequested(Exception):
    """Caught by Runner.main(), which logs it and restarts its loop."""
514
class JabberError(Exception):
    """Base class for the Jabber-level errors raised by this module."""
517
class NeedRestart(Exception):
    """Makes Runner.main() redo its setup, forcing setup_config()."""
520
class TimeoutError(JabberError):
    """JabberError subclass for timeouts.

    Note that inside this module the name shadows Python 3's built-in
    TimeoutError.
    """
523
class SSLError(Exception):
    """Signals that a lower-level SSL error was caught."""
527
class SSLHandshakeError(SSLError):
    """Signals that the SSL handshake failed."""
531
class SSLDisabledError(SSLError):
    """Signals that the server does not support SSL."""
535
class JabberConnectionError(Exception):
    """Signals that no jabber connection could be made."""
539
class JabberQualifiedError(JabberError):
    """JabberError carrying an error code and an error description."""

    def __init__(self, errcode, err, *args):
        """Store ``errcode`` and ``err``; remaining args go to JabberError."""
        JabberError.__init__(self, *args)
        self.errcode = errcode
        self.err = err

    def __repr__(self):
        class_name = self.__class__.__name__
        details = (class_name, id(self), self.errcode, self.err)
        return "<%s instance at %s; errcode=%s; err=%s>" % details

    __str__ = __repr__
551
552 -class JabberClient(jabber.Client, object):
553 _seq = 0 554 BLOCK_SIZE = jabber.xmlstream.BLOCK_SIZE 555
def __init__(self, *args, **kwargs):
    """Initialize jabber.Client and register this module's node classes and handlers.

    All arguments are passed through to jabber.Client.__init__.
    """
    log_debug(1)
    jabber.Client.__init__(self, *args, **kwargs)
    self.jid = None
    # Stanzas handed to the dispatchers are queued here until
    # get_one_stanza() pops them
    self._incoming_node_queue = []

    self.debug_level = 0
    self.trusted_certs = []

    # Stanza name -> node class used to build incoming stanzas
    protocol_classes = (
        ('unknown', JabberProtocolNode),
        ('iq', JabberIqNode),
        ('message', JabberMessageNode),
        ('presence', JabberPresenceNode),
        ('error', JabberProtocolNode),
    )
    for stanza_name, node_class in protocol_classes:
        self.registerProtocol(stanza_name, node_class)

    self.registerHandler('iq', self._expectedIqHandler, system=True)
    self.registerHandler('iq', self._IqRegisterResult, 'result',
        jabber.NS_REGISTER, system=True)

    # Every stanza kind is also routed through our own callback registry
    dispatcher = Handlers()
    self.custom_handler = dispatcher
    for stanza_name in ('presence', 'iq', 'message', 'error'):
        self.registerHandler(stanza_name, dispatcher.dispatch)

    self._non_ssl_sock = None
    self._roster = Roster()

    self._uniq_client_string = generate_random_string(6)
587
def add_trusted_cert(self, trusted_cert):
    """Validate ``trusted_cert`` with check_cert() and remember it.

    Nothing is recorded when check_cert() raises.
    """
    check_cert(trusted_cert)
    known_certs = self.trusted_certs
    known_certs.append(trusted_cert)
591
def connect(self):
    """Connect to the server and switch the stream to TLS.

    Steps, in order: plain connect (up to three attempts until a
    <features /> stanza is received), STARTTLS negotiation, SSL handshake
    and peer verification, then the stream is restarted over the SSL
    connection and the regular dispatcher is installed.

    Raises SSLVerifyError when no trusted cert was added, socket.error
    when the plain connect fails, SSLDisabledError when the server does
    not offer or complete STARTTLS, SSLHandshakeError when the handshake
    or peer verification raises SSL.SSL.Error, and a plain Exception when
    no features stanza follows the TLS restart.
    """
    log_debug(2)
    if not self.trusted_certs:
        raise SSLVerifyError("No trusted certs added")

    # Use our own dispatcher - we need to be able to read one stanza at
    # the time
    self.dispatch = self._auth_dispatch

    log_debug(5, "Attempting to connect")

    for retry in range(0,3):
        try:
            jabber.Client.connect(self)
        except socket.error:
            e = sys.exc_info()[1]
            log_error("Error connecting to jabber server: %s. "
                "See https://access.redhat.com/solutions/327903 for more information. " % e)
            raise socket.error(e)

        log_debug(5, "Connected")

        # From the XMPP Core Internet Draft:
        # server advertises <features><starttls /></features>
        # client sends back <starttls />
        # server responds with <proceed />

        # Wait for a stanza
        stanza = self.get_one_stanza()

        log_debug(5, "Expecting features stanza, got:", stanza)
        if stanza.getName() != 'features':
            log_debug(1, "Server did not return a <features /> stanza,"
                " reconnecting")
            self.disconnect()
            time.sleep(1)
        else:
            break
    else:
        # All three attempts ended without a <features /> stanza
        log_error("Not able to reconnect - "
            "See https://access.redhat.com/solutions/45332 for possible solutions.\n")
        raise SSLDisabledError

    starttls_node = stanza.getTag('starttls')
    log_debug(5, "starttls node", starttls_node)
    if starttls_node is None:
        log_error("Server does not support TLS - <starttls /> "
            "not in <features /> stanza")
        self.disconnect()
        raise SSLDisabledError

    # Initiate the TLS stream
    self.write("<starttls xmlns='%s' />" % NS_STARTTLS)

    stanza = self.get_one_stanza()
    log_debug(5, "Expecting proceed stanza, got:", stanza)
    if stanza.getName() != 'proceed':
        log_error("Server broke TLS negotiation - <proceed /> not sent")
        self.disconnect()
        raise SSLDisabledError

    log_debug(4, "Preparing for TLS handshake")
    ssl = SSLSocket(self._sock, trusted_certs=self.trusted_certs)
    ssl.ssl_verify_callback = self.ssl_verify_callback
    ssl.init_ssl()
    # Explicitly perform the SSL handshake
    try:
        ssl.do_handshake()
        self.verify_peer(ssl)
    except SSL.SSL.Error:
        # Error in the SSL handshake - most likely mismatching CA cert
        log_error("Traceback caught:")
        log_error(extract_traceback())
        raise_with_tb(SSLHandshakeError(), sys.exc_info()[2])

    # Re-init the parsers
    jabber.xmlstream.Stream.connect(self)

    # Now replace the socket with the ssl object's connection
    # (the plain socket is kept in _non_ssl_sock)
    self._non_ssl_sock = self._sock
    self._sock = ssl._connection

    # jabber.py has copies of _read, _write, _reader - those have to
    # be re-initialized as well
    self._setupComms()

    # Send the header again
    self.send(self._header_string())

    # Read the server's open stream tag
    self.process()

    stanza = self.get_one_stanza()

    if stanza.getName() != 'features':
        self.disconnect()
        raise Exception("Server did not pass any features?")

    # Now replace the dispatcher
    self.dispatch = self._orig_dispatch
    log_debug(5, "connect returning")
693
def disconnect(self):
    """Disconnect from the server, tolerating SSL errors.

    An SSL.SSL.Error raised while disconnecting is deliberately not
    propagated; it is logged at debug level instead of being discarded
    silently.
    """
    try:
        jabber.Client.disconnect(self)
    except SSL.SSL.Error:
        log_debug(4, "Ignoring SSL error during disconnect",
            sys.exc_info()[1])
699
700 - def _setupComms(self):
701 # We pretty much only support TCP connections 702 self._read = self._sock.recv 703 if hasattr(self._sock, 'sendall'): 704 self._write = self._sock.sendall 705 else: 706 self._write = Sendall(self._sock).sendall 707 self._reader = self._sock
708
def ssl_verify_callback(self, conn, cert, errnum, depth, ok):
    """Certificate verification callback; returns ``ok`` unchanged.

    When verification failed, the stream is closed and both ``conn`` and
    the client's socket are shut before returning.
    """
    log_debug(4, "Called", errnum, depth, ok)
    if ok:
        return ok

    log_error("SSL certificate verification failed")
    self.write("</stream:stream>")
    conn.close()
    self._sock.close()
    return ok
719
def verify_peer(self, ssl):
    """Check that the peer certificate's Common Name matches our host.

    The Common Name may be a shell-style pattern (e.g. ``*.example.com``).
    Returns None on success; raises SSLVerifyError when the peer sent no
    certificate, the certificate has no Common Name, or the names differ.
    """
    cert = ssl.get_peer_certificate()
    if cert is None:
        raise SSLVerifyError("Unable to retrieve peer cert")

    subject = cert.get_subject()
    # pyOpenSSL reports a missing CN as None rather than raising
    # AttributeError, so hasattr() alone cannot detect it.
    common_name = getattr(subject, 'CN', None)
    if not common_name:
        raise SSLVerifyError("Certificate has no Common Name")

    # Add a trailing . since foo.example.com. is equal to foo.example.com
    # This also catches non-FQDNs
    if not common_name.endswith('.'):
        common_name = common_name + '.'
    hdot = self._host
    # endswith() rather than [-1] so an empty host cannot raise IndexError
    if not hdot.endswith('.'):
        hdot = hdot + '.'

    if common_name != hdot and not fnmatch.fnmatchcase(hdot, common_name):
        raise SSLVerifyError("Mismatch: peer name: %s; common name: %s" %
            (self._host, common_name))
742
def retrieve_roster(self):
    """Request the roster and block until the matching reply has arrived.

    Registers the roster callback, sends the roster query, reads stanzas
    until one carries the id of the request, then sends a presence to
    every roster entry whose subscription is "from".
    """
    # Register the roster callback
    self.custom_handler.register_callback(self._roster_callback, 'iq')
    request_id = 'iq-request-%s' % self.get_unique_id()
    request = JabberIqNode(type="get")
    request.setQuery(jabber.NS_ROSTER)
    request.setID(request_id)
    self.send(request)

    # Wait for an IQ stanza with the same ID as the one we sent
    while self.get_one_stanza().getAttr('id') != request_id:
        pass
    # We now have the roster populated

    # All entries of type "from" and ask="subscribe" should be answered to
    for jid, entry in self._roster.get_subscribed_from().items():
        wants_answer = 'ask' in entry and entry['ask'] == 'subscribe'
        # Otherwise ask for a subscription
        presence_type = "subscribed" if wants_answer else "subscribe"
        self.send_presence(jid, type=presence_type)
771
def _roster_callback(self, client, stanza):
    """Add the <item> nodes of a roster result/set stanza to the roster.

    Stanzas without a roster <query> node, or with another type, are
    ignored.
    """
    log_debug(3, "Updating the roster", stanza)

    # Extract the <query> node
    query = stanza.getTag('query')
    if query is None or query.getNamespace() != jabber.NS_ROSTER:
        # No query
        log_debug(5, "Query node not found, skipping")
        return

    # This gets called any time a roster event is received
    if stanza.getAttr('type') not in ('result', 'set'):
        log_debug(5, "Not a result or a set, skipping")
        return

    # Now extract the <item> nodes
    roster = self._roster
    for item in query.getTags('item'):
        roster.add_item(item)
791
def cancel_subscription(self, jids):
    """Send one roster "set" asking to remove every jid in ``jids``.

    Does nothing when ``jids`` is empty.
    """
    if not jids:
        return

    query = JabberProtocolNode("query")
    query.setNamespace(jabber.NS_ROSTER)

    for jid in jids:
        item = JabberProtocolNode(
            "item", attrs={'jid': jid, 'subscription': 'remove'})
        query.insertNode(item)

    request = JabberIqNode(type="set")
    request.setID("remove-%s" % self.get_unique_id())
    request.insertNode(query)

    self.send(request)
813
def get_one_stanza(self, timeout=None):
    """Returns one stanza (or None if timeout is set)

    Queued stanzas are returned first; otherwise process() is called until
    one arrives or, when ``timeout`` (seconds) is set, the time is up.
    """
    deadline = None
    if timeout:
        deadline = time.time() + timeout

    while not self._incoming_node_queue:
        remaining = None
        if timeout:
            now = time.time()
            if now >= deadline:
                # Timed out
                log_debug(4, "timed out", now, deadline - timeout, timeout)
                return None
            remaining = deadline - now
        # No nodes in the queue, read some data
        self.process(timeout=remaining)

    # Now we have nodes in the queue
    return self._incoming_node_queue.pop(0)
835
836 - def _build_stanza(self, stanza):
837 """Builds one stanza according to the handlers we have registered via 838 registerHandler or registerProtocol""" 839 name = stanza.getName() 840 if name not in self.handlers: 841 name = 'unknown' 842 # XXX This is weird - why is jabbberpy using type which is a type? 843 stanza = self.handlers[name][type](node=stanza) 844 return stanza
845
def _orig_dispatch(self, stanza):
    """Regular dispatcher: build the stanza, run the handlers, queue it."""
    log_debug(6, stanza)
    if self.debug_level > 5:
        # Even more verbosity
        emit = sys.stderr.write
        emit("<-- ")
        emit(str(stanza))
        emit("\n\n")
    # Even though Client.dispatch does build a stanza properly, we have to
    # do it ourselves too since dispatch doesn't return the modified
    # stanza, so it was always of type Node (i.e. the top-level class)
    built = self._build_stanza(stanza)
    jabber.Client.dispatch(self, built)
    self._incoming_node_queue.append(built)
859
def _auth_dispatch(self, stanza):
    """Dispatcher used by connect(): build the stanza and queue it only."""
    log_debug(6, stanza)
    if self.debug_level > 5:
        # Even more verbosity
        emit = sys.stderr.write
        emit("<-- ")
        emit(str(stanza))
        emit("\n\n")
    # Create the stanza of the proper type
    built = self._build_stanza(stanza)
    self._incoming_node_queue.append(built)
870
def auth(self, username, password, resource, register=1):
    """Try to authenticate the username with the specified password
    If the authentication fails, try to register the user.
    If that fails as well, then JabberQualifiedError is raised

    The credential is sent as a token/sequence hash, a digest, or the
    plain password, depending on what the server's reply offers.
    Returns True once authenticated, or the result of register() when
    the final step is answered with a 401.
    """
    # NOTE(review): this logs the plaintext password at debug level 2
    log_debug(2, username, password, resource, register)
    auth_iq_id = "auth-get-%s" % self.get_unique_id()
    auth_get_iq = jabber.Iq(type='get')
    auth_get_iq.setID(auth_iq_id)
    q = auth_get_iq.setQuery(jabber.NS_AUTH)
    q.insertTag('username').insertData(username)
    self.send(auth_get_iq)
    log_debug(4, "Sending auth request", auth_get_iq)

    try:
        auth_response = self.waitForResponse(auth_iq_id, timeout=60)
    except JabberQualifiedError:
        e = sys.exc_info()[1]
        if not register:
            raise
        if e.errcode == '401':
            # Need to register the user if possible
            log_debug(4, "Need to register")
            self.register(username, password)
            return self.auth(username, password, resource, register=0)

        raise

    log_debug(4, "Auth response", auth_response)
    auth_ret_query = auth_response.getTag('query')
    auth_set_id = "auth-set-%s" % self.get_unique_id()
    auth_set_iq = jabber.Iq(type="set")
    auth_set_iq.setID(auth_set_id)

    q = auth_set_iq.setQuery(jabber.NS_AUTH)
    q.insertTag('username').insertData(username)
    q.insertTag('resource').insertData(resource)

    if auth_ret_query.getTag('token'):
        token = auth_ret_query.getTag('token').getData()
        seq = auth_ret_query.getTag('sequence').getData()

        # hashlib only accepts bytes on Python 3, so every input goes
        # through bstr(), as the digest branch below already does.
        password_hash = hashlib.new('sha1', bstr(password)).hexdigest()
        h = hashlib.new('sha1', bstr(password_hash + token)).hexdigest()
        for _ in range(int(seq)):
            h = hashlib.new('sha1', bstr(h)).hexdigest()
        q.insertTag('hash').insertData(h)
    elif auth_ret_query.getTag('digest'):
        digest = q.insertTag('digest')
        digest.insertData(hashlib.new('sha1',
            bstr(self.getIncomingID() + password)).hexdigest())
    else:
        q.insertTag('password').insertData(password)

    log_debug(4, "Sending auth info", auth_set_iq)
    try:
        self.SendAndWaitForResponse(auth_set_iq)
    except JabberQualifiedError:
        e = sys.exc_info()[1]
        if e.errcode == '401':
            # Need to reserve the user if possible
            log_debug(4, "Need to register")
            return self.register(username, password)
        raise

    log_debug(4, "Authenticated")
    return True
937
def send(self, stanza):
    """Send ``stanza``, echoing it to stderr when debug_level is above 5."""
    if self.debug_level > 5:
        emit = sys.stderr.write
        emit("--> ")
        emit(str(stanza))
        emit("\n\n")
    return jabber.Client.send(self, stanza)
944 945
def subscribe_to_presence(self, jids):
    """Subscribe to these nodes' presence
    The subscription in jabber works like this:

    Contact 1    State Contact 1     State Contact 2    Contact 2
    ----------+-------------------+------------------+----------
    subscribe  -> [ none + ask ]
                                    [ from ]         <- subscribed
                  [ to ]
                                    [ from + ask ]   <- subscribe
    subscribed -> [ both ]
                                    [ both ]
    ----------+-------------------+------------------+----------

    Enclosed in square brackets is the state when the communication took
    place.

    Jids that are already subscribed ("both" or "to"), or already asked
    for, are skipped; every other jid is marked ask=subscribe in the
    local roster and sent a presence subscription request.
    """
    roster = self._roster
    subscribed_to = roster.get_subscribed_to()
    log_debug(4, "Subscribed to", subscribed_to)
    subscribed_both = roster.get_subscribed_both()
    log_debug(4, "Subscribed both", subscribed_both)
    subscribed_none = roster.get_subscribed_none()
    log_debug(4, "Subscribed none", subscribed_none)
    subscribed_from = roster.get_subscribed_from()
    log_debug(4, "Subscribed from", subscribed_from)

    for full_jid in jids:
        jid = str(self._strip_resource(full_jid))
        if jid in subscribed_both:
            log_debug(4, "Already subscribed to the presence of node", jid)
            continue
        if jid in subscribed_to:
            log_debug(4, "Subscribed to")
            continue
        if jid in subscribed_none:
            entry = subscribed_none[jid]
            if 'ask' in entry and entry['ask'] == 'subscribe':
                log_debug(4, "Subscribed none + ask=subscribe")
                # We already asked for a subscription
                continue
        if jid in subscribed_from:
            entry = subscribed_from[jid]
            if 'ask' in entry and entry['ask'] == 'subscribe':
                log_debug(4, "Subscribed from + ask=subscribe")
                # We already asked for a subscription
                continue

        # Make sure we update the roster ourselves, to avoid sending
        # presence subscriptions twice
        # At this point we should only have 2 cases left: either from or
        # none.
        if jid in self._roster._subscribed_from:
            subscription = "from"
            target = self._roster._subscribed_from
        else:
            subscription = "none"
            target = self._roster._subscribed_none

        target[jid] = {
            'jid' : jid,
            'subscription' : subscription,
            'ask' : 'subscribe',
        }

        # subscribe this node to the jid's presence
        log_debug(4, jid)
        bare_jid = self._strip_resource(jid)
        request = JabberPresenceNode(to=bare_jid, type="subscribe")
        request.setID("presence-%s" % self.get_unique_id())
        signature = self._create_signature(jid, NS_RHN_PRESENCE_SUBSCRIBE)
        if signature:
            request.insertNode(signature)
        log_debug(5, "Sending presence subscription request", request)
        self.send(request)

    # XXX Here we should clean up everybody that is no longer online, but
    # this is more difficult
def send_presence(self, jid=None, type=None, xid=None):
    """Build a presence stanza and send it.

    jid:  recipient of the stanza; when None the node is created without a
          "to" argument.
    type: presence type, set on the node only when truthy.
    xid:  stanza id, set on the node only when truthy.
    """
    log_debug(3, jid, type)

    # Only pass "to" when a recipient was actually given
    node_args = {}
    if jid is not None:
        node_args['to'] = jid
    node = JabberPresenceNode(**node_args)

    if type:
        node.setType(type)
    if xid:
        node.setID(xid)

    self.send(node)
1041
def fileno(self):
    """Return the file descriptor reported by the underlying reader."""
    reader = self._reader
    return reader.fileno()
1044
def read(self):
    """Read everything that is currently available, without blocking.

    The descriptor is polled with a zero timeout and read in chunks of
    BLOCK_SIZE until either nothing more is pending or a read comes back
    empty.

    Returns the concatenated data, in whatever string type _read()
    produced (text or bytes). When nothing at all could be read,
    self.disconnected(self) is invoked and '' is returned.
    """
    chunks = []
    while True:
        # Zero timeout: only ask whether data is ready right now
        readable = select.select([self.fileno()], [], [], 0)[0]
        if not readable:
            # No input
            break
        chunk = self._read(self.BLOCK_SIZE)
        if not chunk:
            break
        chunks.append(chunk)

    if not chunks:
        # EOF reached
        self.disconnected(self)
        return ''

    # Join with an empty separator of the same type as the chunks, so both
    # text and bytes payloads work; a single join also avoids rebuilding the
    # buffer on every iteration.
    return chunks[0][:0].join(chunks)
1060
def process_loop_hook(self):
    """Hook invoked by process() on each loop iteration before select().

    The default implementation does nothing.
    """
    return None
1063
def process(self, timeout=None):
    """Read from the connection and feed the parser until a node arrives.

    timeout: number of seconds to wait overall. A falsy value (None or 0)
             makes select() wait without a timeout, and the call returns
             after a single read even if no complete node was produced.

    Returns the number of nodes queued in self._incoming_node_queue, or 0
    when the timeout expired or no complete node is available yet.

    Raises JabberError("Premature EOF") when a read returns no data, and
    SSLError (after closing the non-SSL socket) when the SSL read fails
    with SSL.SSL.SysCallError.
    """
    log_debug(3, timeout)
    self._incoming_node_queue = []
    fd = self.fileno()
    # Wait for a node or until we hit the timeout
    started = time.time()
    while True:
        current = time.time()
        if timeout:
            deadline = started + timeout
            if current >= deadline:
                # Timed out
                return 0
            wait = deadline - current
        else:
            wait = None

        self.process_loop_hook()
        # wait is the number of seconds we have left (or None)
        log_debug(5, "before select(); timeout", wait)
        readable = select.select([fd], [], [], wait)[0]
        log_debug(5, "select() returned")
        if not readable:
            # Timed out
            return 0

        # Try to read as much data as possible
        if not hasattr(self._sock, 'pending'):
            # Normal socket - select will figure out correctly if the read
            # will block
            data = self._read(self.BLOCK_SIZE)
            if not data:
                raise JabberError("Premature EOF")
            self._parser.Parse(data)
        else:
            # SSL case - select() uses the native socket's file descriptor.
            # SSL may have decoded more data than we are willing to read,
            # so read one block and then whatever is reported as pending.
            log_debug(5, "Reading %s bytes from ssl socket" % self.BLOCK_SIZE)
            try:
                data = self._read(self.BLOCK_SIZE)
            except SSL.SSL.SysCallError:
                e = sys.exc_info()[1]
                log_debug(5, "Closing socket")
                self._non_ssl_sock.close()
                raise_with_tb(SSLError("OpenSSL error; will retry", str(e)), sys.exc_info()[2])
            log_debug(5, "Read %s bytes" % len(data))
            if not data:
                raise JabberError("Premature EOF")
            self._parser.Parse(data)
            pending = self._sock.pending()
            if pending:
                # More bytes to read from the SSL socket
                data = self._read(pending)
                self._parser.Parse(data)

        if self._incoming_node_queue:
            return len(self._incoming_node_queue)

        # We may not have read enough data to be able to produce a node
        if not timeout:
            # No reason to block again, return into the higher-level
            # select()
            return 0
        # Otherwise go back and wait some more before giving up
1129 1130
def register(self, username, password):
    """Register an account on the server.

    Registration info is requested first; the 'username' and 'password'
    fields are filled in only when that info contains them, and the
    result is then submitted.

    Returns True on success. When the submission fails with a
    JabberQualifiedError whose errcode is '409', the error is logged, the
    connection is closed and the process exits with status 0; any other
    JabberQualifiedError propagates to the caller.
    """
    # The password is deliberately kept out of the debug log
    log_debug(2, username)
    self.requestRegInfo()
    reg_info = self.getRegInfo()
    if 'username' in reg_info:
        self.setRegInfo('username', username)
    if 'password' in reg_info:
        self.setRegInfo('password', password)
    try:
        self.sendRegInfo()
    except JabberQualifiedError:
        e = sys.exc_info()[1]
        if e.errcode == '409':
            # Need to register the user if possible
            log_error("Invalid password")
            self.disconnect()
            sys.exit(0)
        raise
    return True
1150
def _waitForResponse(self, ID, timeout=jabber.timeout):
    """Wait for the stanza whose id matches ID.

    jabberpy's function waits when it shouldn't, so it is rebuilt here.

    Returns the matching stanza. Returns None when a stanza carrying an
    error code arrives (self.lastErr / self.lastErrCode are filled in
    from it) or when the timeout expires (self.lastErr is set to
    "Timeout"). With timeout=None the call waits indefinitely.
    """
    log_debug(5, ID, timeout)
    wanted_id = jabber.ustr(ID)

    self.lastErr = ''
    self.lastErrCode = 0

    if timeout is None:
        self.DEBUG("waiting for %s" % wanted_id, jabber.DBG_NODE_IQ)
    else:
        abort_time = time.time() + timeout
        self.DEBUG("waiting with timeout:%s for %s" % (timeout, wanted_id),
                   jabber.DBG_NODE_IQ)

    while True:
        remaining = None
        if timeout is not None:
            remaining = abort_time - time.time()
            if remaining <= 0:
                # Timed out
                break

        log_debug(5, "before get_one_stanza")
        stanza = self.get_one_stanza(remaining)
        log_debug(5, "after get_one_stanza")
        if not stanza:
            # get_one_stanza should only return None for a timeout
            assert timeout is not None
            break

        error_code = stanza.getErrorCode()
        if error_code:
            # Error
            self.lastErr = stanza.getError()
            self.lastErrCode = error_code
            return None

        # Is it the proper stanza ID?
        if wanted_id == jabber.ustr(stanza.getID()):
            # This is the node
            return stanza

        # Otherwise keep looking for stanzas until we time out (if a
        # timeout was passed)

    # Timed out
    self.lastErr = "Timeout"
    return None
1201
def waitForResponse(self, ID, timeout=jabber.timeout):
    """Wait for the stanza with the given ID, raising on failure.

    Returns the stanza found by _waitForResponse(). When none was found:
    raises TimeoutError if self.lastErr is 'Timeout', otherwise
    JabberQualifiedError if an error code was recorded, otherwise
    JabberError.
    """
    response = self._waitForResponse(ID, timeout=timeout)
    if response is None:
        if self.lastErr == 'Timeout':
            raise TimeoutError()
        if self.lastErrCode:
            raise JabberQualifiedError(self.lastErrCode, self.lastErr)
        raise JabberError("Unknown error", self.lastErr)
    return response
1213 1214
def get_unique_id(self):
    """Return an id of the form "<client string>-<sequence number>".

    The sequence number is read through the instance and the incremented
    value is stored on the JabberClient class.
    """
    current = self._seq
    # The counter is bumped on the class, not on this instance
    JabberClient._seq = current + 1
    return "%s-%s" % (self._uniq_client_string, current)
1219
def disconnectHandler(self, conn):
    """Handler that intentionally does nothing with the given connection."""
    return None
# Need to add the version to the XML stream
def _header_string(self):
    """Return the opening XML declaration and <stream:stream> tag.

    The tag carries 'to', 'xmlns', 'xmlns:stream' and 'version'
    attributes, plus 'id' when self._outgoingID is set. Values are
    inserted as-is between single quotes.
    """
    self.DEBUG("jabber_lib.JabberClient.header: sending initial header",
               jabber.DBG_INIT)
    attrs = {
        'to' : self._host,
        'xmlns' : self._namespace,
        'xmlns:stream' : "http://etherx.jabber.org/streams",
        'version' : '1.0',
    }
    if self._outgoingID:
        attrs['id'] = self._outgoingID
    # XXX Add more custom attributes here
    rendered = " ".join(["%s='%s'" % pair for pair in attrs.items()])
    return "<?xml version='1.0' encoding='UTF-8'?><stream:stream %s>" % rendered
1242
def header(self):
    """Send the stream header, then process input for up to jabber.timeout."""
    self.send(self._header_string())
    self.process(jabber.timeout)
1247
1248 - def _fix_jid(self, jid):
1249 return jid
1250
def _presence_callback(self, client, stanza):
    """React to an incoming presence stanza.

    If the roster is enabled, presence stanzas with type="subscribed"
    should never be received - the server will initiate a roster push
    instead.

    Handling by presence type:
      None / 'subscribed'          - mark the sender available and
                                     subscribe to its presence
      'unsubscribed'/'unavailable' - mark the sender unavailable
      'subscribe'                  - approve the request, then subscribe
                                     to the sender's presence in return
      'probe'                      - answer with a plain presence node
    Any other type is ignored.
    """
    sender = stanza.getFrom()
    presence_type = stanza.getType()
    log_debug(3, self.jid, sender, presence_type)
    stanza_id = stanza.getID()

    assert(stanza.getName() == 'presence')

    # We may not get the full JID here
    if presence_type is None or presence_type == 'subscribed':
        log_debug(4, "Node is available", sender, presence_type)
        self.set_jid_available(sender)
        # Now subscribe this node to the other node's presence, just in
        # case
        self.subscribe_to_presence([sender])
    elif presence_type in ('unsubscribed', 'unavailable'):
        log_debug(4, "Node is unavailable", sender, presence_type)
        self.set_jid_unavailable(sender)
    elif presence_type == 'subscribe':
        # XXX misa 20051111: don't check signatures for presence anymore,
        # the fact they expire makes them unreliable
        log_debug(4, "Subscription request approved", sender)
        self.send_presence(sender, type="subscribed", xid=stanza_id)
        # Now subscribe this node to the other node's presence
        self.subscribe_to_presence([sender])
    elif presence_type == 'probe':
        log_debug(4, "Presence probe", sender)
        self.send(JabberPresenceNode(to=sender))
1297
1298 - def _check_signature(self, stanza, actions=None):
1299 return 1
1300
def _strip_resource(self, jid):
    """Return the jid as processed by the module-level strip_resource()."""
    stripped = strip_resource(jid)
    return stripped
1303
1304 - def _create_signature(self, jid, action):
1305 return None
1306
def send_message(self, jid, action):
    """Send a 'normal' message to jid, signed for the given action.

    The signature node from _create_signature() is attached only when it
    is truthy.
    """
    message = JabberMessageNode(to=jid, type='normal')
    signature = self._create_signature(jid, action)
    if signature:
        message.insertNode(signature)
    self.send(message)
1313
def jid_available(self, jid):
    """Return what the roster's jid_available() reports for this jid."""
    roster = self._roster
    return roster.jid_available(jid)
1316
def set_jid_available(self, jid):
    """Mark the jid as available in the roster; returns the roster's result."""
    roster = self._roster
    return roster.set_available(jid)
1319
def set_jid_unavailable(self, jid):
    """Mark the jid as unavailable in the roster; returns the roster's result."""
    roster = self._roster
    return roster.set_unavailable(jid)
1322
def match_stanza_tags(self, stanza, tag_name, namespace=None):
    """Return the child tags of the stanza named tag_name.

    With namespace=None all matching tags are returned; otherwise only
    those whose getNamespace() equals namespace. An empty list is
    returned when the stanza has no such tags.
    """
    candidates = stanza.getTags(tag_name)
    if candidates and namespace is None:
        # Nothing more to look for
        return candidates
    return [tag for tag in (candidates or [])
            if tag.getNamespace() == namespace]
1333
def _check_signature_from_message(self, stanza, actions):
    """Validate a message stanza and return its signature, or None.

    None is returned (after logging why) for messages of type 'error',
    for any type other than 'normal', for stanzas carrying an 'x' tag in
    the jabber.NS_DELAY namespace, and when _check_signature() yields a
    falsy result. In that last case an Exception is raised instead when
    self.debug_level is above 5.
    """
    log_debug(4, stanza)
    assert stanza.getName() == 'message'

    sender = stanza.getFrom()
    kind = stanza.getType()
    if kind == 'error':
        log_debug(1, 'Received error from %s: %s' % (sender, stanza))
        return None

    if kind != 'normal':
        log_debug(1, 'Unsupported message type %s ignored' % kind)
        return None

    if self.match_stanza_tags(stanza, 'x', namespace=jabber.NS_DELAY):
        log_debug(1, 'Ignoring delayed stanza')
        return None

    sig = self._check_signature(stanza, actions=actions)
    if sig:
        return sig

    if self.debug_level > 5:
        raise Exception(1)
    log_debug(1, "Mismatching signatures")
    return None
1362 1363
class SSLSocket(SSL.SSLSocket):
    """Subclass of SSL.SSLSocket that adds nothing of its own."""
1366
class SSLVerifyError(SSL.SSL.Error):
    """Subclass of SSL.SSL.Error that adds nothing of its own."""
1369
def generate_random_string(length=20):
    """Return a lowercase hexadecimal string of the requested length.

    A SHA-1 hash is seeded with the current time and the process id, then
    repeatedly fed 16 bytes from the operating system's random source;
    the hex digest after each round is appended until enough characters
    have been produced.

    length: number of characters wanted (converted with int()). A falsy
            value yields ''.
    """
    if not length:
        return ''
    random_bytes = 16
    length = int(length)
    digest = hashlib.new('sha1')
    digest.update(bstr("%.8f" % time.time()))
    digest.update(bstr("%s" % os.getpid()))

    pieces = []
    produced = 0
    while True:
        # os.urandom() needs no file handle, so there is nothing to leak if
        # an error occurs mid-loop, and no device path is hard-coded
        digest.update(os.urandom(random_bytes))
        piece = digest.hexdigest()
        pieces.append(piece)
        produced = produced + len(piece)
        if produced >= length:
            break

    return ''.join(pieces)[:length].lower()
1392 1393
def push_to_background():
    """Fork into the background and point the standard streams at /dev/null.

    The parent process exits immediately with status 0. The child calls
    os.setpgrp(), changes directory to '/', sets the umask to 0, flushes
    stdout and stderr, and then duplicates /dev/null handles over the
    stdin, stdout and stderr descriptors.
    """
    log_debug(3, "Pushing process into background")
    # Push this process into background
    if os.fork() > 0:
        # Terminate parent process
        os._exit(0)

    # Child process becomes a process group leader
    os.setpgrp()

    # Change working directory
    os.chdir('/')

    # 7/7/05 wregglej 162619 set the umask to 0 so the remote scripts can run
    os.umask(0)

    # Flush pending output before the descriptors are replaced
    sys.stdout.flush()
    sys.stderr.flush()

    # Files we want stdin, stdout and stderr to point to
    null_in = open("/dev/null", 'r')
    null_out = open("/dev/null", 'ab+')
    null_err = open("/dev/null", 'ab+', 0)

    redirections = (
        (null_in, sys.stdin),
        (null_out, sys.stdout),
        (null_err, sys.stderr),
    )
    for replacement, stream in redirections:
        os.dup2(replacement.fileno(), stream.fileno())
1425 1426 1427 # close file descriptors 1428 # from subprocess import MAXFD 1429 #for i in range(3, MAXFD): 1430 # try: 1431 # os.close(i) 1432 # except: 1433 # pass 1434 1435
class Roster:
    """In-memory roster: entries grouped by subscription state, plus the
    set of nodes currently marked available.

    Each of the four subscription dictionaries maps a resource-less jid
    (as a string) to an entry dictionary with 'jid', 'subscription' and,
    optionally, 'ask' keys.
    """

    def __init__(self):
        self._subscribed_to = {}
        self._subscribed_from = {}
        self._subscribed_both = {}
        self._subscribed_none = {}
        self._available_nodes = {}

    def add_item(self, item):
        """File a roster item under its subscription state.

        The item's jid is stripped of its resource and stored under the
        dictionary matching its 'subscription' attribute; it is removed
        from the other three. Items whose subscription is not one of
        'to', 'from', 'both' or 'none' are ignored.
        """
        subscr = item.getAttr('subscription')
        jid = str(strip_resource(item.getAttr('jid')))
        entry = {
            'jid' : jid,
            'subscription' : subscr,
        }
        ask = item.getAttr('ask')
        if ask:
            entry['ask'] = ask

        states = ('to', 'from', 'both', 'none')
        if subscr not in states:
            return
        for state in states:
            bucket = getattr(self, '_subscribed_' + state)
            if state == subscr:
                # Set it
                bucket[jid] = entry
            else:
                # Remove it from every other state
                bucket.pop(jid, None)

    def get_subscribed_from(self):
        """Return a copy of the 'from' subscriptions."""
        return self._subscribed_from.copy()

    def get_subscribed_to(self):
        """Return a copy of the 'to' subscriptions."""
        return self._subscribed_to.copy()

    def get_subscribed_both(self):
        """Return a copy of the 'both' subscriptions."""
        return self._subscribed_both.copy()

    def get_subscribed_none(self):
        """Return a copy of the 'none' subscriptions."""
        return self._subscribed_none.copy()

    def get_subscribed_to_jids(self):
        """Return a new dict merging the 'to' and 'both' subscriptions."""
        ret = self._subscribed_to.copy()
        ret.update(self._subscribed_both)
        return ret

    def get_subscribed_from_jids(self):
        """Return a new dict merging the 'from' and 'both' subscriptions."""
        ret = self._subscribed_from.copy()
        ret.update(self._subscribed_both)
        return ret

    def get_available_nodes(self):
        """Return a copy of the available-nodes dictionary."""
        return self._available_nodes.copy()

    def set_available(self, jid):
        """Mark the jid (converted with str()) as available."""
        self._available_nodes[str(jid)] = 1

    def set_unavailable(self, jid):
        """Forget the jid (converted with str()) if it was available."""
        self._available_nodes.pop(str(jid), None)

    def jid_available(self, jid):
        """Tell whether the jid is currently marked available.

        The jid is converted with str() first, because that is how
        set_available() and set_unavailable() key the dictionary; without
        it a non-string jid object would never be found.
        """
        return str(jid) in self._available_nodes

    def clear(self):
        """Empty the four subscription dictionaries.

        The available-nodes dictionary is left untouched.
        """
        self._subscribed_to.clear()
        self._subscribed_from.clear()
        self._subscribed_both.clear()
        self._subscribed_none.clear()

    def __repr__(self):
        # list() keeps the output a plain list of jids on Python 3 as well,
        # where keys() returns a view object
        return "Roster:\n\tto: %s\n\tfrom: %s\n\tboth: %s\n\tnone: %s" % (
            list(self._subscribed_to.keys()),
            list(self._subscribed_from.keys()),
            list(self._subscribed_both.keys()),
            list(self._subscribed_none.keys()),
        )
1518 1519
def strip_resource(jid):
    """Return the jid without its resource part.

    One doesn't subscribe to a specific resource. Anything that is not
    already a jabber.JID is wrapped in one first; the result is whatever
    its getStripped() returns.
    """
    jid_obj = jid if isinstance(jid, jabber.JID) else jabber.JID(jid)
    return jid_obj.getStripped()
1525
def extract_traceback():
    """Return the formatted traceback of the exception being handled,
    with no limit on the number of stack entries."""
    limit = None
    return traceback.format_exc(limit)
1528
class Sendall:
    """Wrapper giving a socket a sendall() method.

    This class exists here because python 1.5.2 does not support a
    sendall() method for sockets.
    """

    def __init__(self, sock):
        self.sock = sock

    def sendall(self, data, flags=0):
        """Keep calling sock.send() until all of data has gone out.

        Returns the number of bytes sent, i.e. len(data); 0 for empty
        data, in which case send() is never called.
        """
        total = len(data)
        if not total:
            # No data
            return 0
        offset = 0
        while True:
            # Hand the unsent tail to the socket and advance by what it took
            offset = offset + self.sock.send(data[offset:], flags)
            if offset == total:
                # We're done
                return total
1548