1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 import os
17 import hashlib
18 import sys
19 import time
20 import select
21 import socket
22 import random
23 import fnmatch
24 from optparse import OptionParser, Option
25 import traceback
26 from rhn import SSL
27
28 try:
29 from rhn_log import log_debug, log_error
30 except ImportError:
31 from osad.rhn_log import log_debug, log_error
32
33 from spacewalk.common.usix import raise_with_tb
34 from rhn.i18n import bstr
35
36 import warnings
37 try:
38 warnings.filterwarnings("ignore", category=DeprecationWarning)
39 import jabber
40 finally:
41 warnings.resetwarnings()
42
43 NS_RHN = "http://jabber.rhn.redhat.com/jabber"
44 NS_RHN_SIGNED = "%s/signed" % NS_RHN
45 NS_RHN_PRESENCE_SUBSCRIBE = "%s/presence/subscribe" % NS_RHN
46 NS_RHN_PRESENCE_SUBSCRIBED = "%s/presence/subscribed" % NS_RHN
47 NS_RHN_PRESENCE_UNSUBSCRIBE = "%s/presence/unsubscribe" % NS_RHN
48 NS_RHN_MESSAGE_REQUEST_CHECKIN = "%s/message/request/checkin" % NS_RHN
49 NS_RHN_MESSAGE_RESPONSE_CHECKIN = "%s/message/response/checkin" % NS_RHN
50 NS_RHN_MESSAGE_REQUEST_PING = "%s/message/request/ping" % NS_RHN
51 NS_RHN_MESSAGE_RESPONSE_PING = "%s/message/response/ping" % NS_RHN
52
53 NS_STARTTLS = 'urn:ietf:params:xml:ns:xmpp-tls'
54 NS_STANZAS = "urn:ietf:params:xml:ns:xmpp-stanzas"
55
56
58 option_parser = OptionParser
59 option = Option
60
61 client_factory = None
62
63
64
65 _min_sleep = 60
66 _max_sleep = 90
67
69 self.options_table = [
70 self.option("-v", "--verbose", action="count",
71 help="Increase verbosity"),
72 self.option('-N', "--nodetach", action="store_true",
73 help="Suppress backgrounding and detachment of the process"),
74 self.option('--pid-file', action="store",
75 help="Write to this PID file"),
76 self.option('--logfile', action="store",
77 help="Write log information to this file"),
78 ]
79
80 self.ssl_cert = None
81 self.debug_level = 0
82 self._jabber_servers = []
83 self._connected_jabber_server = None
84 self._username = None
85 self._password = None
86 self._resource = None
87 self._in_background = 0
88 self._use_proxy = 0
89
91 "Process command line options"
92 self._parser = self.option_parser(option_list=self.options_table)
93 self.options, self.args = self._parser.parse_args()
94
96 """Method that starts up everything
97 - processes command line options
98 - big loop to reconnect if necessary
99 - read config
100 - setup config
101 - setup jabber connection
102 - process requests
103 """
104 self.process_cli_options()
105 force_setup = 0
106 no_fork = None
107 while 1:
108
109 if no_fork is None:
110
111 no_fork=0
112 else:
113
114 no_fork=1
115
116 try:
117 config = self.read_config()
118 if force_setup:
119 log_debug(2,"###Forcing setup")
120 self.setup_config(config, 1)
121 force_setup = 0
122 else:
123 self.setup_config(config)
124 c = self.setup_connection(no_fork=no_fork)
125 self.fix_connection(c)
126 self.process_forever(c)
127 except KeyboardInterrupt:
128 try:
129 c.disconnect()
130 except:
131 pass
132 sys.exit(0)
133 except SystemExit:
134 raise
135 except RestartRequested:
136 e = sys.exc_info()[1]
137 log_error("Restart requested", e)
138 if not self.is_in_background():
139 self.push_to_background()
140 continue
141 except NeedRestart:
142 e = sys.exc_info()[1]
143 log_debug(3, "Need Restart")
144 force_setup = 1
145 continue
146 except JabberConnectionError:
147 time_to_sleep = random.randint(self._min_sleep, self._max_sleep)
148 log_debug(0, "Unable to connect to jabber servers, sleeping"
149 " %s seconds" % time_to_sleep)
150 if not self.is_in_background():
151 self.push_to_background()
152 try:
153 time.sleep(time_to_sleep)
154 except KeyboardInterrupt:
155 sys.exit(0)
156 except InvalidCertError:
157 e = sys.exc_info()[1]
158 log_error("Invalid Cert Error:")
159 raise
160 except:
161
162 log_error("Error caught:")
163 log_error(extract_traceback())
164 time_to_sleep = random.randint(self._min_sleep, self._max_sleep)
165 log_debug(3, "Sleeping", time_to_sleep, "seconds")
166 if not self.is_in_background():
167 self.push_to_background()
168 try:
169 time.sleep(time_to_sleep)
170 except KeyboardInterrupt:
171 sys.exit(0)
172
174 "After setting up the connection, do whatever else is necessary"
175 return client
176
179
181 """Big loop to process requests
182 """
183 log_debug(1)
184 self.preprocess_once(client)
185 while 1:
186 try:
187 self.process_once(client)
188
189 time_to_sleep = random.randint(6, 10)
190 time.sleep(time_to_sleep)
191 except KeyboardInterrupt:
192
193 client.disconnect()
194 sys.exit(0)
195 except:
196
197 raise
198
200 "To be overridden in a client class"
201 raise NotImplementedError
202
205
208
210 return self._in_background
211
213 if not self.options.nodetach:
214
215 push_to_background()
216 self._in_background = 1
217
218 pid_file = self.options.pid_file
219 if pid_file:
220 try:
221 os.unlink(pid_file)
222 except OSError:
223 e = sys.exc_info()[1]
224 if e.errno != 2:
225 raise
226 try:
227
228 fd = os.open(pid_file, os.O_WRONLY| os.O_APPEND | os.O_CREAT, int("0644", 8))
229 os.write(fd, bstr("%d" % os.getpid()))
230 os.write(fd, bstr("\n"))
231 os.close(fd)
232 except OSError:
233 pass
234
237
241
243 """
244 - initializes a Jabber connection (by instantiating a Jabber client)
245 - if necessary, pushes to background
246 - authentication and resource binding (by calling start())
247
248 Possible causes for this function to return None:
249 - jabber server is not started
250 - jabber server is started but did not initialize SSL just yet
251
252 This function will kill the process with exit code 1 if the SSL
253 handshake failed (an indication of a mismatching CA cert). We do this
254 so starting the program as a daemon to fail if this happens. Of
255 course, if the server is down and the CA cert is bad, then the daemon
256 will start but will silently fail afterwards; the error log should
257 have a traceback though.
258 """
259 for js in self._jabber_servers:
260
261 log_debug(3, "Connecting to", js)
262
263 try:
264 c = self._get_jabber_client(js)
265 log_debug(1, "Connected to jabber server", js)
266 self._connected_jabber_server = js
267 break
268 except SSLHandshakeError:
269
270 sys.exit(1)
271 except socket.error:
272 e = sys.exc_info()[1]
273 self.print_message(js, "socket error")
274 log_error(extract_traceback())
275 continue
276 except JabberError:
277 e = sys.exc_info()[1]
278 self.print_message(js, "JabberError")
279 log_error(extract_traceback())
280 continue
281 except SSLError:
282 e = sys.exc_info()[1]
283 self.print_message(js, "SSLError")
284 log_error(extract_traceback())
285 continue
286 else:
287
288
289 log_debug(1, "Could not connect to any jabber server")
290
291
292 if not no_fork:
293 self.push_to_background()
294 raise JabberConnectionError
295
296
297 if not no_fork:
298 self.push_to_background()
299
300
301 c.start(username=self._username, password=self._password,
302 resource=self._resource)
303
304
305 c.custom_handler.register_callback(c._presence_callback, 'presence')
306 c.custom_handler.register_callback(c._message_callback, 'message')
307 c.custom_handler.register_callback(self._error_callback, 'error')
308 return c
309
311 """Returns a connected Jabber client, or raises an exception if it was
312 unable to connect"""
313 log_debug(3)
314 arr = jabber_server.split(':', 1)
315 jabber_server = arr[0]
316 cf = self.read_config()
317
318 jabberpy_proxy_dict = None
319 if self._use_proxy and 'proxy_url' in cf:
320 jabberpy_proxy_dict = {'host': cf['proxy_url'].split(':')[0],
321 'port': int(cf['proxy_url'].split(':')[1])}
322 if cf['enable_proxy_auth']:
323 jabberpy_proxy_dict['user'] = cf['proxy_user'];
324 jabberpy_proxy_dict['password'] = cf['proxy_password'];
325
326 if len(arr) == 2:
327 jabber_port = int(arr[1])
328 log_debug(2, "Connecting to", jabber_server, jabber_port)
329 c = self.client_factory(jabber_server, jabber_port, proxy=jabberpy_proxy_dict)
330 else:
331 log_debug(2, "Connecting to", jabber_server)
332 c = self.client_factory(jabber_server, proxy=jabberpy_proxy_dict)
333
334 c.debug_level = self.debug_level
335 c.add_trusted_cert(self.ssl_cert)
336 c.connect()
337 return c
338
340 """Logs error stanza messages for diagnostic purposes"""
341 log_error("Received an error stanza: ", stanza)
342 for kid in stanza.kids:
343 if kid.getName() == "conflict":
344 log_error("Received an conflict. Restarting with new credentials.")
345 raise NeedRestart
346
351
366
367 -def sign(secret_key, *values):
368 h = hashlib.new('sha1', bstr(secret_key)).hexdigest()
369 for v in values:
370 h = hashlib.new('sha1', bstr("%s%s" % (h, v))).hexdigest()
371 return h
372
373
376 return self.attrs.get(key, None)
377
380
383
386
391
392
405
407 log_debug(5, stanza)
408 stanza_name = stanza.getName()
409 if stanza_name not in self._handlers:
410 return []
411 stanza_id = stanza.getID()
412 stanza_ns = stanza.getNamespace()
413 result = {}
414 (h_idns, h_id, h_ns, l_def) = self._handlers[stanza_name]
415
416 if stanza_id is not None and stanza_ns:
417 cbs = h_idns.get((stanza_id, stanza_ns), [])
418 self._get_callbacks_from_list(cbs, result)
419 if stanza_id is not None:
420 cbs = h_id.get(stanza_id, [])
421 self._get_callbacks_from_list(cbs, result)
422 if stanza_ns:
423 cbs = h_ns.get(stanza_ns, [])
424 self._get_callbacks_from_list(cbs, result)
425 self._get_callbacks_from_list(l_def, result)
426 return list(result.keys())
427
429 for ent in l:
430 (callback, expiry, usage_count) = ent[:3]
431 if usage_count is None or usage_count >= 1:
432 result_hash[callback] = None
433 if usage_count is None:
434
435 continue
436
437 usage_count = usage_count - 1
438 if usage_count <= 0:
439
440 l.remove(ent)
441 continue
442
443 ent[2] = usage_count - 1
444
445 - def register_callback(self, callback, stanza_name, stanza_id=None,
446 stanza_ns=None, timeout=None, usage_count=None):
447 log_debug(3, callback, stanza_name, stanza_id, stanza_ns, timeout,
448 usage_count)
449 if timeout:
450 expiry = time.time() + timeout
451 else:
452 expiry = None
453 callback_entry = [callback, expiry, usage_count]
454 h_idns, h_id, h_ns, l_def = self._get_from_hash(self._handlers,
455 stanza_name, default_value=({}, {}, {}, []))
456
457
458 if stanza_id is not None and stanza_ns:
459 l = self._get_from_hash(h_idns, (stanza_id, stanza_ns), [])
460 l.append(callback_entry)
461 return
462
463 if stanza_id is not None:
464 l = self._get_from_hash(h_id, stanza_id, [])
465 l.append(callback_entry)
466 return
467
468 if stanza_ns:
469 l = self._get_from_hash(h_ns, stanza_ns, [])
470 l.append(callback_entry)
471 return
472
473
474 l_def.append(callback_entry)
475
477 if key in h:
478 val = h[key]
479 else:
480 val = h[key] = default_value
481 return val
482
491
496
498 log_debug(7, vals, now)
499 for val in vals:
500 (callback, expiry, usage_count) = val
501 if not expiry:
502 continue
503 if now <= expiry:
504
505 continue
506
507 vals.remove(val)
508
510 print("Debugging:", args)
511
514
517
520
523
525 "Raised when a lower-level SSL error is caught"
526 pass
527
529 "Raised when the SSL handshake failed"
530 pass
531
533 "Raised if the server does not support SSL"
534 pass
535
537 "Raised when we were unable to make a jabber connection"
538 pass
539
541 - def __init__(self, errcode, err, *args):
545
547 return "<%s instance at %s; errcode=%s; err=%s>" % (
548 self.__class__.__name__, id(self), self.errcode, self.err)
549
550 __str__ = __repr__
551
553 _seq = 0
554 BLOCK_SIZE = jabber.xmlstream.BLOCK_SIZE
555
557 log_debug(1)
558 jabber.Client.__init__(self, *args, **kwargs)
559 self.jid = None
560
561 self._incoming_node_queue = []
562
563 self.debug_level = 0
564 self.trusted_certs = []
565
566 self.registerProtocol('unknown', JabberProtocolNode)
567 self.registerProtocol('iq', JabberIqNode)
568 self.registerProtocol('message', JabberMessageNode)
569 self.registerProtocol('presence', JabberPresenceNode)
570 self.registerProtocol('error', JabberProtocolNode)
571
572 self.registerHandler('iq', self._expectedIqHandler, system=True)
573 self.registerHandler('iq', self._IqRegisterResult, 'result',
574 jabber.NS_REGISTER, system=True)
575
576 h = Handlers()
577 self.custom_handler = h
578 self.registerHandler('presence', h.dispatch)
579 self.registerHandler('iq', h.dispatch)
580 self.registerHandler('message', h.dispatch)
581 self.registerHandler('error', h.dispatch)
582
583 self._non_ssl_sock = None
584 self._roster = Roster()
585
586 self._uniq_client_string = generate_random_string(6)
587
591
593 log_debug(2)
594 if not self.trusted_certs:
595 raise SSLVerifyError("No trusted certs added")
596
597
598
599 self.dispatch = self._auth_dispatch
600
601 log_debug(5, "Attempting to connect")
602
603 for retry in range(0,3):
604 try:
605 jabber.Client.connect(self)
606 except socket.error:
607 e = sys.exc_info()[1]
608 log_error("Error connecting to jabber server: %s. "
609 "See https://access.redhat.com/solutions/327903 for more information. " % e)
610 raise socket.error(e)
611
612 log_debug(5, "Connected")
613
614
615
616
617
618
619
620 stanza = self.get_one_stanza()
621
622 log_debug(5, "Expecting features stanza, got:", stanza)
623 if stanza.getName() != 'features':
624 log_debug(1, "Server did not return a <features /> stanza,"
625 " reconnecting")
626 self.disconnect()
627 time.sleep(1)
628 else:
629 break
630 else:
631 log_error("Not able to reconnect - "
632 "See https://access.redhat.com/solutions/45332 for possible solutions.\n")
633 raise SSLDisabledError
634
635 starttls_node = stanza.getTag('starttls')
636 log_debug(5, "starttls node", starttls_node)
637 if starttls_node is None:
638 log_error("Server does not support TLS - <starttls /> "
639 "not in <features /> stanza")
640 self.disconnect()
641 raise SSLDisabledError
642
643
644 self.write("<starttls xmlns='%s' />" % NS_STARTTLS)
645
646 stanza = self.get_one_stanza()
647 log_debug(5, "Expecting proceed stanza, got:", stanza)
648 if stanza.getName() != 'proceed':
649 log_error("Server broke TLS negotiation - <proceed /> not sent")
650 self.disconnect()
651 raise SSLDisabledError
652
653 log_debug(4, "Preparing for TLS handshake")
654 ssl = SSLSocket(self._sock, trusted_certs=self.trusted_certs)
655 ssl.ssl_verify_callback = self.ssl_verify_callback
656 ssl.init_ssl()
657
658 try:
659 ssl.do_handshake()
660 self.verify_peer(ssl)
661 except SSL.SSL.Error:
662
663 log_error("Traceback caught:")
664 log_error(extract_traceback())
665 raise_with_tb(SSLHandshakeError(), sys.exc_info()[2])
666
667
668 jabber.xmlstream.Stream.connect(self)
669
670
671 self._non_ssl_sock = self._sock
672 self._sock = ssl._connection
673
674
675
676 self._setupComms()
677
678
679 self.send(self._header_string())
680
681
682 self.process()
683
684 stanza = self.get_one_stanza()
685
686 if stanza.getName() != 'features':
687 self.disconnect()
688 raise Exception("Server did not pass any features?")
689
690
691 self.dispatch = self._orig_dispatch
692 log_debug(5, "connect returning")
693
699
701
702 self._read = self._sock.recv
703 if hasattr(self._sock, 'sendall'):
704 self._write = self._sock.sendall
705 else:
706 self._write = Sendall(self._sock).sendall
707 self._reader = self._sock
708
710 log_debug(4, "Called", errnum, depth, ok)
711 if not ok:
712 log_error("SSL certificate verification failed")
713 self.write("</stream:stream>")
714 conn.close()
715 self._sock.close()
716 return ok
717
718 return ok
719
721 cert = ssl.get_peer_certificate()
722 if cert is None:
723 raise SSLVerifyError("Unable to retrieve peer cert")
724
725 subject = cert.get_subject()
726 if not hasattr(subject, 'CN'):
727 raise SSLVerifyError("Certificate has no Common Name")
728
729 common_name = subject.CN
730
731
732
733 if common_name[-1] != '.':
734 common_name = common_name + '.'
735 hdot = self._host
736 if hdot[-1] != '.':
737 hdot = hdot + '.'
738
739 if common_name != hdot and not fnmatch.fnmatchcase(hdot, common_name):
740 raise SSLVerifyError("Mismatch: peer name: %s; common name: %s" %
741 (self._host, common_name))
742
744 """Request the roster. Will register the roster callback,
745 but the call will wait for the roster to be properly populated"""
746
747 self.custom_handler.register_callback(self._roster_callback, 'iq')
748 iq_node_id = 'iq-request-%s' % self.get_unique_id()
749 iq_node = JabberIqNode(type="get")
750 iq_node.setQuery(jabber.NS_ROSTER)
751 iq_node.setID(iq_node_id)
752 self.send(iq_node)
753
754 stanza = None
755
756 while 1:
757 stanza = self.get_one_stanza()
758 node_id = stanza.getAttr('id')
759 if node_id == iq_node_id:
760
761 break
762
763
764
765 for k, v in self._roster.get_subscribed_from().items():
766 if 'ask' in v and v['ask'] == 'subscribe':
767 self.send_presence(k, type="subscribed")
768 else:
769
770 self.send_presence(k, type="subscribe")
771
773 log_debug(3, "Updating the roster", stanza)
774
775
776 qnode = stanza.getTag('query')
777 if qnode is None or qnode.getNamespace() != jabber.NS_ROSTER:
778
779 log_debug(5, "Query node not found, skipping")
780 return
781
782
783 node_type = stanza.getAttr('type')
784 if node_type not in ('result', 'set'):
785 log_debug(5, "Not a result or a set, skipping")
786 return
787
788
789 for node in qnode.getTags('item'):
790 self._roster.add_item(node)
791
793 if not jids:
794 return
795
796 qnode = JabberProtocolNode("query")
797 qnode.setNamespace(jabber.NS_ROSTER)
798
799 for jid in jids:
800 attrs = {
801 'jid' : jid,
802 'subscription' : 'remove',
803 }
804 inode = JabberProtocolNode("item", attrs=attrs)
805 qnode.insertNode(inode)
806
807 node = JabberIqNode(type="set")
808 remove_iq_id = "remove-%s" % self.get_unique_id()
809 node.setID(remove_iq_id)
810 node.insertNode(qnode)
811
812 self.send(node)
813
815 """Returns one stanza (or None if timeout is set)"""
816 if timeout:
817 start = time.time()
818 while not self._incoming_node_queue:
819 if timeout:
820 now = time.time()
821 if now >= start + timeout:
822
823 log_debug(4, "timed out", now, start, timeout)
824 return None
825 tm = start + timeout - now
826 else:
827 tm = None
828
829 self.process(timeout=tm)
830
831
832 node = self._incoming_node_queue[0]
833 del self._incoming_node_queue[0]
834 return node
835
837 """Builds one stanza according to the handlers we have registered via
838 registerHandler or registerProtocol"""
839 name = stanza.getName()
840 if name not in self.handlers:
841 name = 'unknown'
842
843 stanza = self.handlers[name][type](node=stanza)
844 return stanza
845
859
870
871 - def auth(self, username, password, resource, register=1):
872 """Try to authenticate the username with the specified password
873 If the authentication fails, try to register the user.
874 If that fails as well, then JabberQualifiedError is raised
875 """
876 log_debug(2, username, password, resource, register)
877 auth_iq_id = "auth-get-%s" % self.get_unique_id()
878 auth_get_iq = jabber.Iq(type='get')
879 auth_get_iq.setID(auth_iq_id)
880 q = auth_get_iq.setQuery(jabber.NS_AUTH)
881 q.insertTag('username').insertData(username)
882 self.send(auth_get_iq)
883 log_debug(4, "Sending auth request", auth_get_iq)
884
885 try:
886 auth_response = self.waitForResponse(auth_iq_id, timeout=60)
887 except JabberQualifiedError:
888 e = sys.exc_info()[1]
889 if not register:
890 raise
891 if e.errcode == '401':
892
893 log_debug(4, "Need to register")
894 self.register(username, password)
895 return self.auth(username, password, resource, register=0)
896
897 raise
898
899 log_debug(4, "Auth response", auth_response)
900 auth_ret_query = auth_response.getTag('query')
901 auth_set_id = "auth-set-%s" % self.get_unique_id()
902 auth_set_iq = jabber.Iq(type="set")
903 auth_set_iq.setID(auth_set_id)
904
905 q = auth_set_iq.setQuery(jabber.NS_AUTH)
906 q.insertTag('username').insertData(username)
907 q.insertTag('resource').insertData(resource)
908
909 if auth_ret_query.getTag('token'):
910 token = auth_ret_query.getTag('token').getData()
911 seq = auth_ret_query.getTag('sequence').getData()
912
913 h = hashlib.new('sha1', hashlib.new('sha1', password).hexdigest() + token).hexdigest()
914 for i in range(int(seq)):
915 h = hashlib.new('sha1', h).hexdigest()
916 q.insertTag('hash').insertData(h)
917 elif auth_ret_query.getTag('digest'):
918 digest = q.insertTag('digest')
919 digest.insertData(hashlib.new('sha1',
920 bstr(self.getIncomingID() + password)).hexdigest() )
921 else:
922 q.insertTag('password').insertData(password)
923
924 log_debug(4, "Sending auth info", auth_set_iq)
925 try:
926 self.SendAndWaitForResponse(auth_set_iq)
927 except JabberQualifiedError:
928 e = sys.exc_info()[1]
929 if e.errcode == '401':
930
931 log_debug(4, "Need to register")
932 return self.register(username, password)
933 raise
934
935 log_debug(4, "Authenticated")
936 return True
937
938 - def send(self, stanza):
944
945
947 """Subscribe to these nodes' presence
948 The subscription in jabber works like this:
949
950 Contact 1 State Contact 1 State Contact 2 Contact 2
951 ----------+-------------------+------------------+----------
952 subscribe -> [ none + ask ]
953 [ from ] <- subscribed
954 [ to ]
955 [ from + ask ] <- subscribe
956 subscribed -> [ both ]
957 [ both ]
958 ----------+-------------------+------------------+----------
959
960 Enclosed in square brackets is the state when the communication took
961 place.
962 """
963 subscribed_to = self._roster.get_subscribed_to()
964 log_debug(4, "Subscribed to", subscribed_to)
965 subscribed_both = self._roster.get_subscribed_both()
966 log_debug(4, "Subscribed both", subscribed_both)
967 subscribed_none = self._roster.get_subscribed_none()
968 log_debug(4, "Subscribed none", subscribed_none)
969 subscribed_from = self._roster.get_subscribed_from()
970 log_debug(4, "Subscribed from", subscribed_from)
971 for full_jid in jids:
972 jid = self._strip_resource(full_jid)
973 jid = str(jid)
974 if jid in subscribed_both:
975 log_debug(4, "Already subscribed to the presence of node", jid)
976 continue
977
978
979
980 if jid in subscribed_to:
981 log_debug(4, "Subscribed to")
982 continue
983 if jid in subscribed_none:
984 ent = subscribed_none[jid]
985 if 'ask' in ent and ent['ask'] == 'subscribe':
986 log_debug(4, "Subscribed none + ask=subscribe")
987
988 continue
989 if jid in subscribed_from:
990 ent = subscribed_from[jid]
991 if 'ask' in ent and ent['ask'] == 'subscribe':
992 log_debug(4, "Subscribed from + ask=subscribe")
993
994 continue
995
996
997
998
999
1000 if jid in self._roster._subscribed_from:
1001 subscription = "from"
1002 hashd = self._roster._subscribed_from
1003 else:
1004 subscription = "none"
1005 hashd = self._roster._subscribed_none
1006
1007 hashd[jid] = {
1008 'jid' : jid,
1009 'subscription' : subscription,
1010 'ask' : 'subscribe',
1011 }
1012
1013
1014 log_debug(4, jid)
1015 stripped_jid = self._strip_resource(jid)
1016 presence_node = JabberPresenceNode(to=stripped_jid, type="subscribe")
1017 presence_node.setID("presence-%s" % self.get_unique_id())
1018 sig = self._create_signature(jid, NS_RHN_PRESENCE_SUBSCRIBE)
1019 if sig:
1020 presence_node.insertNode(sig)
1021 log_debug(5, "Sending presence subscription request", presence_node)
1022 self.send(presence_node)
1023
1024
1025
1026
1041
1043 return self._reader.fileno()
1044
1046 received = ''
1047 while 1:
1048 rfds, wfds, exfds = select.select([self.fileno()], [], [], 0)
1049 if not rfds:
1050
1051 break
1052 buff = self._read(self.BLOCK_SIZE)
1053 if not buff:
1054 break
1055 received = received + buff
1056 if not received:
1057
1058 self.disconnected(self)
1059 return received
1060
1063
1065 log_debug(3, timeout)
1066 self._incoming_node_queue = []
1067 fileno = self.fileno()
1068
1069 start = time.time()
1070 while 1:
1071 now = time.time()
1072 if timeout:
1073 if now >= start + timeout:
1074
1075 return 0
1076 tm = start + timeout - now
1077 else:
1078 tm = None
1079
1080 self.process_loop_hook()
1081
1082 log_debug(5, "before select(); timeout", tm)
1083 rfds, wfds, exfds = select.select([fileno], [], [], tm)
1084 log_debug(5, "select() returned")
1085 if not rfds:
1086
1087 return 0
1088
1089 if hasattr(self._sock, 'pending'):
1090
1091
1092
1093 log_debug(5, "Reading %s bytes from ssl socket" % self.BLOCK_SIZE)
1094 try:
1095 data = self._read(self.BLOCK_SIZE)
1096 except SSL.SSL.SysCallError:
1097 e = sys.exc_info()[1]
1098 log_debug(5, "Closing socket")
1099 self._non_ssl_sock.close()
1100 raise_with_tb(SSLError("OpenSSL error; will retry", str(e)), sys.exc_info()[2])
1101 log_debug(5, "Read %s bytes" % len(data))
1102 if not data:
1103 raise JabberError("Premature EOF")
1104 self._parser.Parse(data)
1105 pending = self._sock.pending()
1106 if pending:
1107
1108 data = self._read(pending)
1109 self._parser.Parse(data)
1110 else:
1111
1112
1113 data = self._read(self.BLOCK_SIZE)
1114 if not data:
1115 raise JabberError("Premature EOF")
1116 self._parser.Parse(data)
1117
1118
1119 if not self._incoming_node_queue:
1120
1121 if timeout:
1122
1123 continue
1124
1125
1126 return 0
1127 return len(self._incoming_node_queue)
1128 return 0
1129
1130
1131 - def register(self, username, password):
1132 log_debug(2, username, password)
1133 self.requestRegInfo()
1134 d = self.getRegInfo()
1135 if 'username' in d:
1136 self.setRegInfo('username', username)
1137 if 'password' in d:
1138 self.setRegInfo('password', password)
1139 try:
1140 self.sendRegInfo()
1141 except JabberQualifiedError:
1142 e = sys.exc_info()[1]
1143 if e.errcode == '409':
1144
1145 log_error("Invalid password")
1146 self.disconnect()
1147 sys.exit(0)
1148 raise
1149 return True
1150
1152 log_debug(5, ID, timeout)
1153
1154 ID = jabber.ustr(ID)
1155
1156 self.lastErr = ''
1157 self.lastErrCode = 0
1158
1159 if timeout is not None:
1160 abort_time = time.time() + timeout
1161 self.DEBUG("waiting with timeout:%s for %s" % (timeout, ID),
1162 jabber.DBG_NODE_IQ)
1163 else:
1164 self.DEBUG("waiting for %s" % ID, jabber.DBG_NODE_IQ)
1165
1166 while 1:
1167 if timeout is None:
1168 tmout = None
1169 else:
1170 tmout = abort_time - time.time()
1171 if tmout <= 0:
1172
1173 break
1174 log_debug(5, "before get_one_stanza")
1175 stanza = self.get_one_stanza(tmout)
1176 log_debug(5, "after get_one_stanza")
1177 if not stanza:
1178
1179 assert timeout is not None
1180 break
1181
1182 error_code = stanza.getErrorCode()
1183 if error_code:
1184
1185 self.lastErr = stanza.getError()
1186 self.lastErrCode = error_code
1187 return None
1188
1189
1190 tid = jabber.ustr(stanza.getID())
1191 if ID == tid:
1192
1193 return stanza
1194
1195
1196
1197
1198
1199 self.lastErr = "Timeout"
1200 return None
1201
1213
1214
1216 seq = self._seq
1217 JabberClient._seq = seq + 1
1218 return "%s-%s" % (self._uniq_client_string, seq)
1219
1222
1223
1225 self.DEBUG("jabber_lib.JabberClient.header: sending initial header",
1226 jabber.DBG_INIT)
1227 templ = "<?xml version='1.0' encoding='UTF-8'?><stream:stream %s>"
1228 attrs = {
1229 'to' : self._host,
1230 'xmlns' : self._namespace,
1231 'xmlns:stream' : "http://etherx.jabber.org/streams",
1232 'version' : '1.0',
1233 }
1234 if self._outgoingID:
1235 attrs['id'] = self._outgoingID
1236
1237 addition = []
1238 for k, v in attrs.items():
1239 addition.append("%s='%s'" % (k, v))
1240 addition = " ".join(addition)
1241 return templ % addition
1242
1247
1250
1252 """
1253 If the roster is enabled, presence stanzas with type="subscribed"
1254 should never be received - the server will initiate a roster push
1255 instead
1256 """
1257 jid = stanza.getFrom()
1258 presence_type = stanza.getType()
1259 log_debug(3, self.jid, jid, presence_type)
1260 stanza_id = stanza.getID()
1261
1262 assert(stanza.getName() == 'presence')
1263
1264
1265 if presence_type is None or presence_type == 'subscribed':
1266 log_debug(4, "Node is available", jid, presence_type)
1267 self.set_jid_available(jid)
1268
1269
1270
1271 self.subscribe_to_presence([jid])
1272 return
1273
1274 if presence_type in ('unsubscribed', 'unavailable'):
1275 log_debug(4, "Node is unavailable", jid, presence_type)
1276 self.set_jid_unavailable(jid)
1277 return
1278
1279 if presence_type == 'subscribe':
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289 log_debug(4, "Subscription request approved", jid)
1290 self.send_presence(jid, type="subscribed", xid=stanza_id)
1291
1292 self.subscribe_to_presence([jid])
1293 return
1294 if presence_type == 'probe':
1295 log_debug(4, "Presence probe", jid)
1296 self.send(JabberPresenceNode(to=jid))
1297
1300
1303
1306
1313
1316
1319
1322
1333
1335 log_debug(4, stanza)
1336 assert stanza.getName() == 'message'
1337
1338 message_from = stanza.getFrom()
1339 message_type = stanza.getType()
1340 if message_type == 'error':
1341 log_debug(1, 'Received error from %s: %s' % (message_from, stanza))
1342 return None
1343
1344 if message_type != 'normal':
1345 log_debug(1, 'Unsupported message type %s ignored' % message_type)
1346 return None
1347
1348 x_delayed_nodes = self.match_stanza_tags(stanza, 'x',
1349 namespace=jabber.NS_DELAY)
1350 if x_delayed_nodes:
1351 log_debug(1, 'Ignoring delayed stanza')
1352 return None
1353
1354 sig = self._check_signature(stanza, actions=actions)
1355 if not sig:
1356 if self.debug_level > 5:
1357 raise Exception(1)
1358 log_debug(1, "Mismatching signatures")
1359 return None
1360
1361 return sig
1362
1363
1366
1369
1371 if not length:
1372 return ''
1373 random_bytes = 16
1374 length = int(length)
1375 s = hashlib.new('sha1')
1376 s.update(bstr("%.8f" % time.time()))
1377 s.update(bstr("%s" % os.getpid()))
1378 devrandom = open('/dev/urandom', "rb")
1379 result = []
1380 cur_length = 0
1381 while 1:
1382 s.update(devrandom.read(random_bytes))
1383 buf = s.hexdigest()
1384 result.append(buf)
1385 cur_length = cur_length + len(buf)
1386 if cur_length >= length:
1387 break
1388
1389 devrandom.close()
1390
1391 return ''.join(result)[:length].lower()
1392
1393
1395 log_debug(3, "Pushing process into background")
1396
1397 pid = os.fork()
1398 if pid > 0:
1399
1400 os._exit(0)
1401
1402
1403
1404 os.setpgrp()
1405
1406
1407 os.chdir('/')
1408
1409
1410
1411 os.umask(0)
1412
1413
1414 for f in sys.stdout, sys.stderr:
1415 f.flush()
1416
1417
1418 si = open("/dev/null", 'r')
1419 so = open("/dev/null", 'ab+')
1420 se = open("/dev/null", 'ab+', 0)
1421
1422 os.dup2(si.fileno(), sys.stdin.fileno())
1423 os.dup2(so.fileno(), sys.stdout.fileno())
1424 os.dup2(se.fileno(), sys.stderr.fileno())
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1438 self._subscribed_to = {}
1439 self._subscribed_from = {}
1440 self._subscribed_both = {}
1441 self._subscribed_none = {}
1442 self._available_nodes = {}
1443
1445 subscr = item.getAttr('subscription')
1446 jid = item.getAttr('jid')
1447 jid = strip_resource(jid)
1448 jid = str(jid)
1449 entry = {
1450 'jid' : jid,
1451 'subscription' : subscr,
1452 }
1453 ask = item.getAttr('ask')
1454 if ask:
1455 entry['ask'] = ask
1456
1457 actions = ['to', 'from', 'both', 'none']
1458 if subscr in actions:
1459 for a in actions:
1460 d = getattr(self, '_subscribed_' + a)
1461 if subscr == a:
1462
1463 d[jid] = entry
1464 elif jid in d:
1465
1466 del d[jid]
1467
1469 return self._subscribed_from.copy()
1470
1472 return self._subscribed_to.copy()
1473
1475 return self._subscribed_both.copy()
1476
1478 return self._subscribed_none.copy()
1479
1481 ret = self._subscribed_to.copy()
1482 ret.update(self._subscribed_both)
1483 return ret
1484
1486 ret = self._subscribed_from.copy()
1487 ret.update(self._subscribed_both)
1488 return ret
1489
1491 return self._available_nodes.copy()
1492
1494 jid = str(jid)
1495 self._available_nodes[jid] = 1
1496
1498 jid = str(jid)
1499 if jid in self._available_nodes:
1500 del self._available_nodes[jid]
1501
1503 return jid in self._available_nodes
1504
1506 self._subscribed_to.clear()
1507 self._subscribed_from.clear()
1508 self._subscribed_both.clear()
1509 self._subscribed_none.clear()
1510
1512 return "Roster:\n\tto: %s\n\tfrom: %s\n\tboth: %s\n\tnone: %s" % (
1513 self._subscribed_to.keys(),
1514 self._subscribed_from.keys(),
1515 self._subscribed_both.keys(),
1516 self._subscribed_none.keys(),
1517 )
1518
1519
1521
1522 if not isinstance(jid, jabber.JID):
1523 jid = jabber.JID(jid)
1524 return jid.getStripped()
1525
1527 return traceback.format_exc(None)
1528
1530 """This class exists here because python 1.5.2 does not support a
1531 sendall() method for sockets"""
1534
1535 - def sendall(self, data, flags=0):
1536 to_send = len(data)
1537 if not to_send:
1538
1539 return 0
1540 bytes_sent = 0
1541 while 1:
1542 ret = self.sock.send(data[bytes_sent:], flags)
1543 if bytes_sent + ret == to_send:
1544
1545 break
1546 bytes_sent = bytes_sent + ret
1547 return to_send
1548