Package src :: Module osad
[hide private]
[frames | no frames]

Source Code for Module src.osad

  1  # 
  2  # Copyright (c) 2008--2017 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU General Public License, 
  5  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  6  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  8  # along with this software; if not, see 
  9  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 10  # 
 11  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 12  # granted to use or replicate Red Hat trademarks that are incorporated 
 13  # in this software or its documentation. 
 14  # 
 15   
 16  import re 
 17  import sys 
 18  import time 
 19  from spacewalk.common.usix import ListType 
 20  from rhn import rpclib 
 21  import random 
 22  import socket 
 23   
 24  from up2date_client.config import initUp2dateConfig 
 25  from up2date_client import config 
 26   
 27  try: # python 3 
 28      PY3 = sys.version_info.major >= 3 
 29  except AttributeError: # python 2 
 30      PY3 = False 
 31   
 32  if PY3: 
 33      import urllib.parse as urlparse 
 34      from osad.rhn_log import set_debug_level, log_debug, die, set_logfile 
 35      from osad import jabber_lib, osad_config, osad_client 
 36  else: 
 37      import urlparse 
 38      from rhn_log import set_debug_level, log_debug, die, set_logfile 
 39      import jabber_lib 
 40      import osad_config 
 41      import osad_client 
 42   
def main():
    """Build the osad ``Runner`` and return whatever its ``main()`` returns."""
    runner = Runner()
    return runner.main()
45
class Runner(jabber_lib.Runner):
    """OSAD client runner built on ``jabber_lib.Runner``.

    Reads the osad and up2date configuration, registers the system with the
    server over XML-RPC and finishes the setup of the jabber client
    connection.
    """
    client_factory = osad_client.Client

    # How often will we try to reconnect. We want this randomized, so not all
    # clients hit the server at the same time
    _min_sleep = 60
    _max_sleep = 120

    def __init__(self):
        """Initialize runner state and add the osad command-line options."""
        jabber_lib.Runner.__init__(self)
        self._up2date_config = None
        self._config = None
        self._xmlrpc_server = None
        self._systemid_file = None
        self._time_drift = 0
        self.options_table.extend([
            self.option('--cfg', action="store",
                        help="Use this configuration file for defaults"),
            self.option('--jabber-server', action="store",
                        help="Primary jabber server to connect to"),
        ])
        self._config_options = {}
        # Counter for the number of config setups we had
        self._config_setup_counter = 0
        # How often to re-setup the config (i.e. make xmlrpc requests to get
        # the config from the server)
        self._config_setup_interval = random.randint(50, 100)
        self._use_proxy = 1

    def setup_config(self, config, force=0):
        """Apply *config*, but only on every N-th call unless *force* is set.

        ``_setup_config`` runs when the call counter is a multiple of
        ``_config_setup_interval`` (which includes the first call) or when
        *force* is true; otherwise the call is only logged. If anything is
        raised, the counter is reset to 0 and the exception is re-raised.
        """
        # We don't want to slam the server with lots of XMLRPC requests at the
        # same time, especially if jabberd goes down - in that case all
        # clients are slamming the server at the same time
        try:
            if (self._config_setup_counter % self._config_setup_interval == 0) or \
                    force:
                # This will catch the first pass too
                self._setup_config(config, force)
            else:
                log_debug(4, "Skipping config setup; counter=%s; interval=%s" %
                          (self._config_setup_counter, self._config_setup_interval))
        except:
            # Deliberately bare: the counter is reset for every kind of
            # failure and the exception is always re-raised.
            self._config_setup_counter = 0
            raise

        # Update the counter for the next time
        self._config_setup_counter = self._config_setup_counter + 1

    def _setup_config(self, config, force=0):
        """Configure logging, SSL, credentials and the jabber server list.

        :param config: dict of settings as returned by ``read_config()``.
        :param force: passed through to ``read_auth_info()``.
        """
        logfile = self.options.logfile
        if logfile is None or logfile == '':
            logfile = config['logfile']

        debug_level = self.options.verbose
        if debug_level is None:
            dl = config['debug_level']
            # Default to 0 so that set_debug_level() never receives None
            debug_level = int(dl) if dl is not None else 0

        set_logfile(logfile)
        self.debug_level = debug_level
        set_debug_level(debug_level)

        self._tcp_keepalive_timeout = config['tcp_keepalive_timeout']
        self._tcp_keepalive_count = config['tcp_keepalive_count']

        log_debug(3, "Updating configuration")

        # read_config() only stores 'ssl_ca_cert' when a CA cert is
        # available, so the key may legitimately be missing here.
        client_ssl_cert = config.get('ssl_ca_cert')
        osa_ssl_cert = config.get('osa_ssl_cert') or client_ssl_cert
        if osa_ssl_cert is None:
            die("No SSL cert supplied")

        self.ssl_cert = osa_ssl_cert

        auth_info = self.read_auth_info(force)

        self._username = auth_info['username']
        self._password = auth_info['password']
        self._resource = auth_info['resource']

        server_url = config.get('server_url')

        # The server given on the command line (if any) always comes first
        self._jabber_servers = []
        if self.options.jabber_server:
            self._jabber_servers.append(self.options.jabber_server)

        if isinstance(server_url, list):
            candidate_urls = server_url
        else:
            candidate_urls = [server_url]
        for su in candidate_urls:
            # Only the network location of the URL is used as jabber server
            netloc = self._parse_url(su)[1]
            if netloc not in self._jabber_servers:
                self._jabber_servers.append(netloc)

        if config.get('enable_failover') != '1':
            # Without failover only the first server is kept
            self._jabber_servers = [self._jabber_servers[0]]

        # Load the config
        self._config_options.clear()
        self._config_options.update(config)
        # No reason to expose these at the Client level - but if we have to,
        # uncomment some of the values below
        self._config_options.update({
            # 'jabber-servers' : self._jabber_servers,
            # 'dispatchers' : self._dispatchers,
            # 'client_name' : self._client_name,
            # 'shared_key' : self._shared_key,
        })

    def _parse_url(self, url, scheme="http"):
        """Split *url* with ``urlparse``, prepending *scheme* if it has no netloc.

        :returns: the 6-tuple (scheme, netloc, path, params, query, fragment).
        """
        sch, netloc, path, params, query, fragment = urlparse.urlparse(url)
        if not netloc:
            # No schema - trying to patch it up ourselves?
            url = scheme + "://" + url
            sch, netloc, path, params, query, fragment = urlparse.urlparse(url)
        return sch, netloc, path, params, query, fragment

    def _read_systemid(self):
        """Return the current contents of the systemid file."""
        # Context manager so the file handle is closed deterministically
        with open(self._systemid_file) as systemid_fd:
            return systemid_fd.read()

    def fix_connection(self, c):
        """After setting up the connection, do whatever else is necessary.

        Creates the XML-RPC server object, registers this system for OSAD,
        copies the values from the registration reply onto the client *c*,
        enables TCP keepalive on its socket, reports the client's jabber ID
        to the server and announces presence.

        :param c: the connected jabber client.
        :returns: *c*.
        :raises Exception: if the server does not advertise the
            ``registration.register_osad`` capability.
        :raises jabber_lib.JabberConnectionError: if the registration reply
            is empty.
        """
        # Setup XMLRPC server
        xmlrpc_params = self.build_rpclib_params(self._config_options)

        # Looking for a server we connected to jabberd on
        server_urls = self._config_options['server_url']
        for url in server_urls:
            if self._connected_jabber_server in url:
                xmlrpc_params['uri'] = url
                break

        server = rpclib.Server(**xmlrpc_params)
        self._xmlrpc_server = server

        # 'ssl_ca_cert' is optional in the config, hence .get()
        client_ssl_cert = self._config_options.get('ssl_ca_cert')
        osa_ssl_cert = self._config_options.get('osa_ssl_cert') or client_ssl_cert
        if osa_ssl_cert:
            server.add_trusted_cert(osa_ssl_cert)

        # A request has to be made before the capability headers can be read
        server.registration.welcome_message()

        server_capabilities = get_server_capability(server)
        if 'registration.register_osad' not in server_capabilities:
            raise Exception("Server does not support OSAD registration")

        self._systemid_file = self._config_options['systemid']
        self._systemid = self._read_systemid()

        current_timestamp = int(time.time())
        ret = server.registration.register_osad(self._systemid,
                                                {'client-timestamp': current_timestamp})

        # Bugzilla: 142067
        # If the server doesn't have push support. 'ret' won't have anything in it.
        if not ret:
            raise jabber_lib.JabberConnectionError

        js = ret.get('jabber-server')
        # Skip a missing value so that None never ends up in the server list
        if js and js not in self._jabber_servers:
            self._jabber_servers.append(js)

        server_timestamp = ret.get('server-timestamp')
        # Compute the time drift between the client and the server
        self._time_drift = server_timestamp - current_timestamp
        log_debug(2, "Time drift", self._time_drift)

        self._dispatchers = ret.get('dispatchers')
        self._client_name = ret.get('client-name')
        self._shared_key = ret.get('shared-key')
        log_debug(2, "Client name", self._client_name)
        log_debug(2, "Shared key", self._shared_key)

        c.set_config_options(self._config_options)
        c.client_id = self._client_name
        c.shared_key = self._shared_key
        c.time_drift = self._time_drift
        c._sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        c._sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, self._tcp_keepalive_timeout)
        c._sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, self._tcp_keepalive_count)

        # Update the jabber ID
        systemid = self._read_systemid()
        args = {
            'jabber-id': str(c.jid),
        }
        self._xmlrpc_server.registration.register_osad_jid(systemid, args)

        c.set_dispatchers(self._dispatchers)

        c.subscribe_to_presence(self._dispatchers)
        # Signal presence to the jabber server
        c.send_presence()
        return c

    def preprocess_once(self, client):
        """Run the parent's ``preprocess_once`` and start an async rhn_check.

        :returns: *client*.
        """
        # BZ 1410781
        # If the system just started following a reboot event,
        # we need to run rhn_check in order to let the server
        # know the reboot is complete, otherwise it won't send
        # any further events to us.
        super(Runner, self).preprocess_once(client)
        client.run_rhn_check_async()
        return client

    def process_once(self, client):
        """Run one processing iteration for *client*.

        :raises jabber_lib.NeedRestart: if the systemid file no longer
            matches the contents read in ``fix_connection``.
        """
        # Re-read the systemid file. If it's changed from the
        # previous version re-setup the config. This will create a new
        # key on the satellite server tied to this new system id.
        # This change prevents having to restart osad after a system
        # re-registration.
        systemid = self._read_systemid()
        if systemid != self._systemid:
            log_debug(4, "System re-registration detected. systemid file has changed.")
            # Called for its side effects only (it re-initializes the osad
            # config); the returned dict is not needed here.
            self.read_config()
            raise jabber_lib.NeedRestart

        # make sure that dispatchers are not stuck in state [none + ask] or [from + ask]
        # for too long. This can happen, for example, if a "subscribe" presence stanza
        # gets lost - in that case re-send it
        client.unstick_contacts(self._dispatchers)

        # if rhn_check is running or the last one failed, check more often
        if (client._rhn_check_process is None) and (client._rhn_check_fail_count < 1):
            client.process(timeout=180)
        else:
            client.process(timeout=5)

    def read_config(self):
        """Collect the effective settings from the osad and up2date configs.

        osad settings take precedence; the up2date configuration is used as
        fallback for the server URL, systemid path, proxy and SSL settings.

        :returns: dict of settings. ``ssl_ca_cert`` and the proxy related
            keys are only present when a value is available / proxying is
            enabled.
        """
        ret = {}
        # Read from the global config first
        config_file = self.options.cfg
        self._config = osad_config.init('osad', config_file=config_file)
        config_keys = ['debug_level', 'osa_ssl_cert', 'logfile', 'run_rhn_check',
                       'rhn_check_command', 'enable_failover']
        for key in config_keys:
            ret[key] = osad_config.get(key)

        try:
            server_url = osad_config.get('server_url')
        except osad_config.InterpolationError:
            server_url = config.getServerlURL()
        else:
            if not server_url:
                server_url = config.getServerlURL()
            else:
                def convert_url(s):
                    s = s.strip()
                    # The helper exists under either spelling depending on
                    # the up2date_client version
                    if hasattr(config, 'convert_url_to_puny'):
                        s = config.convert_url_to_puny(s)
                    elif hasattr(config, 'convert_url_to_pune'):
                        s = config.convert_url_to_pune(s)
                    return s

                server_url = [convert_url(i) for i in server_url.split(';')]

        # Remove empty URLs. Build a new list instead of calling remove()
        # while iterating, which skips the element following each removal.
        if isinstance(server_url, list):
            server_url = [url for url in server_url if url]

        # Real unusual case if there is no server URL both in up2date and osad config files
        if not server_url:
            die("Missing server URL in config file")

        ret['server_url'] = server_url

        # 8/23/05 wregglej 165775 added the run_rhn_check option.
        run_rhn_check = osad_config.get('run_rhn_check')
        if run_rhn_check is None:
            log_debug(3, "Forcing run_rhn_check")
            run_rhn_check = 1
        ret['run_rhn_check'] = int(run_rhn_check)

        ret['tcp_keepalive_timeout'] = int(osad_config.get('tcp_keepalive_timeout', defval=1800))
        ret['tcp_keepalive_count'] = int(osad_config.get('tcp_keepalive_count', defval=3))

        systemid = osad_config.get('systemid')
        if systemid is None:
            systemid = self.get_up2date_config()['systemIdPath']
        ret['systemid'] = systemid

        enable_proxy = self._config.get_option('enableProxy')
        if enable_proxy is None:
            enable_proxy = self.get_up2date_config()['enableProxy']

        # NOTE(review): the proxy URL / auth settings are taken to be nested
        # under this condition - confirm against the upstream file.
        if enable_proxy:
            ret['enable_proxy'] = 1

            ret['proxy_url'] = self._config.get_option('httpProxy')
            if ret['proxy_url'] is None:
                ret['proxy_url'] = str(config.getProxySetting())

            ret['enable_proxy_auth'] = 0
            enable_proxy_auth = self._config.get_option('enableProxyAuth')
            if enable_proxy_auth is None:
                enable_proxy_auth = self.get_up2date_config()['enableProxyAuth']

            if enable_proxy_auth:
                ret['enable_proxy_auth'] = 1
                proxy_user = self._config.get_option('proxyUser')
                if proxy_user is None:
                    proxy_user = self.get_up2date_config()['proxyUser']
                ret['proxy_user'] = proxy_user

                proxy_password = self._config.get_option('proxyPassword')
                if proxy_password is None:
                    proxy_password = self.get_up2date_config()['proxyPassword']
                ret['proxy_password'] = proxy_password

        # Second check of the server URL, kept from the original code
        if not server_url:
            die("Unable to retrieve server URL")

        # SSL cert for Jabber's TLS, it can potentially be different than the
        # client's
        osa_ssl_cert = self._config.get_option('osa_ssl_cert')
        # The up2date ssl cert - we get it from up2date's config file
        client_ca_cert = self.get_up2date_config()['sslCACert']
        if isinstance(client_ca_cert, ListType):
            if client_ca_cert:
                client_ca_cert = client_ca_cert[0]
            else:
                client_ca_cert = None
        if osa_ssl_cert is None:
            # No setting, use up2date's
            osa_ssl_cert = client_ca_cert

        if client_ca_cert is not None:
            ret['ssl_ca_cert'] = client_ca_cert
        if osa_ssl_cert is not None:
            ret['osa_ssl_cert'] = osa_ssl_cert

        return ret

    def get_up2date_config(self):
        """Return the up2date configuration, initializing it on first use."""
        if self._up2date_config is None:
            self._up2date_config = initUp2dateConfig()
        return self._up2date_config

    def build_rpclib_params(self, config):
        """Translate config keys into keyword arguments for ``rpclib.Server``.

        Keys that are missing from *config* or whose value is None are
        left out of the result.
        """
        ret = {}
        kmap = {
            'server_url': 'uri',
            'proxy_user': 'username',
            'proxy_password': 'password',
            'proxy_url': 'proxy',
        }
        for k, v in kmap.items():
            if k in config:
                val = config[k]
                if val is not None:
                    ret[v] = val
        return ret

    def read_auth_info(self, force):
        """Return the auth info obtained from ``osad_config.get_auth_info``.

        Freshly generated random credentials are passed in as defaults.
        """
        # generate some defaults
        resource = 'osad'
        username = 'osad-%s' % jabber_lib.generate_random_string(10)
        password = jabber_lib.generate_random_string(20)

        # Get the path to the auth info file - may be None
        auth_info_file = self._config.get_option('auth_file')
        auth_info = osad_config.get_auth_info(auth_info_file, 'osad-auth', force,
                                              username=username, password=password,
                                              resource=resource)
        return auth_info
419
def get_server_capability(s):
    """Parse the ``X-RHN-Server-Capability`` headers of the last response.

    :param s: server object exposing ``get_response_headers()``.
    :returns: dict mapping each capability name to a dict with the keys
        ``name``, ``version`` and ``value`` (whitespace-stripped strings).
        Empty when no request was made yet or no capability header is
        present. Header values that cannot be parsed are ignored.
    """
    header_name = "X-RHN-Server-Capability"
    headers = s.get_response_headers()
    if headers is None:
        # No request done yet
        return {}
    if hasattr(headers, "get_all"):
        # python 3 style headers: get_all() returns None when the header is
        # absent, so fall back to an empty list instead of iterating None.
        values = headers.get_all(header_name) or []
        cap_headers = ["%s: %s" % (header_name, val) for val in values]
    else:
        # python 2 style headers
        cap_headers = headers.getallmatchingheaders(header_name)

    if not cap_headers:
        return {}
    regexp = re.compile(
        r"^(?P<name>[^(]*)\((?P<version>[^)]*)\)\s*=\s*(?P<value>.*)$")
    vals = {}
    for h in cap_headers:
        arr = h.split(':', 1)
        if len(arr) != 2:
            # Not a "Name: value" line - ignore it like other bad input
            continue
        val = arr[1].strip()
        if not val:
            continue

        mo = regexp.match(val)
        if not mo:
            # XXX Just ignoring it, for now
            continue
        vdict = mo.groupdict()
        for k, v in vdict.items():
            vdict[k] = v.strip()

        vals[vdict['name']] = vdict
    return vals
if __name__ == '__main__':
    # A falsy result from main() (e.g. None) is mapped to exit status 0
    exit_status = main() or 0
    sys.exit(exit_status)