
Source Code for Module backend.satellite_tools.satsync

   1  # 
   2  # Copyright (c) 2008--2018 Red Hat, Inc. 
   3  # 
   4  # This software is licensed to you under the GNU General Public License, 
   5  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
   6  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
   7  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
   8  # along with this software; if not, see 
   9  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
  10  # 
  11  # Red Hat trademarks are not licensed under GPLv2. No permission is 
  12  # granted to use or replicate Red Hat trademarks that are incorporated 
  13  # in this software or its documentation. 
  14  # 
  15  # Spacewalk Incremental Synchronization Tool 
  16  #    main function. 
  17   
  18   
  19  # __lang. imports__ 
  20  # pylint: disable=E0012, C0413 
  21  import datetime 
  22  import os 
  23  import sys 
  24  import stat 
  25  import time 
  26  import exceptions 
  27  import fnmatch 
  28  try: 
  29      #  python 2 
  30      import Queue 
  31  except ImportError: 
  32      #  python3 
  33      import queue as Queue # pylint: disable=F0401 
  34  import threading 
  35  from optparse import Option, OptionParser 
  36  import gettext 
  37  from rhn.connections import idn_ascii_to_puny, idn_puny_to_unicode 
  38   
  39  sys.path.append("/usr/share/rhn") 
  40  from up2date_client import config 
  41   
  42  # __rhn imports__ 
  43  from spacewalk.common import usix 
  44  from spacewalk.common import rhnMail, rhnLib 
  45  from spacewalk.common.rhnLog import initLOG 
  46  from spacewalk.common.rhnConfig import CFG, initCFG, PRODUCT_NAME 
  47  from spacewalk.common.rhnTB import exitWithTraceback, fetchTraceback 
  48  from spacewalk.common.checksum import getFileChecksum 
  49  from spacewalk.server import rhnSQL 
  50  from spacewalk.server.rhnSQL import SQLError, SQLSchemaError, SQLConnectError 
  51  from spacewalk.server.rhnLib import get_package_path 
  52  from spacewalk.common import fileutils 
  53   
  54  # __rhn sync/import imports__ 
  55  import xmlWireSource 
  56  import xmlDiskSource 
  57  from progress_bar import ProgressBar 
  58  from xmlSource import FatalParseException, ParseException 
  59  from diskImportLib import rpmsPath 
  60   
  61  from syncLib import log, log2, log2disk, log2stderr, log2email 
  62  from syncLib import RhnSyncException, RpmManip, ReprocessingNeeded 
  63  from syncLib import initEMAIL_LOG, dumpEMAIL_LOG 
  64  from syncLib import FileCreationError, FileManip 
  65   
  66  from SequenceServer import SequenceServer 
  67  from spacewalk.server.importlib.errataCache import schedule_errata_cache_update 
  68   
  69  from spacewalk.server.importlib.importLib import InvalidChannelFamilyError 
  70  from spacewalk.server.importlib.importLib import MissingParentChannelError 
  71  from spacewalk.server.importlib.importLib import get_nevra, get_nevra_dict 
  72   
  73  import satCerts 
  74  import req_channels 
  75  import messages 
  76  import sync_handlers 
  77  import constants 
  78   
  79  translation = gettext.translation('spacewalk-backend-server', fallback=True) 
  80  _ = translation.ugettext 
  81  initCFG('server.satellite') 
  82  initLOG(CFG.LOG_FILE, CFG.DEBUG) 
  83   
  84  _DEFAULT_SYSTEMID_PATH = '/etc/sysconfig/rhn/systemid' 
  85  DEFAULT_ORG = 1 
  86   
  87  # the option object is used everywhere in this module... make it a 
  88  # global so we don't have to pass it to everyone. 
  89  OPTIONS = None 
  90   
  91  # pylint: disable=W0212 
  92   
  93   
  94  -class Runner: 
  95      step_precedence = { 
  96          'packages': ['download-packages'], 
  97          'source-packages': ['download-source-packages'], 
  98          'errata': ['download-errata'], 
  99          'kickstarts': ['download-kickstarts'], 
 100          'rpms': [''], 
 101          'srpms': [''], 
 102          'channels': ['channel-families'], 
 103          'channel-families': [''], 
 104          'short': [''], 
 105          'download-errata': ['errata'], 
 106          'download-packages': [''], 
 107          'download-source-packages': [''], 
 108          'download-kickstarts': [''], 
 109          'arches': [''],  # 5/26/05 wregglej 156079 Added arches to precedence list. 
 110          'orgs': [''], 
 111      } 
 112   
 113      # The step hierarchy. We need access to it both for command line 
 114      # processing and for the actions themselves 
 115      step_hierarchy = [ 
 116          'orgs', 
 117          'channel-families', 
 118          'arches', 
 119          'channels', 
 120          'short', 
 121          'download-packages', 
 122          'rpms', 
 123          'packages', 
 124          'srpms', 
 125          'download-source-packages', 
 126          'download-errata', 
 127          'download-kickstarts', 
 128          'source-packages', 
 129          'errata', 
 130          'kickstarts', 
 131      ] 
 132   
133 - def __init__(self):
 134          self.syncer = None 
 135          self.packages_report = None 
 136          self._xml_file_dir_error_message = '' 
 137          self._affected_channels = None 
 138          self._packages_report = None 
 139          self._actions = None 
 140   
 141      # 5/24/05 wregglej - 156079 turn off a step's dependents if the step is turned off. 
142 - def _handle_step_dependents(self, actionDict, step):
 143          ad = actionDict 
 144   
 145          if step in ad: 
 146              # if the step is turned off, then the steps that are dependent on it have to be turned 
 147              # off as well. 
 148              if ad[step] == 0: 
 149                  ad = self._turn_off_dependents(ad, step) 
 150   
 151          # if the step isn't in the actionDict, then its dependent actions must be turned off. 
 152          else: 
 153              ad = self._turn_off_dependents(ad, step) 
 154          return ad 
 155   
 156      # 5/24/05 wregglej - 156079 actually turns off the dependent steps, which are listed in the step_precedence 
 157      # dictionary. 
158 - def _turn_off_dependents(self, actionDict, step):
 159          ad = actionDict 
 160          for dependent in self.step_precedence[step]: 
 161              if dependent in ad: 
 162                  ad[dependent] = 0 
 163          return ad 
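
A minimal sketch of the dependency shutdown above, using a hypothetical two-step action dictionary: turning 'packages' off forces everything listed under it in step_precedence off as well.

    precedence = {'packages': ['download-packages']}
    actions = {'packages': 0, 'download-packages': 1}
    for dependent in precedence['packages']:
        if dependent in actions:
            actions[dependent] = 0
    assert actions == {'packages': 0, 'download-packages': 0}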
164
165 - def main(self):
166 """Main routine: commandline processing, etc...""" 167 168 # let's time the whole process 169 timeStart = time.time() 170 171 actionDict, channels = processCommandline() 172 173 # 5/24/05 wregglej - 156079 turn off an step's dependent steps if it's turned off. 174 # look at self.step_precedence for a listing of how the steps are dependent on each other. 175 for st in self.step_hierarchy: 176 actionDict = self._handle_step_dependents(actionDict, st) 177 self._actions = actionDict 178 179 # 5/26/05 wregglej - 156079 have to handle the list-channels special case. 180 if 'list-channels' in actionDict: 181 if actionDict['list-channels'] == 1: 182 actionDict['channels'] = 1 183 actionDict['arches'] = 0 184 actionDict['channel-families'] = 1 185 channels = [] 186 187 # create and set permissions for package repository mountpoint. 188 _verifyPkgRepMountPoint() 189 190 if OPTIONS.email: 191 initEMAIL_LOG() 192 193 # init the synchronization processor 194 self.syncer = Syncer(channels, actionDict['list-channels'], actionDict['rpms'], 195 forceAllErrata=actionDict['force-all-errata']) 196 try: 197 self.syncer.initialize() 198 except (KeyboardInterrupt, SystemExit): 199 raise 200 except xmlWireSource.rpclib.xmlrpclib.Fault: 201 e = sys.exc_info()[1] 202 if CFG.ISS_PARENT: 203 if CFG.PRODUCT_NAME == 'Spacewalk': 204 log(-1, ['', messages.sw_iss_not_available % e.faultString], ) 205 else: 206 log(-1, ['', messages.sat_iss_not_available % e.faultString], ) 207 sys.exit(26) 208 else: 209 log(-1, ['', messages.syncer_error % e.faultString], ) 210 sys.exit(9) 211 212 except Exception: # pylint: disable=E0012, W0703 213 e = sys.exc_info()[1] 214 log(-1, ['', messages.syncer_error % e], ) 215 sys.exit(10) 216 217 log(1, ' db: %s/<password>@%s' % (CFG.DB_USER, CFG.DB_NAME)) 218 219 selected = [action for action in list(actionDict) if actionDict[action]] 220 log2(-1, 3, "Action list/commandline toggles: %s" % repr(selected), 221 stream=sys.stderr) 222 223 if OPTIONS.mount_point: 224 self._xml_file_dir_error_message = messages.file_dir_error % \ 225 OPTIONS.mount_point 226 227 if CFG.DB_BACKEND == 'oracle': 228 import cx_Oracle #pylint: disable=F0401 229 exception = cx_Oracle.IntegrityError 230 if CFG.DB_BACKEND == 'postgresql': 231 import psycopg2 #pylint: disable=F0401 232 exception = psycopg2.IntegrityError 233 234 for _try in range(2): 235 try: 236 for step in self.step_hierarchy: 237 if not actionDict[step]: 238 continue 239 method_name = '_step_' + step.replace('-', '_') 240 if not hasattr(self, method_name): 241 log(-1, _("No handler for step %s") % step) 242 continue 243 method = getattr(self, method_name) 244 ret = method() 245 if ret: 246 sys.exit(ret) 247 else: # for 248 # Everything went fine 249 break 250 except ReprocessingNeeded: 251 # Try one more time - this time it should be faster since 252 # everything should be cached 253 log(1, _('Environment changed, trying again...')) 254 continue 255 except RhnSyncException: 256 rhnSQL.rollback() 257 raise 258 except exception: 259 e = sys.exc_info()[1] 260 msg = _("ERROR: Encountered IntegrityError: \n" 261 + str(e) 262 + "\nconsider removing satellite-sync cache at /var/cache/rhn/satsync/*" 263 + " and re-run satellite-sync with same options.\n" 264 + "If this error persits after removing cache, please contact Red Hat support.") 265 log2stderr(-1, msg, cleanYN=1) 266 return 1 267 else: 268 log(1, _('Repeated failures')) 269 270 timeEnd = time.time() 271 delta_str = self._get_elapsed_time(timeEnd - timeStart) 272 273 log(1, _("""\ 274 Import 
complete: 275 Begin time: %s 276 End time: %s 277 Elapsed: %s 278 """) % (formatDateTime(dt=time.localtime(timeStart)), 279 formatDateTime(dt=time.localtime(timeEnd)), 280 delta_str), 281 cleanYN=1) 282 283 # mail out that log if appropriate 284 sendMail() 285 return 0
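
The step loop in main() above dispatches by naming convention: a step label from step_hierarchy maps to a handler method by prefixing '_step_' and replacing hyphens with underscores.

    step = 'download-packages'
    method_name = '_step_' + step.replace('-', '_')
    assert method_name == '_step_download_packages'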
 286   
 287      @staticmethod 
288 - def _get_elapsed_time(elapsed):
 289          elapsed = int(elapsed) 
 290          hours = elapsed / 60 / 60 
 291          mins = elapsed / 60 - hours * 60 
 292          secs = elapsed - mins * 60 - hours * 60 * 60 
 293   
 294          delta_list = [[hours, _("hours")], [mins, _("minutes")], [secs, _("seconds")]] 
 295          delta_str = ", ".join(["%s %s" % (l[0], l[1]) for l in delta_list]) 
 296          return delta_str 
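
Worked example of the arithmetic above, assuming Python 2 integer division ("/" on ints truncates; under Python 3 "//" would be needed): 3725 seconds break down into 1 hour, 2 minutes, 5 seconds, giving the string "1 hours, 2 minutes, 5 seconds".

    elapsed = 3725
    hours = elapsed // 60 // 60                     # 1
    mins = elapsed // 60 - hours * 60               # 2
    secs = elapsed - mins * 60 - hours * 60 * 60    # 5
    assert (hours, mins, secs) == (1, 2, 5)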
297
298 - def _run_syncer_step(self, function, step_name):
299 """ Runs a function, and catches the most common error cases """ 300 try: 301 ret = function() 302 except (xmlDiskSource.MissingXmlDiskSourceDirError, 303 xmlDiskSource.MissingXmlDiskSourceFileError), e: 304 log(-1, self._xml_file_dir_error_message + 305 '\n Error message: %s\n' % e) 306 return 1 307 except (KeyboardInterrupt, SystemExit): 308 raise 309 except xmlWireSource.rpclib.xmlrpclib.Fault: 310 e = sys.exc_info()[1] 311 log(-1, messages.failed_step % (step_name, e.faultString)) 312 return 1 313 except Exception: # pylint: disable=E0012, W0703 314 e = sys.exc_info()[1] 315 log(-1, messages.failed_step % (step_name, e)) 316 return 1 317 return ret
318
319 - def _step_arches(self):
320 self.syncer.processArches()
321
322 - def _step_channel_families(self):
323 self.syncer.processChannelFamilies()
324
325 - def _step_channels(self):
326 try: 327 self.syncer.process_channels() 328 except MissingParentChannelError: 329 e = sys.exc_info()[1] 330 msg = messages.parent_channel_error % repr(e.channel) 331 log(-1, msg) 332 # log2email(-1, msg) # redundant 333 sendMail() 334 return 1 335 return 0
336
337 - def _step_short(self):
338 try: 339 return self.syncer.processShortPackages() 340 except xmlDiskSource.MissingXmlDiskSourceFileError: 341 msg = _("ERROR: The dump is missing package data, " 342 + "use --no-rpms to skip this step or fix the content to include package data.") 343 log2disk(-1, msg) 344 log2stderr(-1, msg, cleanYN=1) 345 sys.exit(25)
346
347 - def _step_download_packages(self):
348 return self.syncer.download_package_metadata()
349
 350 - def _step_download_source_packages(self): 
 351          return self.syncer.download_source_package_metadata() 
352
353 - def _step_rpms(self):
354 self._packages_report = self.syncer.download_rpms() 355 return None
356 357 # def _step_srpms(self): 358 # return self.syncer.download_srpms() 359
360 - def _step_download_errata(self):
361 return self.syncer.download_errata()
362
363 - def _step_download_kickstarts(self):
364 return self.syncer.download_kickstarts()
365
366 - def _step_packages(self):
367 self._affected_channels = self.syncer.import_packages()
368 369 # def _step_source_packages(self): 370 # self.syncer.import_packages(sources=1) 371
372 - def _step_errata(self):
373 self.syncer.import_errata() 374 # Now that errata have been populated, schedule an errata cache 375 # refresh 376 schedule_errata_cache_update(self._affected_channels)
377
378 - def _step_kickstarts(self):
379 self.syncer.import_kickstarts()
380
381 - def _step_orgs(self):
382 try: 383 self.syncer.import_orgs() 384 except (RhnSyncException, xmlDiskSource.MissingXmlDiskSourceFileError, 385 xmlDiskSource.MissingXmlDiskSourceDirError): 386 # the orgs() method doesn't exist; that's fine we just 387 # won't sync the orgs 388 log(1, [_("The upstream Satellite does not support syncing orgs data."), _("Skipping...")])
389
390 391 -def sendMail(forceEmail=0):
392 """ Send email summary """ 393 if forceEmail or (OPTIONS is not None and OPTIONS.email): 394 body = dumpEMAIL_LOG() 395 if body: 396 print(_("+++ sending log as an email +++")) 397 host_label = idn_puny_to_unicode(os.uname()[1]) 398 headers = { 399 'Subject' : _('RHN Management Satellite sync. report from %s') % host_label, 400 } 401 sndr = "root@%s" % host_label 402 if CFG.default_mail_from: 403 sndr = CFG.default_mail_from 404 rhnMail.send(headers, body, sender=sndr) 405 else: 406 print(_("+++ email requested, but there is nothing to send +++")) 407 # mail was sent. Let's not allow it to be sent twice... 408 OPTIONS.email = None
409
410 411 -class Syncer:
 412   
 413      """ high-level synchronization/import class 
 414          NOTE: there should *ONLY* be one instance of this. 
 415      """ 
 416   
417 - def __init__(self, channels, listChannelsYN, check_rpms, forceAllErrata=False):
418 """ Base initialization. Most work done in self.initialize() which 419 needs to be called soon after instantiation. 420 """ 421 422 self._requested_channels = channels 423 self.mountpoint = OPTIONS.mount_point 424 self.listChannelsYN = listChannelsYN 425 self.forceAllErrata = forceAllErrata 426 self.sslYN = not OPTIONS.no_ssl 427 self._systemidPath = OPTIONS.systemid or _DEFAULT_SYSTEMID_PATH 428 self._batch_size = OPTIONS.batch_size 429 self.master_label = OPTIONS.master 430 #self.create_orgs = OPTIONS.create_missing_orgs 431 self.xml_dump_version = OPTIONS.dump_version or str(constants.PROTOCOL_VERSION) 432 self.check_rpms = check_rpms 433 self.keep_rpms = OPTIONS.keep_rpms 434 435 # Object to help with channel math 436 self._channel_req = None 437 self._channel_collection = sync_handlers.ChannelCollection() 438 439 self.containerHandler = sync_handlers.ContainerHandler( 440 self.master_label) 441 442 # instantiated in self.initialize() 443 self.xmlDataServer = None 444 self.systemid = None 445 446 self.reporegen = set() 447 448 # self._*_full hold list of all ids for appropriate channel while 449 # non-full self._* contain every id only once (in first channel it appeared) 450 self._channel_packages = {} 451 self._channel_packages_full = {} 452 self._avail_channel_packages = {} 453 454 self._missing_channel_packages = None 455 self._missing_fs_packages = None 456 457 self._failed_fs_packages = Queue.Queue() 458 self._extinct_packages = Queue.Queue() 459 460 self._channel_errata = {} 461 self._missing_channel_errata = {} 462 463 self._channel_source_packages = {} 464 self._channel_source_packages_full = {} 465 466 self._channel_kickstarts = {} 467 self._avail_channel_source_packages = None 468 self._missing_channel_src_packages = None 469 self._missing_fs_source_packages = None
470
471 - def initialize(self):
472 """Initialization that requires IO, etc.""" 473 474 # Sync from filesystem: 475 if self.mountpoint: 476 log(1, [_(PRODUCT_NAME + ' - file-system synchronization'), 477 ' mp: %s' % self.mountpoint]) 478 self.xmlDataServer = xmlDiskSource.MetadataDiskSource(self.mountpoint) 479 # Sync across the wire: 480 else: 481 self.xmlDataServer = xmlWireSource.MetadataWireSource(self.systemid, 482 self.sslYN, self.xml_dump_version) 483 if CFG.ISS_PARENT: 484 sync_parent = CFG.ISS_PARENT 485 self.systemid = 'N/A' # systemid is not used in ISS auth process 486 is_iss = 1 487 else: 488 log(1, _(PRODUCT_NAME + ' - live synchronization')) 489 log(-1, _("ERROR: Live content synchronizing with RHN Classic Hosted is no longer supported.\nPlease " 490 "use the cdn-sync command instead unless you are attempting to sync from another Satellite " 491 "via Inter-Satelite-Sync (ISS), or from local content on disk via Channel Dump ISOs."), 492 stream=sys.stderr) 493 sys.exit(1) 494 495 url = self.xmlDataServer.schemeAndUrl(sync_parent) 496 log(1, [_(PRODUCT_NAME + ' - live synchronization'), 497 _(' url: %s') % url, 498 _(' debug/output level: %s') % CFG.DEBUG]) 499 self.xmlDataServer.setServerHandler(isIss=is_iss) 500 501 if not self.systemid: 502 # check and fetch systemid (NOTE: systemid kept in memory... may or may not 503 # be better to do it this way). 504 if (os.path.exists(self._systemidPath) 505 and os.access(self._systemidPath, os.R_OK)): 506 self.systemid = open(self._systemidPath, 'rb').read() 507 else: 508 usix.raise_with_tb(RhnSyncException(_('ERROR: this server must be registered with RHN.')), 509 sys.exc_info()[2]) 510 # authorization check of the satellite 511 auth = xmlWireSource.AuthWireSource(self.systemid, self.sslYN, 512 self.xml_dump_version) 513 auth.checkAuth()
514
515 - def __del__(self):
516 self.containerHandler.close()
517
518 - def _process_simple(self, remote_function_name, step_name):
519 """ Wrapper function that can process metadata that is relatively 520 simple. This does the decoding of data (over the wire or from 521 disk). 522 523 step_name is just for pretty printing the actual --step name to 524 the console. 525 526 The remote function is passed by name (as a string), to mimic the 527 lazy behaviour of the if block 528 """ 529 530 log(1, ["", _("Retrieving / parsing %s data") % step_name]) 531 # get XML stream 532 stream = None 533 method = getattr(self.xmlDataServer, remote_function_name) 534 stream = method() 535 536 # parse/process XML stream 537 try: 538 self.containerHandler.process(stream) 539 except KeyboardInterrupt: 540 log(-1, _('*** SYSTEM INTERRUPT CALLED ***'), stream=sys.stderr) 541 raise 542 except (FatalParseException, ParseException, Exception): # pylint: disable=E0012, W0703 543 e = sys.exc_info()[1] 544 # nuke the container batch upon error! 545 self.containerHandler.clear() 546 msg = '' 547 if isinstance(e, FatalParseException): 548 msg = (_('ERROR: fatal parser exception occurred ') + 549 _('(line: %s, col: %s msg: %s)') % ( 550 e.getLineNumber(), e.getColumnNumber(), 551 e._msg)) 552 elif isinstance(e, ParseException): 553 msg = (_('ERROR: parser exception occurred: %s') % (e)) 554 elif isinstance(e, exceptions.SystemExit): 555 log(-1, _('*** SYSTEM INTERRUPT CALLED ***'), stream=sys.stderr) 556 raise 557 else: 558 msg = _('ERROR: exception (during parse) occurred: ') 559 log2stderr(-1, _(' Encountered some errors with %s data ' 560 + '(see logs (%s) for more information)') % (step_name, CFG.LOG_FILE)) 561 log2(-1, 3, [_(' Encountered some errors with %s data:') % step_name, 562 _(' ------- %s PARSE/IMPORT ERROR -------') % step_name, 563 ' %s' % msg, 564 _(' ---------------------------------------')], stream=sys.stderr) 565 exitWithTraceback(e, '', 11) 566 self.containerHandler.reset() 567 log(1, _("%s data complete") % step_name)
568
569 - def processArches(self):
570 self._process_simple("getArchesXmlStream", "arches") 571 self._process_simple("getArchesExtraXmlStream", "additional arches")
572
573 - def import_orgs(self):
574 self._process_simple("getOrgsXmlStream", "orgs")
575
576 - def processChannelFamilies(self):
577 self._process_simple("getChannelFamilyXmlStream", "channel-families") 578 # pylint: disable=W0703 579 try: 580 self._process_simple("getProductNamesXmlStream", "product names") 581 except Exception: 582 pass
583
584 - def _write_repomd(self, repomd_path, getRepomdFunc, repomdFileStreamFunc, label, timestamp):
585 full_path = os.path.join(CFG.MOUNT_POINT, repomd_path) 586 if not os.path.exists(full_path): 587 if self.mountpoint or CFG.ISS_PARENT: 588 stream = getRepomdFunc(label) 589 else: 590 stream = repomdFileStreamFunc(label) 591 f = FileManip(repomd_path, timestamp, None) 592 f.write_file(stream) 593 self.reporegen.add(label)
594
595 - def _process_comps(self, backend, label, timestamp):
596 comps_path = 'rhn/comps/%s/comps-%s.xml' % (label, timestamp) 597 self._write_repomd(comps_path, self.xmlDataServer.getComps, \ 598 xmlWireSource.RPCGetWireSource(self.systemid, self.sslYN, self.xml_dump_version).getCompsFileStream, \ 599 label, timestamp) 600 data = {label: None} 601 backend.lookupChannels(data) 602 rhnSQL.Procedure('rhn_channel.set_comps')(data[label]['id'], comps_path, 1, timestamp)
603 604 605
606 - def _process_modules(self, backend, label, timestamp):
607 modules_path = 'rhn/modules/%s/modules-%s.yaml' % (label, timestamp) 608 self._write_repomd(modules_path, self.xmlDataServer.getModules, \ 609 xmlWireSource.RPCGetWireSource(self.systemid, self.sslYN, self.xml_dump_version).getModulesFilesStram, \ 610 label, timestamp) 611 data = {label: None} 612 backend.lookupChannels(data) 613 rhnSQL.Procedure('rhn_channel.set_comps')(data[label]['id'], modules_path, 2, timestamp)
614 615
616 - def process_channels(self):
617 """ push channels, channel-family and dist. map information 618 as well upon parsing. 619 """ 620 log(1, ["", _("Retrieving / parsing channel data")]) 621 622 h = sync_handlers.get_channel_handler() 623 624 # get channel XML stream 625 stream = self.xmlDataServer.getChannelXmlStream() 626 if self.mountpoint: 627 for substream in stream: 628 h.process(substream) 629 doEOSYN = 0 630 else: 631 h.process(stream) 632 doEOSYN = 1 633 634 h.close() 635 636 # clean up the channel request and populate self._channel_request 637 # This essentially determines which channels are to be imported 638 self._compute_channel_request() 639 640 # print out the relevant channel tree 641 # 3/6/06 wregglej 183213 Don't print out the end-of-service message if 642 # satellite-sync is running with the --mount-point (-m) option. If it 643 # did, it would incorrectly list channels as end-of-service if they had been 644 # synced already but aren't in the channel dump. 645 self._printChannelTree(doEOSYN=doEOSYN) 646 647 if self.listChannelsYN: 648 # We're done here 649 return 650 651 requested_channels = self._channel_req.get_requested_channels() 652 try: 653 importer = sync_handlers.import_channels(requested_channels, 654 orgid=OPTIONS.orgid or None, 655 master=OPTIONS.master or None) 656 for label in requested_channels: 657 timestamp = self._channel_collection.get_channel_timestamp(label) 658 ch = self._channel_collection.get_channel(label, timestamp) 659 if ch.has_key('comps_last_modified') and ch['comps_last_modified'] is not None: 660 self._process_comps(importer.backend, label, sync_handlers._to_timestamp(ch['comps_last_modified'])) 661 if ch.has_key('modules_last_modified') and ch['modules_last_modified'] is not None: 662 self._process_modules(importer.backend, label, \ 663 sync_handlers._to_timestamp(ch['modules_last_modified'])) 664 665 except InvalidChannelFamilyError: 666 usix.raise_with_tb(RhnSyncException(messages.invalid_channel_family_error % 667 ''.join(requested_channels)), sys.exc_info()[2]) 668 except MissingParentChannelError: 669 raise 670 671 rhnSQL.commit() 672 673 log(1, _("Channel data complete"))
674 675 @staticmethod
676 - def _formatChannelExportType(channel):
677 """returns pretty formated text with type of channel export""" 678 if 'export-type' not in channel or channel['export-type'] is None: 679 return '' 680 else: 681 export_type = channel['export-type'] 682 if 'export-start-date' in channel and channel['export-start-date'] is not None: 683 start_date = channel['export-start-date'] 684 else: 685 start_date = '' 686 if 'export-end-date' in channel and channel['export-end-date'] is not None: 687 end_date = channel['export-end-date'] 688 else: 689 end_date = '' 690 if end_date and not start_date: 691 return _("%10s import from %s") % (export_type, 692 formatDateTime(end_date)) 693 elif end_date and start_date: 694 return _("%10s import from %s - %s") % (export_type, 695 formatDateTime(start_date), 696 formatDateTime(end_date)) 697 else: 698 return _("%10s") % export_type
699
700 - def _printChannel(self, label, channel_object, log_format, is_imported):
701 assert channel_object is not None 702 all_pkgs = channel_object['all-packages'] or channel_object['packages'] 703 pkgs_count = len(all_pkgs) 704 if is_imported: 705 status = _('p') 706 else: 707 status = _('.') 708 log(1, log_format % (status, label, pkgs_count, self._formatChannelExportType(channel_object)))
709
710 - def _printChannelTree(self, doEOSYN=1, doTyposYN=1):
711 "pretty prints a tree of channel information" 712 713 log(1, _(' p = previously imported/synced channel')) 714 log(1, _(' . = channel not yet imported/synced')) 715 ch_end_of_service = self._channel_req.get_end_of_service() 716 ch_typos = self._channel_req.get_typos() 717 ch_requested_imported = self._channel_req.get_requested_imported() 718 relevant = self._channel_req.get_requested_channels() 719 if doEOSYN and ch_end_of_service: 720 log(1, _(' e = channel no longer supported (end-of-service)')) 721 if doTyposYN and ch_typos: 722 log(1, _(' ? = channel label invalid --- typo?')) 723 724 pc_labels = sorted(self._channel_collection.get_parent_channel_labels()) 725 726 t_format = _(' %s:') 727 p_format = _(' %s %-40s %4s %s') 728 log(1, t_format % _('base-channels')) 729 # Relevant parent channels 730 no_base_channel = True 731 for plabel in pc_labels: 732 if plabel not in relevant: 733 continue 734 735 no_base_channel = False 736 timestamp = self._channel_collection.get_channel_timestamp(plabel) 737 channel_object = self._channel_collection.get_channel(plabel, 738 timestamp) 739 self._printChannel(plabel, channel_object, p_format, (plabel in ch_requested_imported)) 740 if no_base_channel: 741 log(1, p_format % (' ', _('NONE RELEVANT'), '', '')) 742 743 # Relevant parent channels 744 for plabel in pc_labels: 745 cchannels = self._channel_collection.get_child_channels(plabel) 746 # chns has only the channels we are interested in 747 # (and that's all the channels if we list them) 748 chns = [] 749 for clabel, ctimestamp in cchannels: 750 if clabel in relevant: 751 chns.append((clabel, ctimestamp)) 752 if not chns: 753 # No child channels, skip 754 continue 755 log(1, t_format % plabel) 756 for clabel, ctimestamp in sorted(chns): 757 channel_object = self._channel_collection.get_channel(clabel, 758 ctimestamp) 759 self._printChannel(clabel, channel_object, p_format, (clabel in ch_requested_imported)) 760 log(2, '') 761 762 if doEOSYN and ch_end_of_service: 763 log(1, t_format % _('end-of-service')) 764 status = _('e') 765 for chn in ch_end_of_service: 766 log(1, p_format % (status, chn, '', '')) 767 log(2, '') 768 769 if doTyposYN and ch_typos: 770 log(1, _(' typos:')) 771 status = _('?') 772 for chn in ch_typos: 773 log(1, p_format % (status, chn, '', '')) 774 log(2, '') 775 log(1, '')
776
777 - def _compute_channel_request(self):
778 """ channels request is verify and categorized. 779 780 NOTE: self.channel_req *will be* initialized by this method 781 """ 782 783 # channels already imported, and all channels 784 importedChannels = _getImportedChannels() 785 availableChannels = self._channel_collection.get_channel_labels() 786 log(6, _('XXX: imported channels: %s') % importedChannels, 1) 787 log(6, _('XXX: cached channels: %s') % availableChannels, 1) 788 789 # if requested a channel list, we are requesting all channels 790 if self.listChannelsYN: 791 requested_channels = availableChannels 792 log(6, _('XXX: list channels called'), 1) 793 else: 794 requested_channels = set() 795 for channel in self._requested_channels: 796 match_channels = set(fnmatch.filter(importedChannels, channel)) 797 match_channels = match_channels.union(fnmatch.filter(availableChannels, channel)) 798 if not match_channels: 799 match_channels = [channel,] 800 requested_channels = requested_channels.union(match_channels) 801 802 rc = req_channels.RequestedChannels(list(requested_channels)) 803 rc.set_available(availableChannels) 804 rc.set_imported(importedChannels) 805 # rc does all the logic of doing intersections and stuff 806 rc.compute() 807 808 typos = rc.get_typos() 809 if typos: 810 log(-1, _("ERROR: these channels either do not exist or " 811 "are not available:")) 812 for chn in typos: 813 log(-1, " %s" % chn) 814 log(-1, _(" (to see a list of channel labels: %s --list-channels)") % sys.argv[0]) 815 sys.exit(12) 816 self._channel_req = rc 817 return rc
818
819 - def _get_channel_timestamp(self, channel):
820 try: 821 timestamp = self._channel_collection.get_channel_timestamp(channel) 822 except KeyError: 823 # XXX Do something with this exception 824 raise 825 return timestamp
826
827 - def _compute_unique_packages(self):
828 """ process package metadata for one channel at a time """ 829 relevant = sorted(self._channel_req.get_requested_channels()) 830 self._channel_packages = {} 831 self._channel_packages_full = {} 832 self._avail_channel_packages = {} 833 already_seen_ids = set() 834 for chn in relevant: 835 timestamp = self._get_channel_timestamp(chn) 836 837 channel_obj = self._channel_collection.get_channel(chn, timestamp) 838 avail_package_ids = sorted(set(channel_obj['packages'] or [])) 839 package_full_ids = sorted(set(channel_obj['all-packages'] or [])) or avail_package_ids 840 package_ids = sorted(set(package_full_ids) - already_seen_ids) 841 842 self._channel_packages[chn] = package_ids 843 self._channel_packages_full[chn] = package_full_ids 844 self._avail_channel_packages[chn] = avail_package_ids 845 already_seen_ids.update(package_ids)
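
The already_seen_ids bookkeeping above means a package shared by several channels is attributed only to the first channel it appears in; a minimal sketch with hypothetical package ids:

    channels = {'base': [1, 2, 3], 'child': [2, 3, 4]}
    already_seen = set()
    unique = {}
    for label in sorted(channels):
        unique[label] = sorted(set(channels[label]) - already_seen)
        already_seen.update(unique[label])
    assert unique == {'base': [1, 2, 3], 'child': [4]}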
846
847 - def processShortPackages(self):
848 log(1, ["", "Retrieving short package metadata (used for indexing)"]) 849 850 # Compute the unique packages and populate self._channel_packages 851 self._compute_unique_packages() 852 853 stream_loader = StreamProducer( 854 sync_handlers.get_short_package_handler(), 855 self.xmlDataServer, 'getChannelShortPackagesXmlStream') 856 857 sorted_channels = sorted(list(self._channel_packages.items()), key=lambda x: x[0]) # sort by channel_label 858 for channel_label, package_ids in sorted_channels: 859 log(1, _(" Retrieving / parsing short package metadata: %s (%s)") % 860 (channel_label, len(package_ids))) 861 862 if package_ids: 863 lm = self._channel_collection.get_channel_timestamp(channel_label) 864 channel_last_modified = int(rhnLib.timestamp(lm)) 865 866 stream_loader.set_args(channel_label, channel_last_modified) 867 stream_loader.process(package_ids) 868 869 stream_loader.close() 870 871 self._diff_packages()
872 873 _query_compare_packages = """ 874 select p.id, c.checksum_type, c.checksum, p.path, p.package_size, 875 TO_CHAR(p.last_modified, 'YYYYMMDDHH24MISS') last_modified 876 from rhnPackage p, rhnChecksumView c 877 where p.name_id = lookup_package_name(:name) 878 and p.evr_id = lookup_evr(:epoch, :version, :release) 879 and p.package_arch_id = lookup_package_arch(:arch) 880 and (p.org_id = :org_id or 881 (p.org_id is null and :org_id is null)) 882 and p.checksum_id = c.id 883 """ 884
885 - def _diff_packages_process(self, chunk, channel_label):
886 package_collection = sync_handlers.ShortPackageCollection() 887 888 h = rhnSQL.prepare(self._query_compare_packages) 889 for pid in chunk: 890 package = package_collection.get_package(pid) 891 assert package is not None 892 l_timestamp = rhnLib.timestamp(package['last_modified']) 893 894 if package['org_id'] is not None: 895 package['org_id'] = OPTIONS.orgid or DEFAULT_ORG 896 nevra = get_nevra_dict(package) 897 nevra['org_id'] = package['org_id'] 898 899 h.execute(**nevra) 900 row = None 901 for r in (h.fetchall_dict() or []): 902 # let's check which checksum we have in database 903 if (r['checksum_type'] in package['checksums'] 904 and package['checksums'][r['checksum_type']] == r['checksum']): 905 row = r 906 break 907 908 self._process_package(pid, package, l_timestamp, row, 909 self._missing_channel_packages[channel_label], 910 self._missing_fs_packages[channel_label], 911 check_rpms=self.check_rpms)
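
The bind variables for _query_compare_packages above come from get_nevra_dict plus the org id; a hypothetical example of their shape (values invented for illustration):

    nevra = {'name': 'bash', 'epoch': None, 'version': '4.1.2',
             'release': '15.el6', 'arch': 'x86_64', 'org_id': 1}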
912 913 # XXX the "is null" condition will have to change in multiorg satellites
914 - def _diff_packages(self):
915 self._missing_channel_packages = {} 916 self._missing_fs_packages = {} 917 918 sorted_channels = sorted(list(self._channel_packages.items()), key=lambda x: x[0]) # sort by channel_label 919 for channel_label, upids in sorted_channels: 920 log(1, _("Diffing package metadata (what's missing locally?): %s") % 921 channel_label) 922 self._missing_channel_packages[channel_label] = [] 923 self._missing_fs_packages[channel_label] = [] 924 self._process_batch(channel_label, upids[:], None, 925 self._diff_packages_process, 926 _('Diffing: '), 927 [channel_label]) 928 929 self._verify_missing_channel_packages(self._missing_channel_packages)
930
931 - def _verify_missing_channel_packages(self, missing_channel_packages, sources=0):
932 """Verify if all the missing packages are actually available somehow. 933 In an incremental approach, one may request packages that are actually 934 not available in the current dump, probably because of applying an 935 incremental to the wrong base""" 936 for channel_label, pids in missing_channel_packages.items(): 937 if sources: 938 avail_pids = [x[0] for x in self._avail_channel_source_packages[channel_label]] 939 else: 940 avail_pids = self._avail_channel_packages[channel_label] 941 942 if set(pids or []) > set(avail_pids or []): 943 raise RhnSyncException(_('ERROR: incremental dump skipped'))
944 945 @staticmethod
946 - def _get_rel_package_path(nevra, org_id, source, checksum_type, checksum):
947 return get_package_path(nevra, org_id, prepend=CFG.PREPENDED_DIR, 948 source=source, checksum_type=checksum_type, checksum=checksum)
949 950 @staticmethod
951 - def _verify_file(path, mtime, size, checksum_type, checksum):
952 """ 953 Verifies if the file is on the filesystem and matches the mtime and checksum. 954 Computing the checksum is costly, that's why we rely on mtime comparisons. 955 Returns errcode: 956 0 - file is ok, it has either the specified mtime and size 957 or checksum matches (then function sets mtime) 958 1 - file does not exist at all 959 2 - file has a different checksum 960 """ 961 if not path: 962 return 1 963 abs_path = os.path.join(CFG.MOUNT_POINT, path) 964 try: 965 stat_info = os.stat(abs_path) 966 except OSError: 967 # File is missing completely 968 return 1 969 970 l_mtime = stat_info[stat.ST_MTIME] 971 l_size = stat_info[stat.ST_SIZE] 972 if l_mtime == mtime and l_size == size: 973 # Same mtime, and size, assume identity 974 return 0 975 976 # Have to check checksum 977 l_checksum = getFileChecksum(checksum_type, filename=abs_path) 978 if l_checksum != checksum: 979 return 2 980 981 # Set the mtime 982 os.utime(abs_path, (mtime, mtime)) 983 return 0
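
A standalone restatement of the decision order above (stat first, checksum only as a fallback), assuming sha256 and a file small enough to read whole; illustrative only:

    import hashlib
    import os
    import stat

    def verify_file(abs_path, mtime, size, checksum):
        """Return 0 (ok), 1 (missing) or 2 (checksum mismatch)."""
        try:
            st = os.stat(abs_path)
        except OSError:
            return 1                                 # missing entirely
        if st[stat.ST_MTIME] == mtime and st[stat.ST_SIZE] == size:
            return 0                                 # cheap identity check
        digest = hashlib.sha256(open(abs_path, 'rb').read()).hexdigest()
        if digest != checksum:
            return 2
        os.utime(abs_path, (mtime, mtime))           # resync mtime
        return 0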
984
985 - def _process_package(self, package_id, package, l_timestamp, row, 986 m_channel_packages, m_fs_packages, check_rpms=1):
987 path = None 988 channel_package = None 989 fs_package = None 990 if row: 991 # package found in the DB 992 checksum_type = row['checksum_type'] 993 if checksum_type in package['checksums']: 994 checksum = package['checksums'][row['checksum_type']] 995 package_size = package['package_size'] 996 997 db_timestamp = int(rhnLib.timestamp(row['last_modified'])) 998 db_checksum = row['checksum'] 999 db_package_size = row['package_size'] 1000 db_path = row['path'] 1001 1002 if not (l_timestamp <= db_timestamp and 1003 checksum == db_checksum and 1004 package_size == db_package_size): 1005 # package doesn't match 1006 channel_package = package_id 1007 1008 if check_rpms: 1009 if db_path: 1010 # check the filesystem 1011 errcode = self._verify_file(db_path, l_timestamp, 1012 package_size, checksum_type, checksum) 1013 if errcode: 1014 # file doesn't match 1015 fs_package = package_id 1016 channel_package = package_id 1017 path = db_path 1018 else: 1019 # upload package and reimport metadata 1020 channel_package = package_id 1021 fs_package = package_id 1022 else: 1023 # package is missing from the DB 1024 channel_package = package_id 1025 fs_package = package_id 1026 1027 if channel_package: 1028 m_channel_packages.append(channel_package) 1029 if fs_package: 1030 m_fs_packages.append((fs_package, path)) 1031 return
1032
1033 - def download_rpms(self):
1034 log(1, ["", _("Downloading rpm packages")]) 1035 # Lets go fetch the packages and push them to their proper location: 1036 sorted_channels = sorted(list(self._missing_fs_packages.items()), key=lambda x: x[0]) # sort by channel 1037 for channel, missing_fs_packages in sorted_channels: 1038 missing_packages_count = len(missing_fs_packages) 1039 log(1, _(" Fetching any missing RPMs: %s (%s)") % 1040 (channel, missing_packages_count or _('NONE MISSING'))) 1041 if missing_packages_count == 0: 1042 continue 1043 1044 # Fetch all RPMs whose meta-data is marked for need to be imported 1045 # (ie. high chance of not being there) 1046 self._fetch_packages(channel, missing_fs_packages) 1047 continue 1048 1049 log(1, _("Processing rpm packages complete"))
1050
1051 - def _missing_not_cached_packages(self): 
1052          missing_packages = {} 
1053   
1054          # First, determine what has to be downloaded 
1055          short_package_collection = sync_handlers.ShortPackageCollection() 
1056          package_collection = sync_handlers.PackageCollection() 
1057          for channel, pids in self._missing_channel_packages.items(): 
1058              missing_packages[channel] = mp = [] 
1059   
1060              if not pids: 
1061                  # Nothing to see here 
1062                  continue 
1063   
1064              for pid in pids: 
1065                  # XXX Catch errors 
1066                  if (not package_collection.has_package(pid) 
1067                          or package_collection.get_package(pid)['last_modified'] 
1068                          != short_package_collection.get_package(pid)['last_modified']): 
1069                      # not in the cache 
1070                      mp.append(pid) 
1071   
1072          return missing_packages 
1073
1074 - def download_package_metadata(self):
1075 log(1, ["", _("Downloading package metadata")]) 1076 # Get the missing but uncached packages 1077 missing_packages = self._missing_not_cached_packages() 1078 1079 stream_loader = StreamProducer( 1080 sync_handlers.get_package_handler(), 1081 self.xmlDataServer, 'getPackageXmlStream') 1082 1083 sorted_channels = sorted(list(missing_packages.items()), key=lambda x: x[0]) # sort by channel 1084 for channel, pids in sorted_channels: 1085 self._process_batch(channel, pids[:], messages.package_parsing, 1086 stream_loader.process, is_slow=True) 1087 stream_loader.close() 1088 1089 # Double-check that we got all the packages 1090 missing_packages = self._missing_not_cached_packages() 1091 for channel, pids in missing_packages.items(): 1092 if pids: 1093 # Something may have changed from the moment we started to 1094 # download the packages till now 1095 raise ReprocessingNeeded
1096
1097 - def download_srpms(self):
1098 self._compute_unique_source_packages() 1099 self._diff_source_packages() 1100 log(1, ["", _("Downloading srpm packages")]) 1101 # Lets go fetch the source packages and push them to their proper location: 1102 # sort by channel_label 1103 sorted_channels = sorted(list(self._missing_fs_source_packages.items()), key=lambda x: x[0]) 1104 for channel, missing_fs_source_packages in sorted_channels: 1105 missing_source_packages_count = len(missing_fs_source_packages) 1106 log(1, _(" Fetching any missing SRPMs: %s (%s)") % 1107 (channel, missing_source_packages_count or _('NONE MISSING'))) 1108 if missing_source_packages_count == 0: 1109 continue 1110 1111 # Fetch all SRPMs whose meta-data is marked for need to be imported 1112 # (ie. high chance of not being there) 1113 self._fetch_packages(channel, missing_fs_source_packages, sources=1) 1114 continue 1115 1116 log(1, "Processing srpm packages complete")
1117
1118 - def _compute_unique_source_packages(self): 
1119          """ process package metadata for one channel at a time""" 
1120          relevant = self._channel_req.get_requested_channels() 
1121          self._channel_source_packages = {} 
1122          self._channel_source_packages_full = {} 
1123          self._avail_channel_source_packages = {} 
1124   
1125          already_seen_ids = set() 
1126          for chn in relevant: 
1127              timestamp = self._get_channel_timestamp(chn) 
1128   
1129              channel_obj = self._channel_collection.get_channel(chn, timestamp) 
1130              sps = set(channel_obj['source_packages']) 
1131              if not sps: 
1132                  # No source package info 
1133                  continue 
1134              ret_sps = [] 
1135              for sp in sps: 
1136                  if isinstance(sp, usix.StringType): 
1137                      # Old style 
1138                      ret_sps.append((sp, None)) 
1139                  else: 
1140                      ret_sps.append((sp['id'], sp['last_modified'])) 
1141              del sps 
1142              ret_sps.sort() 
1143              self._channel_source_packages[chn] = sorted(set(ret_sps) - already_seen_ids) 
1144              self._channel_source_packages_full[chn] = ret_sps 
1145              self._avail_channel_source_packages[chn] = ret_sps 
1146              already_seen_ids.update(ret_sps) 
1147
1148 - def _compute_not_cached_source_packages(self): 
1149          missing_sps = {} 
1150   
1151          # First, determine what has to be downloaded 
1152          sp_collection = sync_handlers.SourcePackageCollection() 
1153          for channel, sps in self._channel_source_packages.items(): 
1154              missing_sps[channel] = [] 
1155              if not sps: 
1156                  # Nothing to see here 
1157                  continue 
1158              missing_sps[channel] = [sp_id for (sp_id, _timestamp) in sps 
1159                                      if not sp_collection.has_package(sp_id)] 
1160          return missing_sps 
1161 1162 _query_compare_source_packages = """ 1163 select ps.id, c.checksum_type, c.checksum, ps.path, ps.package_size, 1164 TO_CHAR(ps.last_modified, 'YYYYMMDDHH24MISS') last_modified 1165 from rhnPackageSource ps, rhnChecksumView c 1166 where ps.source_rpm_id = lookup_source_name(:package_id) 1167 and (ps.org_id = :org_id or 1168 (ps.org_id is null and :org_id is null)) 1169 and ps.checksum_id = c.id 1170 and c.checksum = :checksum 1171 and c.checksum_type = :checksum_type 1172 """ 1173
1174 - def _diff_source_packages_process(self, chunk, channel_label):
1175 package_collection = sync_handlers.SourcePackageCollection() 1176 sql_params = ['package_id', 'checksum', 'checksum_type'] 1177 h = rhnSQL.prepare(self._query_compare_source_packages) 1178 for pid, _timestamp in chunk: 1179 package = package_collection.get_package(pid) 1180 assert package is not None 1181 1182 params = {} 1183 for t in sql_params: 1184 params[t] = package[t] or "" 1185 1186 if package['org_id'] is not None: 1187 params['org_id'] = OPTIONS.orgid or DEFAULT_ORG 1188 package['org_id'] = OPTIONS.orgid or DEFAULT_ORG 1189 else: 1190 params['org_id'] = package['org_id'] 1191 1192 h.execute(**params) 1193 row = h.fetchone_dict() 1194 self._process_package(pid, package, None, row, 1195 self._missing_channel_src_packages[channel_label], 1196 self._missing_fs_source_packages[channel_label])
1197 1198 # XXX the "is null" condition will have to change in multiorg satellites
1199 - def _diff_source_packages(self):
1200 self._missing_channel_src_packages = {} 1201 self._missing_fs_source_packages = {} 1202 for channel_label, upids in self._channel_source_packages.items(): 1203 log(1, _("Diffing source package metadata (what's missing locally?): %s") % channel_label) 1204 self._missing_channel_src_packages[channel_label] = [] 1205 self._missing_fs_source_packages[channel_label] = [] 1206 self._process_batch(channel_label, upids[:], None, 1207 self._diff_source_packages_process, 1208 _('Diffing: '), 1209 [channel_label]) 1210 1211 self._verify_missing_channel_packages(self._missing_channel_src_packages, sources=1)
1212
1213 - def download_source_package_metadata(self): 
1214          log(1, ["", _("Downloading source package metadata")]) 
1215   
1216          # Get the missing but uncached packages 
1217          missing_packages = self._compute_not_cached_source_packages() 
1218   
1219          stream_loader = StreamProducer( 
1220              sync_handlers.get_source_package_handler(), 
1221              self.xmlDataServer, 'getSourcePackageXmlStream') 
1222   
1223          for channel, pids in missing_packages.items(): 
1224              self._process_batch(channel, pids[:], messages.package_parsing, 
1225                                  stream_loader.process, is_slow=True) 
1226          stream_loader.close() 
1227   
1228          # Double-check that we got all the packages 
1229          missing_packages = self._compute_not_cached_source_packages() 
1230          for channel, pids in missing_packages.items(): 
1231              if pids: 
1232                  # Something may have changed from the moment we started to 
1233                  # download the packages till now 
1234                  raise ReprocessingNeeded 
1235
1236 - def _compute_unique_kickstarts(self):
1237 """ process package metadata for one channel at a time""" 1238 relevant = self._channel_req.get_requested_channels() 1239 self._channel_kickstarts = {} 1240 already_seen_kickstarts = set() 1241 for chn in relevant: 1242 timestamp = self._get_channel_timestamp(chn) 1243 1244 channel_obj = self._channel_collection.get_channel(chn, timestamp) 1245 self._channel_kickstarts[chn] = \ 1246 sorted(set(channel_obj['kickstartable_trees']) 1247 - already_seen_kickstarts) 1248 already_seen_kickstarts.update(self._channel_kickstarts[chn])
1249
1250 - def _compute_missing_kickstarts(self): 
1251          """ process package metadata for one channel at a time""" 
1252          relevant = self._channel_req.get_requested_channels() 
1253          coll = sync_handlers.KickstartableTreesCollection() 
1254          missing_kickstarts = {} 
1255          for chn in relevant: 
1256              timestamp = self._get_channel_timestamp(chn) 
1257   
1258              channel_obj = self._channel_collection.get_channel(chn, timestamp) 
1259              kickstart_trees = channel_obj['kickstartable_trees'] 
1260   
1261              for ktid in kickstart_trees: 
1262                  # No timestamp for kickstartable trees 
1263                  kt = coll.get_item(ktid, timestamp=None) 
1264                  assert kt is not None 
1265                  kt_label = kt['label'] 
1266   
1267                  # XXX rhnKickstartableTree does not have a last_modified 
1268                  # Once we add it, we should be able to do more meaningful 
1269                  # diffs 
1270                  missing_kickstarts[kt_label] = None 
1271   
1272          ret = list(missing_kickstarts.items()) 
1273          ret.sort() 
1274          return ret 
1275
1276 - def _download_kickstarts_file(self, chunk, channel_label):
1277 cfg = config.initUp2dateConfig() 1278 assert len(chunk) == 1 1279 item = chunk[0] 1280 label, base_path, relative_path, timestamp, file_size = item 1281 path = os.path.join(base_path, relative_path) 1282 f = FileManip(path, timestamp=timestamp, file_size=file_size) 1283 # Retry a number of times, we may have network errors 1284 for _try in range(cfg['networkRetries']): 1285 stream = self._get_ks_file_stream(channel_label, label, relative_path) 1286 try: 1287 f.write_file(stream) 1288 break # inner for 1289 except FileCreationError: 1290 e = sys.exc_info()[1] 1291 msg = e.args[0] 1292 log2disk(-1, _("Unable to save file %s: %s") % (path, 1293 msg)) 1294 # Try again 1295 continue 1296 else: # for 1297 # Retried a number of times and it still failed; log the 1298 # file as being failed and move on 1299 log2disk(-1, _("Failed to fetch file %s") % path)
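
The download loop above relies on Python's for/else: the else suite runs only when the loop finished without a break, i.e. every retry failed. The same idiom in isolation (retry count and exception type hypothetical):

    def fetch_with_retries(fetch, retries=3):
        for _attempt in range(retries):
            try:
                data = fetch()
                break                 # success: skip the else suite
            except IOError:
                continue              # try again
        else:
            raise RuntimeError('all %d attempts failed' % retries)
        return data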
1300
1301 - def download_kickstarts(self):
1302 """Downloads all the kickstart-related information""" 1303 1304 log(1, ["", _("Downloading kickstartable trees metadata")]) 1305 1306 self._compute_unique_kickstarts() 1307 1308 stream_loader = StreamProducer( 1309 sync_handlers.get_kickstarts_handler(), 1310 self.xmlDataServer, 'getKickstartsXmlStream') 1311 1312 for channel, ktids in self._channel_kickstarts.items(): 1313 self._process_batch(channel, ktids[:], messages.kickstart_parsing, 1314 stream_loader.process) 1315 stream_loader.close() 1316 1317 missing_ks_files = self._compute_missing_ks_files() 1318 1319 log(1, ["", _("Downloading kickstartable trees files")]) 1320 sorted_channels = sorted(list(missing_ks_files.items()), key=lambda x: x[0]) # sort by channel 1321 for channel, files in sorted_channels: 1322 self._process_batch(channel, files[:], messages.kickstart_downloading, 1323 self._download_kickstarts_file, 1324 nevermorethan=1, 1325 process_function_args=[channel])
1326
1327 - def _get_ks_file_stream(self, channel, kstree_label, relative_path):
1328 if self.mountpoint: 1329 s = xmlDiskSource.KickstartFileDiskSource(self.mountpoint) 1330 s.setID(kstree_label) 1331 s.set_relative_path(relative_path) 1332 return s.load() 1333 1334 if CFG.ISS_PARENT: 1335 return self.xmlDataServer.getKickstartFile(kstree_label, relative_path) 1336 else: 1337 srv = xmlWireSource.RPCGetWireSource(self.systemid, self.sslYN, 1338 self.xml_dump_version) 1339 return srv.getKickstartFileStream(channel, kstree_label, relative_path)
1340
1341 - def _compute_missing_ks_files(self):
1342 coll = sync_handlers.KickstartableTreesCollection() 1343 1344 missing_ks_files = {} 1345 # download files for the ks trees 1346 for channel, ktids in self._channel_kickstarts.items(): 1347 missing_ks_files[channel] = missing = [] 1348 for ktid in ktids: 1349 # No timestamp for kickstartable trees 1350 kt = coll.get_item(ktid, timestamp=None) 1351 assert kt is not None 1352 kt_label = kt['label'] 1353 base_path = kt['base_path'] 1354 files = kt['files'] 1355 for f in files: 1356 relative_path = f['relative_path'] 1357 dest_path = os.path.join(base_path, relative_path) 1358 timestamp = rhnLib.timestamp(f['last_modified']) 1359 file_size = f['file_size'] 1360 errcode = self._verify_file(dest_path, 1361 timestamp, file_size, f['checksum_type'], f['checksum']) 1362 if errcode != 0: 1363 # Have to download it 1364 val = (kt_label, base_path, relative_path, 1365 timestamp, file_size) 1366 missing.append(val) 1367 return missing_ks_files
1368
1369 - def import_kickstarts(self):
1370 """Imports the kickstart-related information""" 1371 1372 missing_kickstarts = self._compute_missing_kickstarts() 1373 1374 if not missing_kickstarts: 1375 log(1, messages.kickstart_import_nothing_to_do) 1376 return 1377 1378 ks_count = len(missing_kickstarts) 1379 log(1, messages.kickstart_importing % ks_count) 1380 1381 coll = sync_handlers.KickstartableTreesCollection() 1382 batch = [] 1383 for ks, timestamp in missing_kickstarts: 1384 ksobj = coll.get_item(ks, timestamp=timestamp) 1385 assert ksobj is not None 1386 1387 if ksobj['org_id'] is not None: 1388 ksobj['org_id'] = OPTIONS.orgid or DEFAULT_ORG 1389 batch.append(ksobj) 1390 1391 _importer = sync_handlers.import_kickstarts(batch) 1392 log(1, messages.kickstart_imported % ks_count)
1393
1394 - def _compute_not_cached_errata(self):
1395 missing_errata = {} 1396 1397 # First, determine what has to be downloaded 1398 errata_collection = sync_handlers.ErrataCollection() 1399 for channel, errata in self._channel_errata.items(): 1400 missing_errata[channel] = [] 1401 if not errata: 1402 # Nothing to see here 1403 continue 1404 missing_errata[channel] = [eid for (eid, timestamp, _advisory_name) in errata 1405 if not errata_collection.has_erratum(eid, timestamp) 1406 or self.forceAllErrata] 1407 return missing_errata
1408 1409 _query_get_db_errata = rhnSQL.Statement(""" 1410 select e.id, e.advisory_name, 1411 TO_CHAR(e.last_modified, 'YYYYMMDDHH24MISS') last_modified 1412 from rhnChannelErrata ce, rhnErrata e, rhnChannel c 1413 where c.label = :channel 1414 and ce.channel_id = c.id 1415 and ce.errata_id = e.id 1416 """) 1417
1418 - def _get_db_channel_errata(self):
1419 """ 1420 Fetch the errata stored in the local satellite's database. Returned 1421 as a hash of channel to another hash of advisory names to a tuple of 1422 errata id and last modified date. 1423 """ 1424 db_channel_errata = {} 1425 relevant = self._channel_req.get_requested_channels() 1426 h = rhnSQL.prepare(self._query_get_db_errata) 1427 for channel in relevant: 1428 db_channel_errata[channel] = ce = {} 1429 h.execute(channel=channel) 1430 while 1: 1431 row = h.fetchone_dict() 1432 if not row: 1433 break 1434 advisory_name = row['advisory_name'] 1435 erratum_id = row['id'] 1436 last_modified = rhnLib.timestamp(row['last_modified']) 1437 ce[advisory_name] = (erratum_id, last_modified) 1438 return db_channel_errata
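
The structure returned above maps channel label to advisory name to an (erratum id, last_modified) tuple; a hypothetical example:

    db_channel_errata = {
        'my-channel-label': {
            'RHSA-2014:0001': (1234, 1388534400.0),
        },
    }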
1439
1440 - def _diff_errata(self):
1441 """ Fetch the errata for this channel""" 1442 db_channel_errata = self._get_db_channel_errata() 1443 1444 relevant = self._channel_req.get_requested_channels() 1445 1446 # Now get the channel's errata 1447 channel_errata = {} 1448 for chn in relevant: 1449 db_ce = db_channel_errata[chn] 1450 timestamp = self._get_channel_timestamp(chn) 1451 1452 channel_obj = self._channel_collection.get_channel(chn, timestamp) 1453 errata_timestamps = channel_obj['errata_timestamps'] 1454 if errata_timestamps is None or self.forceAllErrata: 1455 # No unique key information, so assume we need all errata 1456 erratum_ids = channel_obj['errata'] 1457 errata = [(x, None, None) for x in erratum_ids] 1458 log(2, _("Grabbing all errata for channel %s") % chn) 1459 else: 1460 errata = [] 1461 # Check the advisory name and last modification 1462 for erratum in errata_timestamps: 1463 erratum_id = erratum['id'] 1464 last_modified = erratum['last_modified'] 1465 last_modified = rhnLib.timestamp(last_modified) 1466 advisory_name = erratum['advisory_name'] 1467 if advisory_name in db_ce: 1468 _foo, db_last_modified = db_ce[advisory_name] 1469 if last_modified == db_last_modified: 1470 # We already have this erratum 1471 continue 1472 errata.append((erratum_id, last_modified, advisory_name)) 1473 errata.sort() 1474 channel_errata[chn] = errata 1475 1476 # Uniquify the errata 1477 already_seen_errata = set() 1478 for channel, errata in channel_errata.items(): 1479 uq_errata = set(errata) - already_seen_errata 1480 self._channel_errata[channel] = sorted(uq_errata) 1481 already_seen_errata.update(uq_errata)
1482
1483 - def _diff_db_errata(self):
1484 """ Compute errata that are missing from the satellite 1485 Kind of similar to diff_errata, if we had the timestamp and advisory 1486 information available 1487 """ 1488 errata_collection = sync_handlers.ErrataCollection() 1489 self._missing_channel_errata = missing_channel_errata = {} 1490 db_channel_errata = self._get_db_channel_errata() 1491 for channel, errata in self._channel_errata.items(): 1492 ch_erratum_ids = missing_channel_errata[channel] = [] 1493 for eid, timestamp, advisory_name in errata: 1494 if timestamp is not None: 1495 # Should have been caught by diff_errata 1496 ch_erratum_ids.append((eid, timestamp, advisory_name)) 1497 continue 1498 # timestamp is None, grab the erratum from the cache 1499 erratum = errata_collection.get_erratum(eid, timestamp) 1500 timestamp = rhnLib.timestamp(erratum['last_modified']) 1501 advisory_name = erratum['advisory_name'] 1502 db_erratum = db_channel_errata[channel].get(advisory_name) 1503 if db_erratum is None or db_erratum[1] != timestamp or \ 1504 self.forceAllErrata: 1505 ch_erratum_ids.append((eid, timestamp, advisory_name))
1506
1507 - def download_errata(self):
1508 log(1, ["", _("Downloading errata data")]) 1509 if self.forceAllErrata: 1510 log(2, _("Forcing download of all errata data for requested channels.")) 1511 self._diff_errata() 1512 not_cached_errata = self._compute_not_cached_errata() 1513 stream_loader = StreamProducer( 1514 sync_handlers.get_errata_handler(), 1515 self.xmlDataServer, 'getErrataXmlStream') 1516 1517 sorted_channels = sorted(list(not_cached_errata.items()), key=lambda x: x[0]) # sort by channel 1518 for channel, erratum_ids in sorted_channels: 1519 self._process_batch(channel, erratum_ids[:], messages.erratum_parsing, 1520 stream_loader.process) 1521 stream_loader.close() 1522 # XXX This step should go away once the channel info contains the 1523 # errata timestamps and advisory names 1524 self._diff_db_errata() 1525 log(1, _("Downloading errata data complete"))
1526 1527 # __private methods__
1528 - def _processWithProgressBar(self, batch, size, 1529 process_function, 1530 prompt=_('Downloading:'), 1531 nevermorethan=None, 1532 process_function_args=()):
1533 pb = ProgressBar(prompt=prompt, endTag=_(' - complete'), 1534 finalSize=size, finalBarLength=40, stream=sys.stdout) 1535 if CFG.DEBUG > 2: 1536 pb.redrawYN = 0 1537 pb.printAll(1) 1538 1539 ss = SequenceServer(batch, nevermorethan=(nevermorethan or self._batch_size)) 1540 while not ss.doneYN(): 1541 chunk = ss.getChunk() 1542 item_count = len(chunk) 1543 process_function(chunk, *process_function_args) 1544 ss.clearChunk() 1545 pb.addTo(item_count) 1546 pb.printIncrement() 1547 pb.printComplete()
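
A minimal sketch of the chunked-processing pattern above, with plain list slicing standing in for SequenceServer (chunk size hypothetical):

    def process_in_chunks(batch, process_function, chunk_size=10):
        while batch:
            chunk, batch = batch[:chunk_size], batch[chunk_size:]
            process_function(chunk)

    process_in_chunks(list(range(25)), lambda chunk: None)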
1548
1549 - def _process_batch(self, channel, batch, log_msg, 1550 process_function, 1551 prompt=_('Downloading:'), 1552 process_function_args=(), 1553 nevermorethan=None, 1554 is_slow=False):
1555 count = len(batch) 1556 if log_msg: 1557 log(1, log_msg % (channel, count or _('NONE RELEVANT'))) 1558 if not count: 1559 return 1560 if is_slow: 1561 log(1, messages.warning_slow) 1562 self._processWithProgressBar(batch, count, process_function, 1563 prompt, nevermorethan, process_function_args)
1564
1565 - def _import_packages_process(self, chunk, sources):
1566 batch = self._get_cached_package_batch(chunk, sources) 1567 # check to make sure the orgs exported are valid 1568 _validate_package_org(batch) 1569 try: 1570 sync_handlers.import_packages(batch, sources) 1571 except (SQLError, SQLSchemaError, SQLConnectError): 1572 e = sys.exc_info()[1] 1573 # an SQL error is fatal... crash and burn 1574 exitWithTraceback(e, 'Exception caught during import', 13)
1575
1576 - def import_packages(self, sources=0):
1577 if sources: 1578 log(1, ["", _("Importing source package metadata")]) 1579 missing_channel_items = self._missing_channel_src_packages 1580 else: 1581 log(1, ["", _("Importing package metadata")]) 1582 missing_channel_items = self._missing_channel_packages 1583 1584 sorted_channels = sorted(list(missing_channel_items.items()), key=lambda x: x[0]) # sort by channel 1585 for channel, packages in sorted_channels: 1586 self._process_batch(channel, packages[:], 1587 messages.package_importing, 1588 self._import_packages_process, 1589 _('Importing: '), 1590 [sources]) 1591 return self._link_channel_packages()
1592 1639 1640 @staticmethod
1641 - def _get_cached_package_batch(chunk, sources=0):
1642 """ short-circuit the most common case""" 1643 if not chunk: 1644 return [] 1645 short_package_collection = sync_handlers.ShortPackageCollection() 1646 if sources: 1647 package_collection = sync_handlers.SourcePackageCollection() 1648 else: 1649 package_collection = sync_handlers.PackageCollection() 1650 batch = [] 1651 for pid in chunk: 1652 package = package_collection.get_package(pid) 1653 if (package is None or package['last_modified'] 1654 != short_package_collection.get_package(pid) 1655 ['last_modified']): 1656 # not in the cache 1657 raise Exception(_("Package Not Found in Cache, Clear the Cache to \ 1658 Regenerate it.")) 1659 batch.append(package) 1660 return batch
1661
1662 - def import_errata(self):
1663          log(1, ["", _("Importing channel errata")])
1664          errata_collection = sync_handlers.ErrataCollection()
1665          # sort by channel_label
1666          sorted_channels = sorted(list(self._missing_channel_errata.items()), key=lambda x: x[0])
1667          for chn, errata in sorted_channels:
1668              log(2, _("Importing %s errata for channel %s.") % (len(errata), chn))
1669              batch = []
1670              for eid, timestamp, _advisory_name in errata:
1671                  erratum = errata_collection.get_erratum(eid, timestamp)
1672                  # bug 161144: incremental dumps may yield a None erratum
1673                  # in the collection
1674                  if erratum is not None:
1675                      self._fix_erratum(erratum)
1676                      batch.append(erratum)
1677
1678              self._process_batch(chn, batch, messages.errata_importing,
1679                                  sync_handlers.import_errata)
1680 1681 @staticmethod
1682 - def _fix_erratum(erratum):
1683 """ Replace the list of packages with references to short packages""" 1684 if erratum['org_id'] is not None: 1685 erratum['org_id'] = OPTIONS.orgid or DEFAULT_ORG 1686 1687 sp_coll = sync_handlers.ShortPackageCollection() 1688 pids = set(erratum['packages'] or []) 1689 # map all the pkgs objects to the erratum 1690 packages = [] 1691 # remove packages which are not in the export (e.g. archs we are not syncing) 1692 for pid in pids: 1693 if not sp_coll.has_package(pid): 1694 # Package not found, go on - may be part of a channel we don't 1695 # sync 1696 continue 1697 package = sp_coll.get_package(pid) 1698 if package['org_id'] is not None: 1699 package['org_id'] = erratum['org_id'] 1700 1701 packages.append(package) 1702 1703 erratum['packages'] = packages 1704 1705 if OPTIONS.channel: 1706 # If we are syncing only selected channels, do not link 1707 # to channels that do not have this erratum, they may not 1708 # have all related packages synced 1709 imported_channels = _getImportedChannels(withAdvisory=erratum) 1710 # Import erratum to channels that are being synced 1711 imported_channels += OPTIONS.channel 1712 else: 1713 # Associate errata to channels that are synced already 1714 imported_channels = _getImportedChannels() 1715 1716 erratum['channels'] = [c for c in erratum['channels'] 1717 if c['label'] in imported_channels] 1718 1719 # Now fix the files 1720 for errata_file in (erratum['files'] or []): 1721 errata_file_package = errata_file.get('package') 1722 errata_file_source_package = errata_file.get('source-package') 1723 if errata_file['file_type'] == 'RPM' and \ 1724 errata_file_package is not None: 1725 package = None 1726 if sp_coll.has_package(errata_file_package): 1727 package = sp_coll.get_package(errata_file_package) 1728 errata_file['pkgobj'] = package 1729 elif errata_file['file_type'] == 'SRPM' and \ 1730 errata_file_source_package is not None: 1731 # XXX misa: deal with source rpms 1732 errata_file['pkgobj'] = None
1733
1734 - def _fetch_packages(self, channel, missing_fs_packages, sources=0):
1735          short_package_collection = sync_handlers.ShortPackageCollection()
1736          if sources:
1737              # acronym = "SRPM"
1738              package_collection = sync_handlers.SourcePackageCollection()
1739          else:
1740              # acronym = "RPM"
1741              package_collection = sync_handlers.PackageCollection()
1742
1743          self._failed_fs_packages = Queue.Queue()
1744          self._extinct_packages = Queue.Queue()
1745          pkgs_total = len(missing_fs_packages)
1746          pkg_current = 0
1747          total_size = 0
1748          queue = Queue.Queue()
1749          out_queue = Queue.Queue()
1750          lock = threading.Lock()
1751
1752          # count the size of the missing packages
1753          for package_id, path in missing_fs_packages:
1754              package = package_collection.get_package(package_id)
1755              total_size = total_size + package['package_size']
1756              queue.put((package_id, path))
1757
1758          log(1, messages.package_fetch_total_size %
1759              (self._bytes_to_fuzzy(total_size)))
1760          real_processed_size = processed_size = 0
1761          real_total_size = total_size
1762          start_time = round(time.time())
1763
1764          all_threads = []
1765          for _thread in range(4):
1766              t = ThreadDownload(lock, queue, out_queue, short_package_collection, package_collection,
1767                                 self, self._failed_fs_packages, self._extinct_packages, sources, channel)
1768              t.setDaemon(True)
1769              t.start()
1770              all_threads.append(t)
1771
1772          while ([x for x in all_threads if x.isAlive()]
1773                 and pkg_current < pkgs_total):
1774              try:
1775                  (rpmManip, package, is_done) = out_queue.get(False, 0.1)
1776              except Queue.Empty:
1777                  continue
1778              pkg_current = pkg_current + 1
1779
1780              if not is_done:  # the package failed to download or already exists on disk
1781                  real_total_size -= package['package_size']
1782                  processed_size += package['package_size']
1783                  try:
1784                      out_queue.task_done()
1785                  except AttributeError:
1786                      pass
1787                  continue
1788              # Package successfully saved
1789              filename = os.path.basename(rpmManip.relative_path)
1790
1791              # Determine the downloaded size and the remaining time
1792              size = package['package_size']
1793              real_processed_size += size
1794              processed_size += size
1795              current_time = round(time.time())
1796              # a timedelta cannot be multiplied by a float, so scale by integers
1797              remain_time = (datetime.timedelta(seconds=current_time - start_time)) * \
1798                  ((real_total_size * 10000) / real_processed_size - 10000) / 10000
1799              # cut off milliseconds
1800              remain_time = datetime.timedelta(remain_time.days, remain_time.seconds)
1801              log(1, messages.package_fetch_remain_size_time %
1802                  (self._bytes_to_fuzzy(processed_size), self._bytes_to_fuzzy(total_size), remain_time))
1803
1804              log(1, messages.package_fetch_successful %
1805                  (pkg_current, pkgs_total, filename, size))
1806              try:
1807                  out_queue.task_done()
1808              except AttributeError:
1809                  pass
1810
1811          extinct_count = self._extinct_packages.qsize()
1812          failed_count = self._failed_fs_packages.qsize()
1813
1814          # Print the summary
1815          log(2, messages.package_fetch_summary % channel, notimeYN=1)
1816          log(2, messages.package_fetch_summary_success %
1817              (pkgs_total - extinct_count - failed_count), notimeYN=1)
1818          log(2, messages.package_fetch_summary_failed % failed_count,
1819              notimeYN=1)
1820          log(2, messages.package_fetch_summary_extinct % extinct_count,
1821              notimeYN=1)
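The remaining-time computation above sidesteps a Python 2 limitation: a datetime.timedelta cannot be multiplied by a float, so the ratio of remaining to processed bytes is scaled by 10000 and applied with integer arithmetic. A self-contained sketch of the same arithmetic (the sizes are made up for illustration):

    import datetime

    elapsed = datetime.timedelta(seconds=120)    # time spent downloading so far
    real_total_size = 1000 * 2**20               # bytes counted toward the estimate
    real_processed_size = 250 * 2**20            # bytes downloaded so far

    # (total/processed - 1) expressed as a scaled integer factor
    remain = elapsed * ((real_total_size * 10000) // real_processed_size - 10000) // 10000
    # drop the sub-second part, as the module does
    remain = datetime.timedelta(remain.days, remain.seconds)
    print(remain)  # -> 0:06:00 (three times the 2 minutes elapsed, since 3/4 remains)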
1822
1823      # Translate a byte count into a string like "x kiB", "x MiB" or "x GiB"
1824      @staticmethod
1825 - def _bytes_to_fuzzy(b):
1826 units = ['bytes', 'kiB', 'MiB', 'GiB', 'TiB', 'PiB'] 1827 base = 1024 1828 fuzzy = b 1829 for unit in units: 1830 if fuzzy >= base: 1831 fuzzy = float(fuzzy) / base 1832 else: 1833 break 1834 int_len = len("%d" % fuzzy) 1835 fract_len = 3 - int_len 1836 # pylint: disable=W0631 1837 return "%*.*f %s" % (int_len, fract_len, fuzzy, unit)
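_bytes_to_fuzzy divides by 1024 until the value drops below one unit step, then prints roughly three significant digits. A standalone sketch of the same logic (the function name is illustrative; the clamp to a non-negative precision is an addition here, guarding values such as 1000-1023 in the final unit):

    def bytes_to_fuzzy(b):
        units = ['bytes', 'kiB', 'MiB', 'GiB', 'TiB', 'PiB']
        fuzzy, unit = float(b), units[0]
        for unit in units:
            if fuzzy >= 1024:
                fuzzy /= 1024
            else:
                break
        int_len = len("%d" % fuzzy)
        fract_len = max(3 - int_len, 0)   # keep ~3 significant digits, never negative
        return "%*.*f %s" % (int_len, fract_len, fuzzy, unit)

    print(bytes_to_fuzzy(987))           # -> '987 bytes'
    print(bytes_to_fuzzy(5 * 1024**2))   # -> '5.00 MiB'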
1838
1839 - def _get_package_stream(self, channel, package_id, nvrea, sources):
1840 """ returns (filepath, stream), so in the case of a "wire source", 1841 the return value is, of course, (None, stream) 1842 """ 1843 1844 # Returns a package stream from disk 1845 if self.mountpoint: 1846 rpmFile = rpmsPath(package_id, self.mountpoint, sources) 1847 try: 1848 stream = open(rpmFile) 1849 except IOError: 1850 e = sys.exc_info()[1] 1851 if e.errno != 2: # No such file or directory 1852 raise 1853 return (rpmFile, None) 1854 1855 return (rpmFile, stream) 1856 1857 # Wire stream 1858 if CFG.ISS_PARENT: 1859 stream = self.xmlDataServer.getRpm(nvrea, channel) 1860 else: 1861 rpmServer = xmlWireSource.RPCGetWireSource(self.systemid, self.sslYN, 1862 self.xml_dump_version) 1863 stream = rpmServer.getPackageStream(channel, nvrea) 1864 1865 return (None, stream)
1866
1867 1868 -class ThreadDownload(threading.Thread):
1869
1870 - def __init__(self, lock, queue, out_queue, short_package_collection, package_collection, syncer, 1871 failed_fs_packages, extinct_packages, sources, channel):
1872 threading.Thread.__init__(self) 1873 self.queue = queue 1874 self.out_queue = out_queue 1875 self.short_package_collection = short_package_collection 1876 self.package_collection = package_collection 1877 self.syncer = syncer 1878 self.failed_fs_packages = failed_fs_packages 1879 self.extinct_packages = extinct_packages 1880 self.sources = sources 1881 self.channel = channel 1882 self.lock = lock
1883
1884 - def run(self):
1885          while not self.queue.empty():
1886              # grab the next package from the queue
1887              (package_id, path) = self.queue.get()
1888              package = self.package_collection.get_package(package_id)
1889              last_modified = package['last_modified']
1890              checksum_type = package['checksum_type']
1891              checksum = package['checksum']
1892              package_size = package['package_size']
1893              if not path:
1894                  nevra = get_nevra(package)
1895                  orgid = None
1896                  if package['org_id']:
1897                      orgid = OPTIONS.orgid or DEFAULT_ORG
1898                  path = self.syncer._get_rel_package_path(nevra, orgid, self.sources,
1899                                                           checksum_type, checksum)
1900
1901              # update the package path
1902              package['path'] = path
1903              self.package_collection.add_item(package)
1904
1905              errcode = self.syncer._verify_file(path, rhnLib.timestamp(last_modified),
1906                                                 package_size, checksum_type, checksum)
1907              if errcode == 0:
1908                  # the file is already there;
1909                  # do not count its size toward the time estimate
1910                  try:
1911                      self.queue.task_done()
1912                  except AttributeError:
1913                      pass
1914                  self.out_queue.put((None, package, False))
1915                  continue
1916
1917              cfg = config.initUp2dateConfig()
1918              rpmManip = RpmManip(package, path)
1919              nvrea = rpmManip.nvrea()
1920
1921              # Retry a number of times; we may run into network errors
1922              for _try in range(cfg['networkRetries']):
1923                  self.lock.acquire()
1924                  try:
1925                      rpmFile, stream = self.syncer._get_package_stream(self.channel,
1926                                                                        package_id, nvrea, self.sources)
1927                  except:
1928                      self.lock.release()
1929                      raise
1930                  self.lock.release()
1931                  if stream is None:
1932                      # Mark the package as extinct
1933                      self.extinct_packages.put(package_id)
1934                      log(1, messages.package_fetch_extinct %
1935                          (os.path.basename(path)))
1936                      break  # inner for
1937
1938                  try:
1939                      rpmManip.write_file(stream)
1940                      break  # inner for
1941                  except FileCreationError:
1942                      e = sys.exc_info()[1]
1943                      msg = e.args[0]
1944                      log2disk(-1, _("Unable to save file %s: %s") % (
1945                          rpmManip.full_path, msg))
1946                      # Try again
1947                      continue  # inner for
1948
1949              else:  # for
1950                  # Ran out of retry iterations:
1951                  # mark the package as failed and move on
1952                  self.failed_fs_packages.put(package_id)
1953                  log(1, messages.package_fetch_failed %
1954                      (os.path.basename(path)))
1955                  # Move on to the next package
1956                  try:
1957                      self.queue.task_done()
1958                  except AttributeError:
1959                      pass
1960                  self.out_queue.put((rpmManip, package, False))
1961                  continue
1962
1963              if stream is None:
1964                  # The package is extinct. Move on
1965                  try:
1966                      self.queue.task_done()
1967                  except AttributeError:
1968                      pass
1969                  self.out_queue.put((rpmManip, package, False))
1970                  continue
1971
1972              if self.syncer.mountpoint and not self.syncer.keep_rpms:
1973                  # Channel dump import; try to unlink to preserve disk space.
1974                  # rpmFile is always returned by _get_package_stream for
1975                  # disk-based imports
1976                  assert(rpmFile is not None)
1977                  try:
1978                      os.unlink(rpmFile)
1979                  except (OSError, IOError):
1980                      pass
1981
1982              # signal to the queue that the job is done
1983              try:
1984                  self.queue.task_done()
1985              except AttributeError:
1986                  pass
1987              self.out_queue.put((rpmManip, package, True))
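ThreadDownload.run is the consumer half of a classic producer/consumer layout: _fetch_packages enqueues (package_id, path) work items and starts four workers, each of which reports back through out_queue so the main thread can keep count. A self-contained sketch of that shape (worker count and payloads are illustrative; get_nowait avoids the small race a bare empty()/get() pair would have):

    import threading
    try:
        import Queue            # Python 2
    except ImportError:
        import queue as Queue   # Python 3

    in_q, out_q = Queue.Queue(), Queue.Queue()
    for item in range(10):
        in_q.put(item)

    def worker():
        while True:
            try:
                item = in_q.get_nowait()
            except Queue.Empty:
                return                     # queue drained; worker exits
            out_q.put((item, item * 2))    # stand-in for "download one package"
            in_q.task_done()

    threads = [threading.Thread(target=worker) for _ in range(4)]
    for t in threads:
        t.daemon = True
        t.start()
    in_q.join()                            # block until every item is task_done()
    while not out_q.empty():
        print(out_q.get())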
1988
1989 1990 -class StreamProducer:
1991
1992 - def __init__(self, handler, data_source_class, source_func):
1993 self.handler = handler 1994 self.is_disk_loader = data_source_class.is_disk_loader() 1995 if self.is_disk_loader: 1996 self.loader = getattr(data_source_class, source_func)() 1997 else: 1998 self.loader = getattr(data_source_class, source_func) 1999 self._args = ()
2000
2001 - def set_args(self, *args):
2002 self._args = args
2003
2004 - def close(self):
2005 self.handler.close()
2006
2007 - def process(self, batch):
2008 if self.is_disk_loader: 2009 for oid in batch: 2010 self.loader.setID(oid) 2011 stream = self.loader.load() 2012 self.handler.process(stream) 2013 else: 2014 # Only use the extra arguments if needed, for now 2015 args = self._args or (batch, ) 2016 stream = self.loader(*args) 2017 self.handler.process(stream)
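StreamProducer hides the difference between the two data sources: a disk dump hands back a loader object that is primed with one ID at a time, while a wire source exposes a method that takes the whole batch. A toy, runnable illustration of the disk-loader path (Handler, DiskLoader and DiskSource are stand-ins invented for this sketch, not module classes):

    class Handler(object):
        def process(self, stream):
            print("imported %s" % stream)
        def close(self):
            print("done")

    class DiskLoader(object):
        def setID(self, oid):
            self.oid = oid
        def load(self):
            return "object-%s" % self.oid

    class DiskSource(object):
        @staticmethod
        def is_disk_loader():
            return True
        @staticmethod
        def getErrataXmlStream():
            return DiskLoader()

    p = StreamProducer(Handler(), DiskSource, 'getErrataXmlStream')
    p.process([1, 2, 3])   # loads and hands over one stream per object ID
    p.close()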
2018
2019 2020 -def _verifyPkgRepMountPoint():
2021 """ Checks the base package repository directory tree for 2022 existance and permissions. 2023 2024 Creates base dir if need be, and chowns to apache.root (required 2025 for rhnpush). 2026 """ 2027 2028 if not CFG.MOUNT_POINT: 2029 # Incomplete configuration 2030 log(-1, _("ERROR: server.mount_point not set in the configuration file")) 2031 sys.exit(16) 2032 2033 if not os.path.exists(fileutils.cleanupAbsPath(CFG.MOUNT_POINT)): 2034 log(-1, _("ERROR: server.mount_point %s do not exist") 2035 % fileutils.cleanupAbsPath(CFG.MOUNT_POINT)) 2036 sys.exit(26) 2037 2038 if not os.path.exists(fileutils.cleanupAbsPath(CFG.MOUNT_POINT + '/' + CFG.PREPENDED_DIR)): 2039 log(-1, _("ERROR: path under server.mount_point (%s) do not exist") 2040 % fileutils.cleanupAbsPath(CFG.MOUNT_POINT + '/' + CFG.PREPENDED_DIR)) 2041 sys.exit(26)
2042
2043 2044 -def _validate_package_org(batch):
2045 """Validate the orgids associated with packages. 2046 If its redhat channel default to Null org 2047 If custom channel and org is specified use that. 2048 If custom and package org is not valid default to org 1 2049 """ 2050 orgid = OPTIONS.orgid or None 2051 for pkg in batch: 2052 if not pkg['org_id'] or pkg['org_id'] == 'None': 2053 # default to Null so do nothing 2054 pkg['org_id'] = None 2055 elif orgid: 2056 # if options.orgid specified use it 2057 pkg['org_id'] = orgid 2058 else: 2059 # org from server is not valid 2060 pkg['org_id'] = DEFAULT_ORG
2061
2062 2063 -def _getImportedChannels(withAdvisory=None):
2064 "Retrieves the channels already imported in the satellite's database" 2065 2066 query = "select distinct c.label from rhnChannel c" 2067 query_args={} 2068 2069 if withAdvisory: 2070 query += """ 2071 inner join rhnChannelErrata ce on c.id = ce.channel_id 2072 inner join rhnErrata e on ce.errata_id = e.id 2073 and e.advisory_name = :advisory 2074 and e.org_id = :org_id 2075 """ 2076 query_args={"advisory": withAdvisory["advisory_name"], 2077 "org_id": withAdvisory["org_id"]} 2078 2079 if not OPTIONS.include_custom_channels: 2080 query += " where c.org_id is null" 2081 2082 try: 2083 h = rhnSQL.prepare(query) 2084 h.execute(**query_args) 2085 return [x['label'] for x in h.fetchall_dict() or []] 2086 except (SQLError, SQLSchemaError, SQLConnectError): 2087 e = sys.exc_info()[1] 2088 # An SQL error is fatal... crash and burn 2089 exitWithTraceback(e, 'SQL ERROR during xml processing', 17) 2090 return []
2091
2092 2093 -def getDbIssParent():
2094 sql = "select label from rhnISSMaster where is_current_master = 'Y'" 2095 h = rhnSQL.prepare(sql) 2096 h.execute() 2097 row = h.fetchone_dict() 2098 if not row: 2099 return None 2100 return row['label']
2101
2102 2103 -def getDbCaChain(master):
2104 sql = "select ca_cert from rhnISSMaster where label = :label" 2105 h = rhnSQL.prepare(sql) 2106 h.execute(label=master) 2107 row = h.fetchone_dict() 2108 if not row: 2109 return None 2110 return row['ca_cert']
2111
2112 2113 -def processCommandline():
2114 "process the commandline, setting the OPTIONS object" 2115 2116 log2disk(-1, _("Commandline: %s") % repr(sys.argv)) 2117 optionsTable = [ 2118 Option('--batch-size', action='store', 2119 help=_('DEBUG ONLY: max. batch-size for XML/database-import processing (1..%s).' 2120 + '"man satellite-sync" for more information.') % SequenceServer.NEVER_MORE_THAN), 2121 Option('--ca-cert', action='store', 2122 help=_('alternative SSL CA Cert (fullpath to cert file)')), 2123 Option('-c', '--channel', action='append', 2124 help=_('process data for this channel only')), 2125 Option('--consider-full', action='store_true', 2126 help=_('disk dump will be considered to be a full export; ' 2127 'see "man satellite-sync" for more information.')), 2128 Option('--include-custom-channels', action='store_true', 2129 help=_('existing custom channels will also be synced (unless -c is used)')), 2130 Option('--debug-level', action='store', 2131 help=_('override debug level set in /etc/rhn/rhn.conf (which is currently set at %s).') % CFG.DEBUG), 2132 Option('--dump-version', action='store', 2133 help=_("requested version of XML dump (default: %s)") % constants.PROTOCOL_VERSION), 2134 Option('--email', action='store_true', 2135 help=_('e-mail a report of what was synced/imported')), 2136 Option('--force-all-errata', action='store_true', 2137 help=_('forcibly process all (not a diff of) errata metadata')), 2138 Option('--ignore-proxy', action='store_true', 2139 help=_('Do not use an http proxy under any circumstances.')), 2140 Option('--http-proxy', action='store', 2141 help=_('alternative http proxy (hostname:port)')), 2142 Option('--http-proxy-username', action='store', 2143 help=_('alternative http proxy username')), 2144 Option('--http-proxy-password', action='store', 2145 help=_('alternative http proxy password')), 2146 Option('--iss-parent', action='store', 2147 help=_('parent satellite to import content from')), 2148 Option('-l', '--list-channels', action='store_true', 2149 help=_('list all available channels and exit')), 2150 Option('--list-error-codes', action='store_true', 2151 help=_("help on all error codes satellite-sync returns")), 2152 Option('-m', '--mount-point', action='store', 2153 help=_('source mount point for import - disk update only')), 2154 Option('--no-errata', action='store_true', 2155 help=_('do not process errata data')), 2156 Option('--no-kickstarts', action='store_true', 2157 help=_('do not process kickstart data (provisioning only)')), 2158 Option('--no-packages', action='store_true', 2159 help=_('do not process full package metadata')), 2160 Option('--no-rpms', action='store_true', 2161 help=_('do not download, or process any RPMs')), 2162 Option('--no-ssl', action='store_true', 2163 help=_('turn off SSL (not recommended)')), 2164 Option('--orgid', action='store', 2165 help=_('org to which the sync imports data. 
2166          Option('-p', '--print-configuration', action='store_true',
2167                 help=_('print the configuration and exit')),
2168          Option('-s', '--server', action='store',
2169                 help=_('alternative server with which to connect (hostname)')),
2170          Option('--step', action='store',
2171                 help=_('synchronize to this step (man satellite-sync for more info)')),
2172          Option('--sync-to-temp', action='store_true',
2173                 help=_('write complete data to a tempfile before streaming it to the rest of the app')),
2174          Option('--systemid', action='store',
2175                 help=_("DEBUG ONLY: alternative path to the digital system id")),
2176          Option('--traceback-mail', action='store',
2177                 help=_('alternative email address(es) for sync output (--email option)')),
2178          Option('--keep-rpms', action='store_true',
2179                 help=_('do not remove rpms when importing from a local dump')),
2180          Option('--master', action='store',
2181                 help=_('the fully qualified domain name of the master Satellite. '
2182                        'Valid with --mount-point only. '
2183                        'Required if you want to import org data and channel permissions.')),
2184      ]
2185      optionParser = OptionParser(option_list=optionsTable)
2186      global OPTIONS
2187      OPTIONS, args = optionParser.parse_args()
2188
2189      # extra commandline arguments that are not linked to an option are an error
2190      if args:
2191          msg = _("ERROR: these arguments make no sense in this context (try --help): %s") % repr(args)
2192          log2stderr(-1, msg, 1, 1)
2193          sys.exit(19)
2194
2195      #
2196      # process anything CFG related (db, debug, server, and print)
2197      #
2198      try:
2199          rhnSQL.initDB()
2200          rhnSQL.clear_log_id()
2201          rhnSQL.set_log_auth_login('SETUP')
2202      except (SQLError, SQLSchemaError, SQLConnectError):
2203          e = sys.exc_info()[1]
2204          # An SQL error is fatal... crash and burn
2205          log(-1, _("ERROR: Can't connect to the database: %s") % e, stream=sys.stderr)
2206          log(-1, _("ERROR: Check if your database is running."), stream=sys.stderr)
2207          sys.exit(20)
2208
2209      CFG.set("ISS_Parent", getDbIssParent())
2210      CFG.set("TRACEBACK_MAIL", OPTIONS.traceback_mail or CFG.TRACEBACK_MAIL)
2211      CFG.set("RHN_PARENT", idn_ascii_to_puny(OPTIONS.iss_parent or OPTIONS.server or
2212                                              CFG.ISS_PARENT or CFG.RHN_PARENT))
2213      if OPTIONS.server and not OPTIONS.iss_parent:
2214          # the server option on the command line should override the ISS parent from the config
2215          CFG.set("ISS_PARENT", None)
2216      else:
2217          CFG.set("ISS_PARENT", idn_ascii_to_puny(OPTIONS.iss_parent or CFG.ISS_PARENT))
2218      CFG.set("ISS_CA_CHAIN", OPTIONS.ca_cert or getDbCaChain(CFG.RHN_PARENT)
2219              or CFG.CA_CHAIN)
2220
2221      if not OPTIONS.ignore_proxy:
2222          CFG.set("HTTP_PROXY", idn_ascii_to_puny(OPTIONS.http_proxy or CFG.HTTP_PROXY))
2223          CFG.set("HTTP_PROXY_USERNAME", OPTIONS.http_proxy_username or CFG.HTTP_PROXY_USERNAME)
2224          CFG.set("HTTP_PROXY_PASSWORD", OPTIONS.http_proxy_password or CFG.HTTP_PROXY_PASSWORD)
2225          CFG.set("CA_CHAIN", OPTIONS.ca_cert or CFG.CA_CHAIN)
2226
2227      CFG.set("SYNC_TO_TEMP", OPTIONS.sync_to_temp or CFG.SYNC_TO_TEMP)
2228
2229      # check the validity of the debug level
2230      if OPTIONS.debug_level:
2231          debugRange = 6
2232          try:
2233              debugLevel = int(OPTIONS.debug_level)
2234              if not (0 <= debugLevel <= debugRange):
2235                  usix.raise_with_tb(RhnSyncException("exception will be caught"), sys.exc_info()[2])
2236          except KeyboardInterrupt:
2237              e = sys.exc_info()[1]
2238              raise
2239          # pylint: disable=E0012, W0703
2240          except Exception:
2241              msg = [_("ERROR: --debug-level takes an integer value within the range %s.")
2242                     % repr(tuple(range(debugRange + 1))),
2243 _(" 0 - little logging/messaging."), 2244 _(" 1 - minimal logging/messaging."), 2245 _(" 2 - normal level of logging/messaging."), 2246 _(" 3 - lots of logging/messaging."), 2247 _(" 4+ - excessive logging/messaging.")] 2248 log(-1, msg, 1, 1, sys.stderr) 2249 sys.exit(21) 2250 else: 2251 CFG.set('DEBUG', debugLevel) 2252 initLOG(CFG.LOG_FILE, debugLevel) 2253 2254 if OPTIONS.print_configuration: 2255 CFG.show() 2256 sys.exit(0) 2257 2258 if OPTIONS.master: 2259 if not OPTIONS.mount_point: 2260 msg = _("ERROR: The --master option is only valid with the --mount-point option") 2261 log2stderr(-1, msg, cleanYN=1) 2262 sys.exit(28) 2263 elif CFG.ISS_PARENT: 2264 OPTIONS.master = CFG.ISS_PARENT 2265 2266 if OPTIONS.orgid: 2267 # verify if its a valid org 2268 orgs = [a['id'] for a in satCerts.get_all_orgs()] 2269 if int(OPTIONS.orgid) not in orgs: 2270 msg = _("ERROR: Unable to lookup Org Id %s") % OPTIONS.orgid 2271 log2stderr(-1, msg, cleanYN=1) 2272 sys.exit(27) 2273 2274 # the action dictionary used throughout 2275 actionDict = {} 2276 2277 if OPTIONS.list_channels: 2278 if OPTIONS.step: 2279 log(-1, _("WARNING: --list-channels option overrides any --step option. --step ignored.")) 2280 OPTIONS.step = 'channels' 2281 actionDict['list-channels'] = 1 2282 else: 2283 actionDict['list-channels'] = 0 2284 2285 # 2286 # validate the --step option and set up the hierarchy of sync process steps. 2287 # 2288 stepHierarchy = Runner.step_hierarchy 2289 # if no step stated... we do all steps. 2290 if not OPTIONS.step: 2291 OPTIONS.step = stepHierarchy[-1] 2292 2293 if OPTIONS.step not in stepHierarchy: 2294 log2stderr(-1, _("ERROR: '%s' is not a valid step. See 'man satellite-sync' for more detail.") 2295 % OPTIONS.step, 1, 1) 2296 sys.exit(22) 2297 2298 # XXX: --source is deferred for the time being 2299 #OPTIONS.source = OPTIONS.step in sourceSteps 2300 2301 # populate the action dictionary 2302 for step in stepHierarchy: 2303 actionDict[step] = 1 2304 if step == OPTIONS.step: 2305 break 2306 2307 # make sure *all* steps in the actionDict are handled. 2308 for step in stepHierarchy: 2309 actionDict[step] = step in actionDict 2310 2311 channels = OPTIONS.channel or [] 2312 if OPTIONS.list_channels: 2313 actionDict['channels'] = 1 2314 actionDict['arches'] = 0 2315 actionDict['channel-families'] = 1 2316 channels = [] 2317 2318 # Cleanup selected channels. 2319 # if no channels selected, the default is to "freshen", or select the 2320 # already existing channels in the local database. 2321 if not channels: 2322 channels = _getImportedChannels() 2323 2324 if not channels: 2325 if actionDict['channels'] and not actionDict['list-channels']: 2326 msg = _("ERROR: No channels currently imported; try satellite-sync --list-channels; " 2327 + "then satellite-sync -c chn0 -c chn1...") 2328 log2disk(-1, msg) 2329 log2stderr(-1, msg, cleanYN=1) 2330 sys.exit(0) 2331 2332 # add all the "other" actions specified. 
2333      otherActions = {"no_rpms": 'no-rpms',
2334                      #"no_srpms" : 'no-srpms',
2335                      "no_packages": 'no-packages',
2336                      #"no_source_packages" : 'no-source-packages',
2337                      "no_errata": 'no-errata',
2338                      "no_kickstarts": 'no-kickstarts',
2339                      "force_all_errata": 'force-all-errata',
2340                      'no_ssl': 'no-ssl'}
2341
2342      for oa in otherActions:
2343          if getattr(OPTIONS, oa):
2344              actionDict[otherActions[oa]] = 1
2345          else:
2346              actionDict[otherActions[oa]] = 0
2347
2348      if actionDict['no-kickstarts']:
2349          actionDict['kickstarts'] = 0
2350
2351      if actionDict['no-errata']:
2352          actionDict['errata'] = 0
2353
2354      # if actionDict['no-source-packages']:
2355      actionDict['source-packages'] = 0
2356
2357      if actionDict['no-packages']:
2358          actionDict['packages'] = 0
2359          actionDict['short'] = 0
2360          actionDict['download-packages'] = 0
2361          actionDict['rpms'] = 0
2362
2363      if actionDict['no-rpms']:
2364          actionDict['rpms'] = 0
2365
2366      # if actionDict['no-srpms']:
2367      actionDict['srpms'] = 0
2368
2369      if not OPTIONS.master:
2370          actionDict['orgs'] = 0
2371
2372      if OPTIONS.batch_size:
2373          try:
2374              OPTIONS.batch_size = int(OPTIONS.batch_size)
2375              if OPTIONS.batch_size not in range(1, 51):
2376                  raise ValueError(_("ERROR: --batch-size must have a value within the range: 1..50"))
2377          except (ValueError, TypeError):
2378              # int(None) --> TypeError
2379              # int('a')  --> ValueError
2380              usix.raise_with_tb(ValueError(_("ERROR: --batch-size must have a value within the range: 1..50")),
2381                                 sys.exc_info()[2])
2382
2383      OPTIONS.mount_point = fileutils.cleanupAbsPath(OPTIONS.mount_point)
2384      OPTIONS.systemid = fileutils.cleanupAbsPath(OPTIONS.systemid)
2385
2386      if OPTIONS.mount_point:
2387          if not os.path.isdir(OPTIONS.mount_point):
2388              msg = _("ERROR: no such directory %s") % OPTIONS.mount_point
2389              log2stderr(-1, msg, cleanYN=1)
2390              sys.exit(25)
2391
2392      if OPTIONS.list_error_codes:
2393          msg = [_("Error Codes: Returned codes mean:"),
2394                 _(" -1 - Could not lock file or KeyboardInterrupt or SystemExit"),
2395                 _(" 0 - User interrupted or intentional exit"),
2396                 _(" 1 - attempting to run more than one instance of satellite-sync."),
2397                 _(" 2 - Unable to find synchronization tools."),
2398                 _(" 3 - a general socket exception occurred"),
2399                 _(" 4 - an SSL error occurred. Recheck your SSL settings."),
2400                 _(" 5 - RHN error"),
2401                 _(" 6 - unhandled exception occurred"),
2402                 _(" 7 - unknown sync error"),
2403                 _(" 8 - ERROR: must be root to execute"),
2404                 _(" 9 - rpclib fault during synchronization init"),
2405                 _(" 10 - synchronization init error"),
2406                 _(" 11 - Error parsing XML stream"),
2407                 _(" 12 - Channel does not exist"),
2408                 _(" 13 - SQL error during importing package metadata"),
2409                 _(" 14 - SQL error during linking channel packages"),
2410                 _(" 15 - SQL error during xml processing"),
2411                 _(" 16 - server.mount_point not set in the configuration file"),
2412                 _(" 17 - SQL error during retrieving the channels already imported in the satellite's database"),
2413                 _(" 18 - Wrong db connection string in rhn.conf"),
2414                 _(" 19 - Bad arguments"),
2415                 _(" 20 - Could not connect to db."),
2416                 _(" 21 - Bad debug level"),
2417                 _(" 22 - Not a valid step"),
2418                 _(" 24 - no such file"),
2419                 _(" 25 - no such directory"),
2420                 _(" 26 - mount_point does not exist"),
2421                 _(" 27 - No such org"),
2422                 _(" 28 - error: --master is only valid with --mount-point"), ]
2423          log(-1, msg, 1, 1, sys.stderr)
2424          sys.exit(0)
2425
2426      if OPTIONS.dump_version:
2427          OPTIONS.dump_version = str(OPTIONS.dump_version)
2428          if OPTIONS.dump_version not in constants.ALLOWED_SYNC_PROTOCOL_VERSIONS:
2429              msg = _("ERROR: unknown dump version, try one of %s") % \
2430                  constants.ALLOWED_SYNC_PROTOCOL_VERSIONS
2431              log2stderr(-1, msg, cleanYN=1)
2432              sys.exit(19)
2433
2434      # return the dictionary of actions and the channels
2435      return actionDict, channels
2436
2437 2438 -def formatDateTime(dtstring=None, dt=None):
2439 """ Format the date time using your locale settings. This assume that your setlocale has been alread called. """ 2440 if not dt: 2441 dt = time.strptime(dtstring, '%Y%m%d%H%M%S') 2442 return time.strftime("%c", dt)
2443 2444 2445 if __name__ == '__main__': 2446 sys.stderr.write("!!! running this directly is advisable *ONLY* for testing" 2447 " purposes !!!\n") 2448 try: 2449 sys.exit(Runner().main() or 0) 2450 except (KeyboardInterrupt, SystemExit): 2451 ex = sys.exc_info()[1] 2452 sys.exit(ex) 2453 except Exception: # pylint: disable=E0012, W0703 2454 tb = 'TRACEBACK: ' + fetchTraceback(with_locals=1) 2455 log2disk(-1, tb) 2456 log2email(-1, tb) 2457 sendMail() 2458 sys.exit(-1) 2459