Package src :: Module osad_client
[hide private]
[frames | no frames]

Source Code for Module src.osad_client

  1  # 
  2  # Copyright (c) 2008--2016 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU General Public License, 
  5  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  6  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  8  # along with this software; if not, see 
  9  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 10  # 
 11  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 12  # granted to use or replicate Red Hat trademarks that are incorporated 
 13  # in this software or its documentation. 
 14  # 
 15   
 16  import os 
 17  import time 
 18  import sys 
 19  from subprocess import Popen 
 20   
 21  try: # python 3 
 22      PY3 = sys.version_info.major >= 3 
 23  except AttributeError: # python 2 
 24      PY3 = False 
 25   
 26  if PY3: 
 27      from osad.rhn_log import log_debug 
 28      from osad import jabber_lib 
 29  else: 
 30      from rhn_log import log_debug 
 31      import jabber_lib 
 32   
class Client(jabber_lib.JabberClient):
    """Jabber client that reacts to signed requests sent by dispatchers.

    Signed ``message`` stanzas carrying a check-in or ping action are
    verified against the shared key; a ping is answered directly, a check-in
    is confirmed and then ``rhn_check`` is launched in the background.
    """

    # Command executed on check-in when 'rhn_check_command' is not configured
    RHN_CHECK_CMD = '/usr/sbin/rhn_check'
    # Largest accepted difference, in seconds, between a message timestamp
    # and the local clock (after correcting by self.time_drift)
    MAX_TIME_DRIFT = 120
    # Seconds to wait before re-sending presence subscription requests
    STUCK_SUBSCRIPTION_TIMEOUT = 60

    def __init__(self, *args, **kwargs):
        """Initialize the client; all arguments go to JabberClient.__init__."""
        jabber_lib.JabberClient.__init__(self, *args, **kwargs)
        self.username = None
        self.resource = None
        self.client_id = None
        self.shared_key = None
        self.debug_level = 0
        self.time_drift = 0
        self._dispatchers = []
        self._config = {}
        # Popen handle of the running rhn_check, or None when none is running
        self._rhn_check_process = None
        # Number of consecutive rhn_check runs that exited with non-zero status
        self._rhn_check_fail_count = 0
        self._stuck_subscription_timestamp = time.time()

    def set_config_options(self, config):
        """Store the configuration mapping read by the check-in handling."""
        self._config = config

    def set_debug_level(self, debug_level):
        """Store the debug level."""
        # NOTE(review): the body of this method was missing from the listing
        # this block was taken from; storing the value mirrors the attribute
        # initialised in __init__ - confirm against upstream.
        self.debug_level = debug_level

    def set_dispatchers(self, dispatchers):
        """Store the list of dispatcher JIDs (strings)."""
        self._dispatchers = dispatchers

    def start(self, username, password, resource):
        """Authenticate, record our full JID and retrieve the roster."""
        # The password is deliberately kept out of the debug log
        log_debug(3, username, resource)
        # XXX find a better name for this function
        self.auth(username, password, resource)
        self.username = username
        self.resource = resource
        self.jid = "%s@%s/%s" % (self.username, self._host, self.resource)

        # Retrieve roster
        self.retrieve_roster()

    def _create_signature(self, jid, action):
        """Build the signed <x> node for `action` addressed to `jid`.

        The signature is computed by jabber_lib.sign over the shared key,
        the destination `jid` and the signed attributes, in a fixed order.
        Returns the populated node.
        """
        log_debug(4, jid, action)
        attrs = {
            'client-id': self.client_id,
            'timestamp': int(time.time()),
            'serial': self.get_unique_id(),
            'action': action,
            'jid': self.jid,
        }
        signing_comps = ['client-id', 'timestamp', 'serial', 'action', 'jid']
        args = [self.shared_key, jid]
        args.extend(attrs[sc] for sc in signing_comps)

        # Skip args[0]: the shared key must not end up in the log
        log_debug(4, "Signature args", args[1:])
        attrs['signature'] = jabber_lib.sign(*args)

        x = jabber_lib.jabber.xmlstream.Node('x')
        x.setNamespace(jabber_lib.NS_RHN_SIGNED)
        for k, v in attrs.items():
            x.putAttr(k, v)
        return x

    def _lookup_dispatcher(self, jid):
        """Return the full dispatcher JID string matching `jid`.

        Anything that is not a JID object, or a JID that already has a
        resource, is returned as a string unchanged. A bare JID is matched
        against the known dispatchers; None is returned when none matches.
        """
        # presence may not send a resource in the JID
        if not isinstance(jid, jabber_lib.jabber.JID) or jid.resource:
            return str(jid)
        # A dispatcher matches when it is the bare JID followed by
        # "/<resource>"; an entry equal to the bare JID is simply skipped
        prefix = str(jid) + '/'
        for d in self._dispatchers:
            if d.startswith(prefix):
                # This is it
                return d
        return None

    def _fix_jid(self, jid):
        """Resolve `jid` to a known dispatcher (see _lookup_dispatcher)."""
        return self._lookup_dispatcher(jid)

    def _check_signature(self, stanza, actions=None):
        """Validate the signature node carried by `stanza`.

        `actions`, when given and non-empty, is the collection of accepted
        action values. Returns the signed <x> node when the stanza has a
        sender, a parsable timestamp within MAX_TIME_DRIFT, an accepted
        action, a full JID and a matching signature; returns None otherwise.
        """
        # Do we have this client in the table?
        jid = stanza.getFrom()
        if jid is None:
            log_debug(3, 'no from')
            return None
        # Look for a <x> child that has our namespace
        for x in stanza.getTags('x'):
            if x.getNamespace() == jabber_lib.NS_RHN_SIGNED:
                break
        else:  # for
            log_debug(1, "No signature node found in stanza")
            return None

        raw_timestamp = x.getAttr('timestamp')
        try:
            timestamp = int(raw_timestamp)
        except (TypeError, ValueError):
            # ValueError: not a number; TypeError: not a string at all
            # (presumably None when the attribute is absent)
            log_debug(1, "Invalid message timestamp", raw_timestamp)
            return None

        current_drift = timestamp - time.time()
        abs_drift = abs(current_drift - self.time_drift)
        if abs_drift > self.MAX_TIME_DRIFT:
            # Actually drop the message, as the log line says
            log_debug(1, "Dropping message, drift is too big", abs_drift)
            return None

        action = x.getAttr('action')
        if actions and action not in actions:
            log_debug(1, "action %s not allowed" % action)
            return None

        # We need the fully qualified JID here too
        full_jid = x.getAttr('jid')
        if not full_jid:
            log_debug(3, "Full JID not found in signature stanza")
            return None

        # The signature covers the attribute values exactly as received
        attrs = {
            'timestamp': raw_timestamp,
            'serial': x.getAttr('serial'),
            'action': action,
            'jid': full_jid,
        }
        signing_comps = ['timestamp', 'serial', 'action', 'jid']
        args = [self.shared_key, self.jid]
        args.extend(attrs[sc] for sc in signing_comps)

        # Skip args[0]: the shared key must not end up in the log
        log_debug(4, "Signature args", args[1:])
        signature = jabber_lib.sign(*args)
        x_signature = x.getAttr('signature')
        if signature != x_signature:
            log_debug(1, "Signatures do not match", signature, x_signature)
            return None
        # Happy joy
        return x

    def _message_callback(self, client, stanza):
        """Handle an incoming message stanza.

        Unsigned or unknown requests are ignored. A ping request is answered
        with a ping response. A check-in request is confirmed and then
        rhn_check is started, unless 'run_rhn_check' is disabled in the
        configuration.
        """
        log_debug(4)
        assert stanza.getName() == 'message'

        # Actions we know how to react to
        actions = [
            jabber_lib.NS_RHN_MESSAGE_REQUEST_CHECKIN,
            jabber_lib.NS_RHN_MESSAGE_REQUEST_PING,
        ]
        sig = self._check_signature_from_message(stanza, actions)
        if not sig:
            return

        action = sig.getAttr('action')
        if action == jabber_lib.NS_RHN_MESSAGE_REQUEST_PING:
            log_debug(1, 'Ping request')
            self.send_message(stanza.getFrom(),
                              jabber_lib.NS_RHN_MESSAGE_RESPONSE_PING)
            return

        # Send confirmation
        self.send_message(stanza.getFrom(),
                          jabber_lib.NS_RHN_MESSAGE_RESPONSE_CHECKIN)

        # Checkin
        run_check = self._config.get('run_rhn_check')
        log_debug(3, "run_rhn_check:", run_check)

        if not run_check:
            log_debug(0, "Pretend that command just ran")
        else:
            self.run_rhn_check_async()

    def process_loop_hook(self):
        """Track the rhn_check child process from the event loop.

        Reaps a finished process and updates the failure counter; when no
        process is running and the previous run failed, starts a new one.
        """
        process = self._rhn_check_process
        if process is None:
            # rhn_check is not running but last one failed
            # we force a check even if the server does not
            # contact us. The idea is to exhaust the number of
            # times we can pick up the action until the server fails
            # it.
            if self._rhn_check_fail_count > 0:
                log_debug(3, "rhn_check failed last time, "
                          "force retry (fail count %d)" % self._rhn_check_fail_count)
                self.run_rhn_check_async()
            return

        # if rhn_check process exists, check its last status
        retcode = process.poll()
        if retcode is None:
            log_debug(3, "rhn_check is still running...")
            return

        log_debug(3, "rhn_check exited with status %d" % retcode)
        if retcode != 0:
            self._rhn_check_fail_count += 1
        else:
            self._rhn_check_fail_count = 0
        self._rhn_check_process = None

    def run_rhn_check_async(self):
        """Runs rhn_check and keeps a handle that is monitored
        during the event loop.

        The command comes from the 'rhn_check_command' configuration option
        (split on whitespace); RHN_CHECK_CMD is used when the option is
        unset or empty. Nothing is started while a previous run is still
        alive. Errors raised by Popen propagate to the caller.
        """
        command = self._config.get('rhn_check_command')
        # rhn_check now checks for multiple instances,
        # lets use that directly
        # XXX should find a better way to get the list of args
        args = command.split() if command else []
        if not args:
            # Unset, empty or whitespace-only option: use the default
            args = [self.RHN_CHECK_CMD]

        # if rhn_check process already exists
        if self._rhn_check_process is not None:
            retcode = self._rhn_check_process.poll()
            if retcode is None:
                log_debug(3, "rhn_check is still running, not running again...")
                return

        if self._rhn_check_fail_count > 0:
            log_debug(3, "rhn_check failed last time (fail count %d)" % self._rhn_check_fail_count)

        log_debug(3, "About to execute:", args)
        # os.umask() can only be read by setting it, hence the first call
        oldumask = os.umask(int("0077", 8))
        try:
            os.umask(oldumask | int("0022", 8))
            self._rhn_check_process = Popen(args)
        finally:
            # Restore our own umask even when the command cannot be started
            os.umask(oldumask)
        log_debug(0, "executed %s with pid %d" % (args[0], self._rhn_check_process.pid))

    def unstick_contacts(self, jids):
        """If we are waiting for 'subscribed' presence stanzas for too long, ask again"""
        elapsed = time.time() - self._stuck_subscription_timestamp
        if elapsed <= self.STUCK_SUBSCRIPTION_TIMEOUT:
            return

        for jid in jids:
            stripped_jid = self._strip_resource(jid)
            if self.needs_unsticking(stripped_jid):
                presence_node = jabber_lib.JabberPresenceNode(to=stripped_jid, type="subscribe")
                presence_node.setID("presence-%s" % self.get_unique_id())
                log_debug(4, "Re-sending presence subscription request", presence_node)
                self.send(presence_node)
        self._stuck_subscription_timestamp = time.time()

    def needs_unsticking(self, jid):
        """Returns True if jid is in state [none + ask] or [from + ask]"""
        contact = None
        # When jid appears in both tables, the "from" entry wins
        for contacts in (self._roster.get_subscribed_none(),
                         self._roster.get_subscribed_from()):
            if jid in contacts:
                contact = contacts[jid]

        if contact is None:
            return False
        return 'ask' in contact and contact['ask'] == 'subscribe'
291