Package backend :: Package satellite_tools :: Module xmlWireSource
[hide private]
[frames] | no frames]

Source Code for Module backend.satellite_tools.xmlWireSource

  1  # 
  2  # Copyright (c) 2008--2018 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU General Public License, 
  5  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  6  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  8  # along with this software; if not, see 
  9  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 10  # 
 11  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 12  # granted to use or replicate Red Hat trademarks that are incorporated 
 13  # in this software or its documentation. 
 14  # 
 15   
 16   
 17  # pylint: disable=E0012, C0413 
 18  # system imports 
 19  import os 
 20  import sys 
 21  import time 
 22   
 23  # rhn imports 
 24  from rhn import rpclib 
 25  sys.path.append("/usr/share/rhn") 
 26  from up2date_client import config 
 27   
 28  from spacewalk.common.usix import raise_with_tb 
 29  from spacewalk.common import rhnLib 
 30  from spacewalk.common.rhnConfig import CFG 
 31   
 32  # local imports 
 33  from syncLib import log, log2, RhnSyncException 
 34  import connection 
35 36 -class BaseWireSource:
37 38 """ Base object for wire-commo to RHN for delivery of XML/RPMS. """ 39 40 serverObj = None 41 handler = '' 42 url = '' 43 sslYN = 0 44 systemid = None 45 server_handler = None 46 xml_dump_version = None 47
48 - def __init__(self, systemid, sslYN=0, xml_dump_version=None):
53
54 - def getServer(self, forcedYN=0):
55 if forcedYN: 56 self.setServer(self.handler, self.url, forcedYN) 57 return BaseWireSource.serverObj
58
59 - def schemeAndUrl(self, url):
60 """ http[s]://BLAHBLAHBLAH/ACKACK --> http[s]://BLAHBLAHBLAH """ 61 62 if not url: 63 url = CFG.RHN_PARENT # the default 64 # just make the url complete. 65 hostname = rhnLib.parseUrl(url or '')[1] 66 hostname = hostname.split(':')[0] # just in case 67 if self.sslYN: 68 url = 'https://' + hostname 69 else: 70 url = 'http://' + hostname 71 return url
72
73 - def setServer(self, handler, url=None, forcedYN=0):
74 """ XMLRPC server object (ssl set in parameters). 75 NOTE: url expected to be of the form: scheme://machine/HANDLER 76 """ 77 78 url = self.schemeAndUrl(url) 79 80 if self._cached_connection_params(handler, url, forcedYN=forcedYN): 81 # Already cached 82 return None 83 84 self._set_connection_params(handler, url) 85 86 url = '%s%s' % (url, handler) # url is properly set up now. 87 88 serverObj = self._set_connection(url) 89 self._set_ssl_trusted_certs(serverObj) 90 return serverObj
91 92 @staticmethod
93 - def _set_connection_params(handler, url):
96
97 - def _cached_connection_params(self, handler, url, forcedYN=0):
98 """Helper function; returns 0 if we have to reset the connection 99 params, 1 if the cached values are ok""" 100 if forcedYN: 101 return 0 102 if handler != self.handler or url != self.url: 103 return 0 104 return 1
105
106 - def _set_connection(self, url):
107 "Instantiates a connection object" 108 109 serverObj = connection.StreamConnection(url, proxy=CFG.HTTP_PROXY, 110 username=CFG.HTTP_PROXY_USERNAME, password=CFG.HTTP_PROXY_PASSWORD, 111 xml_dump_version=self.xml_dump_version, timeout=CFG.timeout) 112 BaseWireSource.serverObj = serverObj 113 return serverObj
114
115 - def _set_ssl_trusted_certs(self, serverObj):
116 if not self.sslYN: 117 return None 118 119 # Check certificate 120 if CFG.ISS_PARENT: 121 caChain = CFG.ISS_CA_CHAIN 122 else: 123 caChain = CFG.CA_CHAIN 124 if caChain: 125 # require RHNS-CA-CERT file to be able to authenticate the SSL 126 # connections. 127 if not os.access(caChain, os.R_OK): 128 message = "ERROR: can not find RHN CA file: %s" % caChain 129 log(-1, message, stream=sys.stderr) 130 raise Exception(message) 131 # force the validation of the SSL cert 132 serverObj.add_trusted_cert(caChain) 133 return caChain 134 135 message = '--- Warning: SSL connection made but no CA certificate used' 136 log(1, message, stream=sys.stderr) 137 return None
138
139 - def _openSocketStream(self, method, params):
140 """Wraps the gzipstream.GzipStream instantiation in a test block so we 141 can open normally if stream is not gzipped.""" 142 143 stream = None 144 retryYN = 0 145 wait = 0.33 146 lastErrorMsg = '' 147 cfg = config.initUp2dateConfig() 148 for i in range(cfg['networkRetries']): 149 server = self.getServer(retryYN) 150 if server is None: 151 log2(-1, 2, 'ERROR: server unable to initialize, attempt %s' % i, stream=sys.stderr) 152 retryYN = 1 153 time.sleep(wait) 154 continue 155 func = getattr(server, method) 156 try: 157 stream = func(*params) 158 if CFG.SYNC_TO_TEMP: 159 import tempfile 160 cached = tempfile.NamedTemporaryFile() 161 stream.read_to_file(cached) 162 cached.seek(0) 163 return cached 164 else: 165 return stream 166 except rpclib.xmlrpclib.ProtocolError: 167 e = sys.exc_info()[1] 168 p = tuple(['<the systemid>'] + list(params[1:])) 169 lastErrorMsg = 'ERROR: server.%s%s: %s' % (method, p, e) 170 log2(-1, 2, lastErrorMsg, stream=sys.stderr) 171 retryYN = 1 172 time.sleep(wait) 173 # do not reraise this exception! 174 except (KeyboardInterrupt, SystemExit): 175 raise 176 except rpclib.xmlrpclib.Fault: 177 e = sys.exc_info()[1] 178 lastErrorMsg = e.faultString 179 break 180 except Exception: # pylint: disable=E0012, W0703 181 e = sys.exc_info()[1] 182 p = tuple(['<the systemid>'] + list(params[1:])) 183 lastErrorMsg = 'ERROR: server.%s%s: %s' % (method, p, e) 184 log2(-1, 2, lastErrorMsg, stream=sys.stderr) 185 break 186 # do not reraise this exception! 187 if lastErrorMsg: 188 raise_with_tb(RhnSyncException(lastErrorMsg), sys.exc_info()[2]) 189 # Returns a stream 190 # Should never be reached 191 return stream
192
193 - def setServerHandler(self, isIss=0):
194 if isIss: 195 self.server_handler = CFG.RHN_ISS_METADATA_HANDLER 196 else: 197 self.server_handler = CFG.RHN_METADATA_HANDLER
198
199 200 -class MetadataWireSource(BaseWireSource):
201 202 """retrieve specific xml stream through xmlrpc interface.""" 203 204 @staticmethod
205 - def is_disk_loader():
206 return False
207
208 - def _prepare(self):
209 self.setServer(self.server_handler)
210
211 - def getArchesXmlStream(self):
212 """retrieve xml stream for arch data.""" 213 self._prepare() 214 return self._openSocketStream("dump.arches", (self.systemid,))
215
216 - def getArchesExtraXmlStream(self):
217 "retrieve xml stream for the server group type arch compat" 218 self._prepare() 219 return self._openSocketStream("dump.arches_extra", (self.systemid,))
220
221 - def getProductNamesXmlStream(self):
222 "retrieve xml stream for the product names data" 223 self._prepare() 224 return self._openSocketStream("dump.product_names", (self.systemid,))
225
227 """retrieve xml stream for channel family data.""" 228 self._prepare() 229 return self._openSocketStream("dump.channel_families", (self.systemid,))
230
231 - def getOrgsXmlStream(self):
232 """retrieve xml stream for org data.""" 233 self._prepare() 234 return self._openSocketStream("dump.orgs", (self.systemid,))
235
236 - def getChannelXmlStream(self):
237 """retrieve xml stream for channel data given a 238 list of channel labels.""" 239 self._prepare() 240 return self._openSocketStream("dump.channels", (self.systemid, []))
241
242 - def getShortPackageXmlStream(self, packageIds):
243 """retrieve xml stream for short package data given 244 a list of package ids.""" 245 self._prepare() 246 return self._openSocketStream("dump.packages_short", (self.systemid, packageIds))
247
248 - def getChannelShortPackagesXmlStream(self, channel, last_modified):
249 """retrieve xml stream for short package data given a channel 250 label and the last modified timestamp of the channel""" 251 self._prepare() 252 return self._openSocketStream("dump.channel_packages_short", 253 (self.systemid, channel, last_modified))
254
255 - def getPackageXmlStream(self, packageIds):
256 """retrieve xml stream for package data given a 257 list of package ids.""" 258 self._prepare() 259 return self._openSocketStream("dump.packages", (self.systemid, packageIds))
260
261 - def getSourcePackageXmlStream(self, packageIds):
262 """retrieve xml stream for package data given a 263 list of package ids.""" 264 self._prepare() 265 return self._openSocketStream("dump.source_packages", (self.systemid, packageIds))
266
267 - def getErrataXmlStream(self, erratumIds):
268 """retrieve xml stream for erratum data given a list of erratum ids.""" 269 self._prepare() 270 return self._openSocketStream("dump.errata", (self.systemid, erratumIds))
271
272 - def getKickstartsXmlStream(self, ksLabels):
273 "retrieve xml stream for kickstart trees" 274 self._prepare() 275 return self._openSocketStream("dump.kickstartable_trees", 276 (self.systemid, ksLabels))
277
278 - def getComps(self, channel):
279 return self._openSocketStream("dump.get_comps", 280 (self.systemid, channel))
281
282 - def getModules(self, channel):
283 return self._openSocketStream("dump.get_modules", 284 (self.systemid, channel))
285
286 - def getRpm(self, nvrea, channel):
287 release = nvrea[2] 288 epoch = nvrea[3] 289 if epoch: 290 release = "%s:%s" % (release, epoch) 291 package_name = "%s-%s-%s.%s.rpm" % (nvrea[0], nvrea[1], release, 292 nvrea[4]) 293 self._prepare() 294 return self._openSocketStream("dump.get_rpm", 295 (self.systemid, package_name, channel))
296
297 - def getKickstartFile(self, ks_label, relative_path):
298 self._prepare() 299 return self._openSocketStream("dump.get_ks_file", 300 (self.systemid, ks_label, relative_path))
301
302 303 -class XMLRPCWireSource(BaseWireSource):
304 305 "Base class for all the XMLRPC calls" 306 307 @staticmethod
308 - def _xmlrpc(function, params):
309 try: 310 retval = getattr(BaseWireSource.serverObj, function)(*params) 311 except TypeError: 312 e = sys.exc_info()[1] 313 log(-1, 'ERROR: during "getattr(BaseWireSource.serverObj, %s)(*(%s))"' % (function, params)) 314 raise 315 except rpclib.xmlrpclib.ProtocolError: 316 e = sys.exc_info()[1] 317 log2(-1, 2, 'ERROR: ProtocolError: %s' % e, stream=sys.stderr) 318 raise 319 return retval
320
321 322 -class AuthWireSource(XMLRPCWireSource):
323 324 """Simply authenticate this systemid as a satellite.""" 325
326 - def checkAuth(self):
327 self.setServer(CFG.RHN_XMLRPC_HANDLER) 328 authYN = None 329 log(2, ' +++ Satellite synchronization tool checking in.') 330 try: 331 authYN = self._xmlrpc('authentication.check', (self.systemid,)) 332 except (rpclib.xmlrpclib.ProtocolError, rpclib.xmlrpclib.Fault): 333 raise 334 if authYN: 335 log(2, ' +++ Entitled satellite validated.', stream=sys.stderr) 336 elif authYN is None: 337 log(-1, ' --- An error occurred upon authentication of this satellite -- ' 338 'review the pertinent log file (%s) and/or alert RHN at rhn-satellite@redhat.com.' % CFG.LOG_FILE, 339 stream=sys.stderr) 340 sys.exit(-1) 341 elif authYN == 0: 342 log(-1, ' --- This server is not an entitled satellite.', stream=sys.stderr) 343 sys.exit(-1) 344 return authYN
345
346 -class RPCGetWireSource(BaseWireSource):
347 348 "Class to retrieve various files via authenticated GET requests" 349 get_server_obj = None 350 login_token = None 351 get_server_obj = None 352
353 - def __init__(self, systemid, sslYN, xml_dump_version):
354 BaseWireSource.__init__(self, systemid, sslYN, xml_dump_version) 355 self.extinctErrorYN = 0
356 357 @staticmethod
358 - def _set_connection_params(handler, url):
361
362 - def login(self, force=0):
363 "Perform a login, return a GET Server instance" 364 if force: 365 # Invalidate it 366 self._set_login_token(None) 367 if self.login_token: 368 # Return cached one 369 return self.get_server_obj 370 371 # Force a login otherwise 372 self._set_login_token(self._login()) 373 url = self.url + self.handler 374 get_server_obj = connection.GETServer(url, proxy=CFG.HTTP_PROXY, 375 username=CFG.HTTP_PROXY_USERNAME, password=CFG.HTTP_PROXY_PASSWORD, 376 headers=self.login_token, timeout=CFG.timeout) 377 # Add SSL trusted cert 378 self._set_ssl_trusted_certs(get_server_obj) 379 self._set_rpc_server(get_server_obj) 380 return self.get_server_obj
381
382 - def _login(self):
383 if not self.systemid: 384 raise Exception("systemid not set!") 385 386 # Set the URL to the one for regular XML-RPC calls 387 self.setServer(CFG.RHN_XMLRPC_HANDLER) 388 389 try: 390 login_token = self.getServer().authentication.login(self.systemid) 391 except rpclib.xmlrpclib.ProtocolError: 392 e = sys.exc_info()[1] 393 log2(-1, 2, 'ERROR: ProtocolError: %s' % e, stream=sys.stderr) 394 raise 395 return login_token
396 397 @staticmethod
398 - def _set_login_token(token):
400 401 @staticmethod
402 - def _set_rpc_server(server):
404
405 - def _rpc_call(self, function_name, params):
406 get_server_obj = self.login() 407 # Try a couple of times 408 fault_count = 0 409 expired_token = 0 410 cfg = config.initUp2dateConfig() 411 while fault_count - expired_token < cfg['networkRetries']: 412 try: 413 ret = getattr(get_server_obj, function_name)(*params) 414 except rpclib.xmlrpclib.ProtocolError: 415 e = sys.exc_info()[1] 416 # We have two codes to check: the HTTP error code, and the 417 # combination (failtCode, faultString) encoded in the headers 418 # of the request. 419 http_error_code = e.errcode 420 fault_code, fault_string = rpclib.reportError(e.headers) 421 fault_count += 1 422 if http_error_code == 401 and fault_code == -34: 423 # Login token expired 424 get_server_obj = self.login(force=1) 425 # allow exactly one respin for expired token 426 expired_token = 1 427 continue 428 if http_error_code == 404 and fault_code == -17: 429 # File not found 430 self.extinctErrorYN = 1 431 return None 432 log(-1, 'ERROR: http error code :%s; fault code: %s; %s' % 433 (http_error_code, fault_code, fault_string)) 434 # XXX 435 raise 436 else: 437 return ret 438 raise Exception("Failed after multiple attempts!")
439
440 - def getPackageStream(self, channel, nvrea):
441 release = nvrea[2] 442 epoch = nvrea[3] 443 if epoch: 444 release = "%s:%s" % (release, epoch) 445 package_name = "%s-%s-%s.%s.rpm" % (nvrea[0], nvrea[1], release, 446 nvrea[4]) 447 return self._rpc_call("getPackage", (channel, package_name))
448
449 - def getKickstartFileStream(self, channel, ks_tree_label, relative_path):
450 return self._rpc_call("getKickstartFile", (channel, ks_tree_label, 451 relative_path))
452
453 - def getCompsFileStream(self, channel):
454 return self._rpc_call("repodata", (channel, 'comps.xml'))
455
456 - def getModulesFilesStram(self, channel):
457 return self._rpc_call("repodata", (channel, 'modules.yaml'))
458