1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 import datetime
22 import os
23 import sys
24 import stat
25 import time
26 import exceptions
27 import fnmatch
28 try:
29
30 import Queue
31 except ImportError:
32
33 import queue as Queue
34 import threading
35 from optparse import Option, OptionParser
36 import gettext
37 from rhn.connections import idn_ascii_to_puny, idn_puny_to_unicode
38
39 sys.path.append("/usr/share/rhn")
40 from up2date_client import config
41
42
43 from spacewalk.common import usix
44 from spacewalk.common import rhnMail, rhnLib
45 from spacewalk.common.rhnLog import initLOG
46 from spacewalk.common.rhnConfig import CFG, initCFG, PRODUCT_NAME
47 from spacewalk.common.rhnTB import exitWithTraceback, fetchTraceback
48 from spacewalk.common.checksum import getFileChecksum
49 from spacewalk.server import rhnSQL
50 from spacewalk.server.rhnSQL import SQLError, SQLSchemaError, SQLConnectError
51 from spacewalk.server.rhnLib import get_package_path
52 from spacewalk.common import fileutils
53
54
55 import xmlWireSource
56 import xmlDiskSource
57 from progress_bar import ProgressBar
58 from xmlSource import FatalParseException, ParseException
59 from diskImportLib import rpmsPath
60
61 from syncLib import log, log2, log2disk, log2stderr, log2email
62 from syncLib import RhnSyncException, RpmManip, ReprocessingNeeded
63 from syncLib import initEMAIL_LOG, dumpEMAIL_LOG
64 from syncLib import FileCreationError, FileManip
65
66 from SequenceServer import SequenceServer
67 from spacewalk.server.importlib.errataCache import schedule_errata_cache_update
68
69 from spacewalk.server.importlib.importLib import InvalidChannelFamilyError
70 from spacewalk.server.importlib.importLib import MissingParentChannelError
71 from spacewalk.server.importlib.importLib import get_nevra, get_nevra_dict
72
73 import satCerts
74 import req_channels
75 import messages
76 import sync_handlers
77 import constants
78
# Translation setup; fallback=True yields NullTranslations when no
# 'spacewalk-backend-server' catalog is installed, so _() is a no-op.
translation = gettext.translation('spacewalk-backend-server', fallback=True)
# ugettext exists only on Python 2; Python 3's gettext returns str already.
# Mirrors the Queue/queue try-import fallback used at the top of this file.
_ = translation.ugettext if hasattr(translation, 'ugettext') else translation.gettext
# Load the server.satellite section of the Spacewalk config, then start
# logging to the configured log file at the configured debug level.
initCFG('server.satellite')
initLOG(CFG.LOG_FILE, CFG.DEBUG)
83
# Fallback systemid location, used when --systemid is not given (see
# Syncer.__init__ below, where OPTIONS.systemid takes precedence).
_DEFAULT_SYSTEMID_PATH = '/etc/sysconfig/rhn/systemid'
# Default organization id.
DEFAULT_ORG = 1


# Parsed command-line options object; None until command-line processing
# assigns it (read throughout this module, e.g. OPTIONS.email, OPTIONS.mount_point).
OPTIONS = None
# For each sync step, the dependent steps that become redundant once it is
# selected: the step-dependency loop walks this mapping and sets each listed
# dependent to 0 in the action dict. A [''] entry means "no dependents".
step_precedence = {
    'packages': ['download-packages'],
    'source-packages': ['download-source-packages'],
    'errata': ['download-errata'],
    'kickstarts': ['download-kickstarts'],
    'rpms': [''],
    'srpms': [''],
    'channels': ['channel-families'],
    'channel-families': [''],
    'short': [''],
    'download-errata': ['errata'],
    'download-packages': [''],
    'download-source-packages': [''],
    'download-kickstarts': [''],
    'arches': [''],
    'orgs': [''],
}
112
113
114
# Execution order of the sync steps: the main loop iterates this list and
# dispatches each enabled step to its _step_<name> handler (dashes become
# underscores in the method name).
step_hierarchy = [
    'orgs',
    'channel-families',
    'arches',
    'channels',
    'short',
    'download-packages',
    'rpms',
    'packages',
    'srpms',
    'download-source-packages',
    'download-errata',
    'download-kickstarts',
    'source-packages',
    'errata',
    'kickstarts',
]
132
134 self.syncer = None
135 self.packages_report = None
136 self._xml_file_dir_error_message = ''
137 self._affected_channels = None
138 self._packages_report = None
139 self._actions = None
140
141
155
156
157
159 ad = actionDict
160 for dependent in self.step_precedence[step]:
161 if dependent in ad:
162 ad[dependent] = 0
163 return ad
164
166 """Main routine: commandline processing, etc..."""
167
168
169 timeStart = time.time()
170
171 actionDict, channels = processCommandline()
172
173
174
175 for st in self.step_hierarchy:
176 actionDict = self._handle_step_dependents(actionDict, st)
177 self._actions = actionDict
178
179
180 if 'list-channels' in actionDict:
181 if actionDict['list-channels'] == 1:
182 actionDict['channels'] = 1
183 actionDict['arches'] = 0
184 actionDict['channel-families'] = 1
185 channels = []
186
187
188 _verifyPkgRepMountPoint()
189
190 if OPTIONS.email:
191 initEMAIL_LOG()
192
193
194 self.syncer = Syncer(channels, actionDict['list-channels'], actionDict['rpms'],
195 forceAllErrata=actionDict['force-all-errata'])
196 try:
197 self.syncer.initialize()
198 except (KeyboardInterrupt, SystemExit):
199 raise
200 except xmlWireSource.rpclib.xmlrpclib.Fault:
201 e = sys.exc_info()[1]
202 if CFG.ISS_PARENT:
203 if CFG.PRODUCT_NAME == 'Spacewalk':
204 log(-1, ['', messages.sw_iss_not_available % e.faultString], )
205 else:
206 log(-1, ['', messages.sat_iss_not_available % e.faultString], )
207 sys.exit(26)
208 else:
209 log(-1, ['', messages.syncer_error % e.faultString], )
210 sys.exit(9)
211
212 except Exception:
213 e = sys.exc_info()[1]
214 log(-1, ['', messages.syncer_error % e], )
215 sys.exit(10)
216
217 log(1, ' db: %s/<password>@%s' % (CFG.DB_USER, CFG.DB_NAME))
218
219 selected = [action for action in list(actionDict) if actionDict[action]]
220 log2(-1, 3, "Action list/commandline toggles: %s" % repr(selected),
221 stream=sys.stderr)
222
223 if OPTIONS.mount_point:
224 self._xml_file_dir_error_message = messages.file_dir_error % \
225 OPTIONS.mount_point
226
227 if CFG.DB_BACKEND == 'oracle':
228 import cx_Oracle
229 exception = cx_Oracle.IntegrityError
230 if CFG.DB_BACKEND == 'postgresql':
231 import psycopg2
232 exception = psycopg2.IntegrityError
233
234 for _try in range(2):
235 try:
236 for step in self.step_hierarchy:
237 if not actionDict[step]:
238 continue
239 method_name = '_step_' + step.replace('-', '_')
240 if not hasattr(self, method_name):
241 log(-1, _("No handler for step %s") % step)
242 continue
243 method = getattr(self, method_name)
244 ret = method()
245 if ret:
246 sys.exit(ret)
247 else:
248
249 break
250 except ReprocessingNeeded:
251
252
253 log(1, _('Environment changed, trying again...'))
254 continue
255 except RhnSyncException:
256 rhnSQL.rollback()
257 raise
258 except exception:
259 e = sys.exc_info()[1]
260 msg = _("ERROR: Encountered IntegrityError: \n"
261 + str(e)
262 + "\nconsider removing satellite-sync cache at /var/cache/rhn/satsync/*"
263 + " and re-run satellite-sync with same options.\n"
264 + "If this error persits after removing cache, please contact Red Hat support.")
265 log2stderr(-1, msg, cleanYN=1)
266 return 1
267 else:
268 log(1, _('Repeated failures'))
269
270 timeEnd = time.time()
271 delta_str = self._get_elapsed_time(timeEnd - timeStart)
272
273 log(1, _("""\
274 Import complete:
275 Begin time: %s
276 End time: %s
277 Elapsed: %s
278 """) % (formatDateTime(dt=time.localtime(timeStart)),
279 formatDateTime(dt=time.localtime(timeEnd)),
280 delta_str),
281 cleanYN=1)
282
283
284 sendMail()
285 return 0
286
287 @staticmethod
289 elapsed = int(elapsed)
290 hours = elapsed / 60 / 60
291 mins = elapsed / 60 - hours * 60
292 secs = elapsed - mins * 60 - hours * 60 * 60
293
294 delta_list = [[hours, _("hours")], [mins, _("minutes")], [secs, _("seconds")]]
295 delta_str = ", ".join(["%s %s" % (l[0], l[1]) for l in delta_list])
296 return delta_str
297
318
321
324
336
346
349
352
354 self._packages_report = self.syncer.download_rpms()
355 return None
356
357
358
359
362
365
368
369
370
371
377
380
389
392 """ Send email summary """
393 if forceEmail or (OPTIONS is not None and OPTIONS.email):
394 body = dumpEMAIL_LOG()
395 if body:
396 print(_("+++ sending log as an email +++"))
397 host_label = idn_puny_to_unicode(os.uname()[1])
398 headers = {
399 'Subject' : _('RHN Management Satellite sync. report from %s') % host_label,
400 }
401 sndr = "root@%s" % host_label
402 if CFG.default_mail_from:
403 sndr = CFG.default_mail_from
404 rhnMail.send(headers, body, sender=sndr)
405 else:
406 print(_("+++ email requested, but there is nothing to send +++"))
407
408 OPTIONS.email = None
409
412
413 """ high-level sychronization/import class
414 NOTE: there should *ONLY* be one instance of this.
415 """
416
def __init__(self, channels, listChannelsYN, check_rpms, forceAllErrata=False):
    """ Base initialization. Most work done in self.initialize() which
        needs to be called soon after instantiation.
    """

    # Channel labels (or fnmatch patterns) requested on the command line.
    self._requested_channels = channels
    # Non-empty when syncing from a channel dump on disk instead of the wire.
    self.mountpoint = OPTIONS.mount_point
    self.listChannelsYN = listChannelsYN
    self.forceAllErrata = forceAllErrata
    self.sslYN = not OPTIONS.no_ssl
    self._systemidPath = OPTIONS.systemid or _DEFAULT_SYSTEMID_PATH
    self._batch_size = OPTIONS.batch_size
    self.master_label = OPTIONS.master

    # Protocol version of the XML dump; --dump-version overrides the default.
    self.xml_dump_version = OPTIONS.dump_version or str(constants.PROTOCOL_VERSION)
    self.check_rpms = check_rpms
    self.keep_rpms = OPTIONS.keep_rpms

    # Channel-request bookkeeping; _channel_req is built later from the
    # requested channels vs. what is imported/available.
    self._channel_req = None
    self._channel_collection = sync_handlers.ChannelCollection()

    self.containerHandler = sync_handlers.ContainerHandler(
        self.master_label)

    # Set in initialize(): metadata source (disk or wire) and systemid blob.
    self.xmlDataServer = None
    self.systemid = None

    # Channel labels whose repodata needs regeneration (see _write_repomd).
    self.reporegen = set()

    # Per-channel package-id maps, filled during the package steps:
    # ids not yet seen / full id list / ids available in this dump.
    self._channel_packages = {}
    self._channel_packages_full = {}
    self._avail_channel_packages = {}

    # Computed by the diffing steps (what is missing locally).
    self._missing_channel_packages = None
    self._missing_fs_packages = None

    # Thread-safe collectors for download failures / extinct packages.
    self._failed_fs_packages = Queue.Queue()
    self._extinct_packages = Queue.Queue()

    self._channel_errata = {}
    self._missing_channel_errata = {}

    self._channel_source_packages = {}
    self._channel_source_packages_full = {}

    self._channel_kickstarts = {}
    self._avail_channel_source_packages = None
    self._missing_channel_src_packages = None
    self._missing_fs_source_packages = None
470
472 """Initialization that requires IO, etc."""
473
474
475 if self.mountpoint:
476 log(1, [_(PRODUCT_NAME + ' - file-system synchronization'),
477 ' mp: %s' % self.mountpoint])
478 self.xmlDataServer = xmlDiskSource.MetadataDiskSource(self.mountpoint)
479
480 else:
481 self.xmlDataServer = xmlWireSource.MetadataWireSource(self.systemid,
482 self.sslYN, self.xml_dump_version)
483 if CFG.ISS_PARENT:
484 sync_parent = CFG.ISS_PARENT
485 self.systemid = 'N/A'
486 is_iss = 1
487 else:
488 log(1, _(PRODUCT_NAME + ' - live synchronization'))
489 log(-1, _("ERROR: Live content synchronizing with RHN Classic Hosted is no longer supported.\nPlease "
490 "use the cdn-sync command instead unless you are attempting to sync from another Satellite "
491 "via Inter-Satelite-Sync (ISS), or from local content on disk via Channel Dump ISOs."),
492 stream=sys.stderr)
493 sys.exit(1)
494
495 url = self.xmlDataServer.schemeAndUrl(sync_parent)
496 log(1, [_(PRODUCT_NAME + ' - live synchronization'),
497 _(' url: %s') % url,
498 _(' debug/output level: %s') % CFG.DEBUG])
499 self.xmlDataServer.setServerHandler(isIss=is_iss)
500
501 if not self.systemid:
502
503
504 if (os.path.exists(self._systemidPath)
505 and os.access(self._systemidPath, os.R_OK)):
506 self.systemid = open(self._systemidPath, 'rb').read()
507 else:
508 usix.raise_with_tb(RhnSyncException(_('ERROR: this server must be registered with RHN.')),
509 sys.exc_info()[2])
510
511 auth = xmlWireSource.AuthWireSource(self.systemid, self.sslYN,
512 self.xml_dump_version)
513 auth.checkAuth()
514
516 self.containerHandler.close()
517
519 """ Wrapper function that can process metadata that is relatively
520 simple. This does the decoding of data (over the wire or from
521 disk).
522
523 step_name is just for pretty printing the actual --step name to
524 the console.
525
526 The remote function is passed by name (as a string), to mimic the
527 lazy behaviour of the if block
528 """
529
530 log(1, ["", _("Retrieving / parsing %s data") % step_name])
531
532 stream = None
533 method = getattr(self.xmlDataServer, remote_function_name)
534 stream = method()
535
536
537 try:
538 self.containerHandler.process(stream)
539 except KeyboardInterrupt:
540 log(-1, _('*** SYSTEM INTERRUPT CALLED ***'), stream=sys.stderr)
541 raise
542 except (FatalParseException, ParseException, Exception):
543 e = sys.exc_info()[1]
544
545 self.containerHandler.clear()
546 msg = ''
547 if isinstance(e, FatalParseException):
548 msg = (_('ERROR: fatal parser exception occurred ') +
549 _('(line: %s, col: %s msg: %s)') % (
550 e.getLineNumber(), e.getColumnNumber(),
551 e._msg))
552 elif isinstance(e, ParseException):
553 msg = (_('ERROR: parser exception occurred: %s') % (e))
554 elif isinstance(e, exceptions.SystemExit):
555 log(-1, _('*** SYSTEM INTERRUPT CALLED ***'), stream=sys.stderr)
556 raise
557 else:
558 msg = _('ERROR: exception (during parse) occurred: ')
559 log2stderr(-1, _(' Encountered some errors with %s data '
560 + '(see logs (%s) for more information)') % (step_name, CFG.LOG_FILE))
561 log2(-1, 3, [_(' Encountered some errors with %s data:') % step_name,
562 _(' ------- %s PARSE/IMPORT ERROR -------') % step_name,
563 ' %s' % msg,
564 _(' ---------------------------------------')], stream=sys.stderr)
565 exitWithTraceback(e, '', 11)
566 self.containerHandler.reset()
567 log(1, _("%s data complete") % step_name)
568
572
575
577 self._process_simple("getChannelFamilyXmlStream", "channel-families")
578
579 try:
580 self._process_simple("getProductNamesXmlStream", "product names")
581 except Exception:
582 pass
583
def _write_repomd(self, repomd_path, getRepomdFunc, repomdFileStreamFunc, label, timestamp):
    """Materialize one repodata file under CFG.MOUNT_POINT unless it already exists.

    repomd_path is relative to the mount point; the file is fetched via
    getRepomdFunc when syncing from disk or an ISS parent, otherwise via
    repomdFileStreamFunc, and the channel label is queued for repodata
    regeneration.
    """
    target = os.path.join(CFG.MOUNT_POINT, repomd_path)
    if os.path.exists(target):
        # Already on disk -- nothing to do.
        return
    # Disk dumps and ISS parents serve the file directly; live sync streams it.
    if self.mountpoint or CFG.ISS_PARENT:
        stream = getRepomdFunc(label)
    else:
        stream = repomdFileStreamFunc(label)
    manip = FileManip(repomd_path, timestamp, None)
    manip.write_file(stream)
    # Remember this channel so its repo metadata gets regenerated later.
    self.reporegen.add(label)
594
596 comps_path = 'rhn/comps/%s/comps-%s.xml' % (label, timestamp)
597 self._write_repomd(comps_path, self.xmlDataServer.getComps, \
598 xmlWireSource.RPCGetWireSource(self.systemid, self.sslYN, self.xml_dump_version).getCompsFileStream, \
599 label, timestamp)
600 data = {label: None}
601 backend.lookupChannels(data)
602 rhnSQL.Procedure('rhn_channel.set_comps')(data[label]['id'], comps_path, 1, timestamp)
603
604
605
607 modules_path = 'rhn/modules/%s/modules-%s.yaml' % (label, timestamp)
608 self._write_repomd(modules_path, self.xmlDataServer.getModules, \
609 xmlWireSource.RPCGetWireSource(self.systemid, self.sslYN, self.xml_dump_version).getModulesFilesStram, \
610 label, timestamp)
611 data = {label: None}
612 backend.lookupChannels(data)
613 rhnSQL.Procedure('rhn_channel.set_comps')(data[label]['id'], modules_path, 2, timestamp)
614
615
617 """ push channels, channel-family and dist. map information
618 as well upon parsing.
619 """
620 log(1, ["", _("Retrieving / parsing channel data")])
621
622 h = sync_handlers.get_channel_handler()
623
624
625 stream = self.xmlDataServer.getChannelXmlStream()
626 if self.mountpoint:
627 for substream in stream:
628 h.process(substream)
629 doEOSYN = 0
630 else:
631 h.process(stream)
632 doEOSYN = 1
633
634 h.close()
635
636
637
638 self._compute_channel_request()
639
640
641
642
643
644
645 self._printChannelTree(doEOSYN=doEOSYN)
646
647 if self.listChannelsYN:
648
649 return
650
651 requested_channels = self._channel_req.get_requested_channels()
652 try:
653 importer = sync_handlers.import_channels(requested_channels,
654 orgid=OPTIONS.orgid or None,
655 master=OPTIONS.master or None)
656 for label in requested_channels:
657 timestamp = self._channel_collection.get_channel_timestamp(label)
658 ch = self._channel_collection.get_channel(label, timestamp)
659 if ch.has_key('comps_last_modified') and ch['comps_last_modified'] is not None:
660 self._process_comps(importer.backend, label, sync_handlers._to_timestamp(ch['comps_last_modified']))
661 if ch.has_key('modules_last_modified') and ch['modules_last_modified'] is not None:
662 self._process_modules(importer.backend, label, \
663 sync_handlers._to_timestamp(ch['modules_last_modified']))
664
665 except InvalidChannelFamilyError:
666 usix.raise_with_tb(RhnSyncException(messages.invalid_channel_family_error %
667 ''.join(requested_channels)), sys.exc_info()[2])
668 except MissingParentChannelError:
669 raise
670
671 rhnSQL.commit()
672
673 log(1, _("Channel data complete"))
674
675 @staticmethod
699
def _printChannel(self, label, channel_object, log_format, is_imported):
    """Log a single channel line for the channel tree listing.

    Status column: 'p' when the channel was previously imported, '.' otherwise;
    the package count prefers the 'all-packages' list over 'packages'.
    """
    assert channel_object is not None
    status = _('p') if is_imported else _('.')
    packages = channel_object['all-packages'] or channel_object['packages']
    log(1, log_format % (status, label, len(packages), self._formatChannelExportType(channel_object)))
709
711 "pretty prints a tree of channel information"
712
713 log(1, _(' p = previously imported/synced channel'))
714 log(1, _(' . = channel not yet imported/synced'))
715 ch_end_of_service = self._channel_req.get_end_of_service()
716 ch_typos = self._channel_req.get_typos()
717 ch_requested_imported = self._channel_req.get_requested_imported()
718 relevant = self._channel_req.get_requested_channels()
719 if doEOSYN and ch_end_of_service:
720 log(1, _(' e = channel no longer supported (end-of-service)'))
721 if doTyposYN and ch_typos:
722 log(1, _(' ? = channel label invalid --- typo?'))
723
724 pc_labels = sorted(self._channel_collection.get_parent_channel_labels())
725
726 t_format = _(' %s:')
727 p_format = _(' %s %-40s %4s %s')
728 log(1, t_format % _('base-channels'))
729
730 no_base_channel = True
731 for plabel in pc_labels:
732 if plabel not in relevant:
733 continue
734
735 no_base_channel = False
736 timestamp = self._channel_collection.get_channel_timestamp(plabel)
737 channel_object = self._channel_collection.get_channel(plabel,
738 timestamp)
739 self._printChannel(plabel, channel_object, p_format, (plabel in ch_requested_imported))
740 if no_base_channel:
741 log(1, p_format % (' ', _('NONE RELEVANT'), '', ''))
742
743
744 for plabel in pc_labels:
745 cchannels = self._channel_collection.get_child_channels(plabel)
746
747
748 chns = []
749 for clabel, ctimestamp in cchannels:
750 if clabel in relevant:
751 chns.append((clabel, ctimestamp))
752 if not chns:
753
754 continue
755 log(1, t_format % plabel)
756 for clabel, ctimestamp in sorted(chns):
757 channel_object = self._channel_collection.get_channel(clabel,
758 ctimestamp)
759 self._printChannel(clabel, channel_object, p_format, (clabel in ch_requested_imported))
760 log(2, '')
761
762 if doEOSYN and ch_end_of_service:
763 log(1, t_format % _('end-of-service'))
764 status = _('e')
765 for chn in ch_end_of_service:
766 log(1, p_format % (status, chn, '', ''))
767 log(2, '')
768
769 if doTyposYN and ch_typos:
770 log(1, _(' typos:'))
771 status = _('?')
772 for chn in ch_typos:
773 log(1, p_format % (status, chn, '', ''))
774 log(2, '')
775 log(1, '')
776
778 """ channels request is verify and categorized.
779
780 NOTE: self.channel_req *will be* initialized by this method
781 """
782
783
784 importedChannels = _getImportedChannels()
785 availableChannels = self._channel_collection.get_channel_labels()
786 log(6, _('XXX: imported channels: %s') % importedChannels, 1)
787 log(6, _('XXX: cached channels: %s') % availableChannels, 1)
788
789
790 if self.listChannelsYN:
791 requested_channels = availableChannels
792 log(6, _('XXX: list channels called'), 1)
793 else:
794 requested_channels = set()
795 for channel in self._requested_channels:
796 match_channels = set(fnmatch.filter(importedChannels, channel))
797 match_channels = match_channels.union(fnmatch.filter(availableChannels, channel))
798 if not match_channels:
799 match_channels = [channel,]
800 requested_channels = requested_channels.union(match_channels)
801
802 rc = req_channels.RequestedChannels(list(requested_channels))
803 rc.set_available(availableChannels)
804 rc.set_imported(importedChannels)
805
806 rc.compute()
807
808 typos = rc.get_typos()
809 if typos:
810 log(-1, _("ERROR: these channels either do not exist or "
811 "are not available:"))
812 for chn in typos:
813 log(-1, " %s" % chn)
814 log(-1, _(" (to see a list of channel labels: %s --list-channels)") % sys.argv[0])
815 sys.exit(12)
816 self._channel_req = rc
817 return rc
818
826
828 """ process package metadata for one channel at a time """
829 relevant = sorted(self._channel_req.get_requested_channels())
830 self._channel_packages = {}
831 self._channel_packages_full = {}
832 self._avail_channel_packages = {}
833 already_seen_ids = set()
834 for chn in relevant:
835 timestamp = self._get_channel_timestamp(chn)
836
837 channel_obj = self._channel_collection.get_channel(chn, timestamp)
838 avail_package_ids = sorted(set(channel_obj['packages'] or []))
839 package_full_ids = sorted(set(channel_obj['all-packages'] or [])) or avail_package_ids
840 package_ids = sorted(set(package_full_ids) - already_seen_ids)
841
842 self._channel_packages[chn] = package_ids
843 self._channel_packages_full[chn] = package_full_ids
844 self._avail_channel_packages[chn] = avail_package_ids
845 already_seen_ids.update(package_ids)
846
848 log(1, ["", "Retrieving short package metadata (used for indexing)"])
849
850
851 self._compute_unique_packages()
852
853 stream_loader = StreamProducer(
854 sync_handlers.get_short_package_handler(),
855 self.xmlDataServer, 'getChannelShortPackagesXmlStream')
856
857 sorted_channels = sorted(list(self._channel_packages.items()), key=lambda x: x[0])
858 for channel_label, package_ids in sorted_channels:
859 log(1, _(" Retrieving / parsing short package metadata: %s (%s)") %
860 (channel_label, len(package_ids)))
861
862 if package_ids:
863 lm = self._channel_collection.get_channel_timestamp(channel_label)
864 channel_last_modified = int(rhnLib.timestamp(lm))
865
866 stream_loader.set_args(channel_label, channel_last_modified)
867 stream_loader.process(package_ids)
868
869 stream_loader.close()
870
871 self._diff_packages()
872
# Look up a package row by NEVRA + org (org_id may be NULL); returns the
# stored checksum, path, size and last_modified used by the diffing code to
# decide whether the package metadata/file must be re-imported or re-fetched.
_query_compare_packages = """
    select p.id, c.checksum_type, c.checksum, p.path, p.package_size,
    TO_CHAR(p.last_modified, 'YYYYMMDDHH24MISS') last_modified
    from rhnPackage p, rhnChecksumView c
    where p.name_id = lookup_package_name(:name)
    and p.evr_id = lookup_evr(:epoch, :version, :release)
    and p.package_arch_id = lookup_package_arch(:arch)
    and (p.org_id = :org_id or
    (p.org_id is null and :org_id is null))
    and p.checksum_id = c.id
"""
884
912
913
915 self._missing_channel_packages = {}
916 self._missing_fs_packages = {}
917
918 sorted_channels = sorted(list(self._channel_packages.items()), key=lambda x: x[0])
919 for channel_label, upids in sorted_channels:
920 log(1, _("Diffing package metadata (what's missing locally?): %s") %
921 channel_label)
922 self._missing_channel_packages[channel_label] = []
923 self._missing_fs_packages[channel_label] = []
924 self._process_batch(channel_label, upids[:], None,
925 self._diff_packages_process,
926 _('Diffing: '),
927 [channel_label])
928
929 self._verify_missing_channel_packages(self._missing_channel_packages)
930
932 """Verify if all the missing packages are actually available somehow.
933 In an incremental approach, one may request packages that are actually
934 not available in the current dump, probably because of applying an
935 incremental to the wrong base"""
936 for channel_label, pids in missing_channel_packages.items():
937 if sources:
938 avail_pids = [x[0] for x in self._avail_channel_source_packages[channel_label]]
939 else:
940 avail_pids = self._avail_channel_packages[channel_label]
941
942 if set(pids or []) > set(avail_pids or []):
943 raise RhnSyncException(_('ERROR: incremental dump skipped'))
944
945 @staticmethod
949
@staticmethod
def _verify_file(path, mtime, size, checksum_type, checksum):
    """Check that *path* (relative to CFG.MOUNT_POINT) exists and is intact.

    The cheap mtime+size comparison is tried first; only on mismatch is the
    (costly) checksum computed.

    Returns:
        0 -- file is good: mtime and size match, or the checksum matches
             (in which case the file's mtime is stamped to *mtime*)
        1 -- file does not exist at all (or no path was given)
        2 -- file exists but its checksum differs
    """
    if not path:
        return 1
    abs_path = os.path.join(CFG.MOUNT_POINT, path)
    try:
        info = os.stat(abs_path)
    except OSError:
        # stat failed -> treat as missing
        return 1

    # Fast path: identical timestamp and size means we trust the content.
    if (info[stat.ST_MTIME], info[stat.ST_SIZE]) == (mtime, size):
        return 0

    # Slow path: compare actual content checksum.
    if getFileChecksum(checksum_type, filename=abs_path) != checksum:
        return 2

    # Content is correct; reset mtime so the fast path succeeds next time.
    os.utime(abs_path, (mtime, mtime))
    return 0
984
def _process_package(self, package_id, package, l_timestamp, row,
                     m_channel_packages, m_fs_packages, check_rpms=1):
    """Classify one package as needing metadata re-import and/or file download.

    package_id / package: id and dump metadata (checksums, package_size)
    l_timestamp: last-modified timestamp of the package in the dump
    row: matching local DB row, or a false value when not in the DB
    m_channel_packages: output list, appended with ids needing (re)import
    m_fs_packages: output list, appended with (id, path) needing download
    check_rpms: when true, also verify the RPM file on the filesystem

    NOTE(review): indentation reconstructed from variable scoping -- the
    check_rpms block relies on checksum/package_size defined only inside the
    checksums branch; confirm against the original file.
    """
    path = None
    channel_package = None
    fs_package = None
    if row:
        # Package exists in the DB -- compare metadata with the dump.
        checksum_type = row['checksum_type']
        if checksum_type in package['checksums']:
            checksum = package['checksums'][row['checksum_type']]
            package_size = package['package_size']

            db_timestamp = int(rhnLib.timestamp(row['last_modified']))
            db_checksum = row['checksum']
            db_package_size = row['package_size']
            db_path = row['path']

            if not (l_timestamp <= db_timestamp and
                    checksum == db_checksum and
                    package_size == db_package_size):
                # DB metadata is stale or differs -> re-import metadata.
                channel_package = package_id

            if check_rpms:
                if db_path:
                    # Verify the on-disk RPM against mtime/size/checksum.
                    errcode = self._verify_file(db_path, l_timestamp,
                                                package_size, checksum_type, checksum)
                    if errcode:
                        # File missing or corrupt -> re-fetch and re-import.
                        fs_package = package_id
                        channel_package = package_id
                    path = db_path
                else:
                    # No path recorded in the DB -> fetch file and re-import.
                    channel_package = package_id
                    fs_package = package_id
    else:
        # Package not in the DB at all -> need both metadata and file.
        channel_package = package_id
        fs_package = package_id

    if channel_package:
        m_channel_packages.append(channel_package)
    if fs_package:
        m_fs_packages.append((fs_package, path))
    return
1032
1034 log(1, ["", _("Downloading rpm packages")])
1035
1036 sorted_channels = sorted(list(self._missing_fs_packages.items()), key=lambda x: x[0])
1037 for channel, missing_fs_packages in sorted_channels:
1038 missing_packages_count = len(missing_fs_packages)
1039 log(1, _(" Fetching any missing RPMs: %s (%s)") %
1040 (channel, missing_packages_count or _('NONE MISSING')))
1041 if missing_packages_count == 0:
1042 continue
1043
1044
1045
1046 self._fetch_packages(channel, missing_fs_packages)
1047 continue
1048
1049 log(1, _("Processing rpm packages complete"))
1050
1073
1096
1098 self._compute_unique_source_packages()
1099 self._diff_source_packages()
1100 log(1, ["", _("Downloading srpm packages")])
1101
1102
1103 sorted_channels = sorted(list(self._missing_fs_source_packages.items()), key=lambda x: x[0])
1104 for channel, missing_fs_source_packages in sorted_channels:
1105 missing_source_packages_count = len(missing_fs_source_packages)
1106 log(1, _(" Fetching any missing SRPMs: %s (%s)") %
1107 (channel, missing_source_packages_count or _('NONE MISSING')))
1108 if missing_source_packages_count == 0:
1109 continue
1110
1111
1112
1113 self._fetch_packages(channel, missing_fs_source_packages, sources=1)
1114 continue
1115
1116 log(1, "Processing srpm packages complete")
1117
1119 """ process package metadata for one channel at a time"""
1120 relevant = self._channel_req.get_requested_channels()
1121 self._channel_source_packages = {}
1122 self._channel_source_packages_full = {}
1123 self._avail_channel_source_packages = {}
1124
1125 already_seen_ids = set()
1126 for chn in relevant:
1127 timestamp = self._get_channel_timestamp(chn)
1128
1129 channel_obj = self._channel_collection.get_channel(chn, timestamp)
1130 sps = set(channel_obj['source_packages'])
1131 if not sps:
1132
1133 continue
1134 ret_sps = []
1135 for sp in sps:
1136 if isinstance(sp, usix.StringType):
1137
1138 ret_sps.append((sp, None))
1139 else:
1140 ret_sps.append((sp['id'], sp['last_modified']))
1141 del sps
1142 ret_sps.sort()
1143 self._channel_source_packages[chn] = sorted(set(ret_sps) - already_seen_ids)
1144 self._channel_source_packages_full[chn] = ret_sps
1145 self._avail_channel_source_packages[chn] = ret_sps
1146 already_seen_ids.update(ret_sps)
1147
1161
# Look up a source package row by source rpm name + org (org_id may be NULL),
# constrained to an exact checksum match; returns path/size/last_modified for
# the source-package diffing code.
_query_compare_source_packages = """
    select ps.id, c.checksum_type, c.checksum, ps.path, ps.package_size,
    TO_CHAR(ps.last_modified, 'YYYYMMDDHH24MISS') last_modified
    from rhnPackageSource ps, rhnChecksumView c
    where ps.source_rpm_id = lookup_source_name(:package_id)
    and (ps.org_id = :org_id or
    (ps.org_id is null and :org_id is null))
    and ps.checksum_id = c.id
    and c.checksum = :checksum
    and c.checksum_type = :checksum_type
"""
1173
1197
1198
1200 self._missing_channel_src_packages = {}
1201 self._missing_fs_source_packages = {}
1202 for channel_label, upids in self._channel_source_packages.items():
1203 log(1, _("Diffing source package metadata (what's missing locally?): %s") % channel_label)
1204 self._missing_channel_src_packages[channel_label] = []
1205 self._missing_fs_source_packages[channel_label] = []
1206 self._process_batch(channel_label, upids[:], None,
1207 self._diff_source_packages_process,
1208 _('Diffing: '),
1209 [channel_label])
1210
1211 self._verify_missing_channel_packages(self._missing_channel_src_packages, sources=1)
1212
1235
1237 """ process package metadata for one channel at a time"""
1238 relevant = self._channel_req.get_requested_channels()
1239 self._channel_kickstarts = {}
1240 already_seen_kickstarts = set()
1241 for chn in relevant:
1242 timestamp = self._get_channel_timestamp(chn)
1243
1244 channel_obj = self._channel_collection.get_channel(chn, timestamp)
1245 self._channel_kickstarts[chn] = \
1246 sorted(set(channel_obj['kickstartable_trees'])
1247 - already_seen_kickstarts)
1248 already_seen_kickstarts.update(self._channel_kickstarts[chn])
1249
1251 """ process package metadata for one channel at a time"""
1252 relevant = self._channel_req.get_requested_channels()
1253 coll = sync_handlers.KickstartableTreesCollection()
1254 missing_kickstarts = {}
1255 for chn in relevant:
1256 timestamp = self._get_channel_timestamp(chn)
1257
1258 channel_obj = self._channel_collection.get_channel(chn, timestamp)
1259 kickstart_trees = channel_obj['kickstartable_trees']
1260
1261 for ktid in kickstart_trees:
1262
1263 kt = coll.get_item(ktid, timestamp=None)
1264 assert kt is not None
1265 kt_label = kt['label']
1266
1267
1268
1269
1270 missing_kickstarts[kt_label] = None
1271
1272 ret = list(missing_kickstarts.items())
1273 ret.sort()
1274 return ret
1275
1277 cfg = config.initUp2dateConfig()
1278 assert len(chunk) == 1
1279 item = chunk[0]
1280 label, base_path, relative_path, timestamp, file_size = item
1281 path = os.path.join(base_path, relative_path)
1282 f = FileManip(path, timestamp=timestamp, file_size=file_size)
1283
1284 for _try in range(cfg['networkRetries']):
1285 stream = self._get_ks_file_stream(channel_label, label, relative_path)
1286 try:
1287 f.write_file(stream)
1288 break
1289 except FileCreationError:
1290 e = sys.exc_info()[1]
1291 msg = e.args[0]
1292 log2disk(-1, _("Unable to save file %s: %s") % (path,
1293 msg))
1294
1295 continue
1296 else:
1297
1298
1299 log2disk(-1, _("Failed to fetch file %s") % path)
1300
1302 """Downloads all the kickstart-related information"""
1303
1304 log(1, ["", _("Downloading kickstartable trees metadata")])
1305
1306 self._compute_unique_kickstarts()
1307
1308 stream_loader = StreamProducer(
1309 sync_handlers.get_kickstarts_handler(),
1310 self.xmlDataServer, 'getKickstartsXmlStream')
1311
1312 for channel, ktids in self._channel_kickstarts.items():
1313 self._process_batch(channel, ktids[:], messages.kickstart_parsing,
1314 stream_loader.process)
1315 stream_loader.close()
1316
1317 missing_ks_files = self._compute_missing_ks_files()
1318
1319 log(1, ["", _("Downloading kickstartable trees files")])
1320 sorted_channels = sorted(list(missing_ks_files.items()), key=lambda x: x[0])
1321 for channel, files in sorted_channels:
1322 self._process_batch(channel, files[:], messages.kickstart_downloading,
1323 self._download_kickstarts_file,
1324 nevermorethan=1,
1325 process_function_args=[channel])
1326
1340
        # Build {channel: [(label, base_path, relative_path, timestamp,
        # file_size), ...]} for every kickstart tree file that fails
        # on-disk verification (missing, wrong size/mtime/checksum).
        coll = sync_handlers.KickstartableTreesCollection()

        missing_ks_files = {}

        for channel, ktids in self._channel_kickstarts.items():
            # alias: appending to `missing` fills missing_ks_files[channel]
            missing_ks_files[channel] = missing = []
            for ktid in ktids:
                kt = coll.get_item(ktid, timestamp=None)
                assert kt is not None
                kt_label = kt['label']
                base_path = kt['base_path']
                files = kt['files']
                for f in files:
                    relative_path = f['relative_path']
                    dest_path = os.path.join(base_path, relative_path)
                    timestamp = rhnLib.timestamp(f['last_modified'])
                    file_size = f['file_size']
                    # 0 means the local copy matches size/time/checksum
                    errcode = self._verify_file(dest_path,
                                                timestamp, file_size, f['checksum_type'], f['checksum'])
                    if errcode != 0:
                        val = (kt_label, base_path, relative_path,
                               timestamp, file_size)
                        missing.append(val)
        return missing_ks_files
1368
1393
1408
    # Errata already present in the local DB for one channel label;
    # last_modified is formatted as YYYYMMDDHH24MISS for timestamp compares.
    _query_get_db_errata = rhnSQL.Statement("""
        select e.id, e.advisory_name,
               TO_CHAR(e.last_modified, 'YYYYMMDDHH24MISS') last_modified
          from rhnChannelErrata ce, rhnErrata e, rhnChannel c
         where c.label = :channel
           and ce.channel_id = c.id
           and ce.errata_id = e.id
    """)
1417
        """
        Fetch the errata stored in the local satellite's database. Returned
        as a hash of channel to another hash of advisory names to a tuple of
        errata id and last modified date.
        """
        db_channel_errata = {}
        relevant = self._channel_req.get_requested_channels()
        h = rhnSQL.prepare(self._query_get_db_errata)
        for channel in relevant:
            # alias: `ce` fills db_channel_errata[channel] in place
            db_channel_errata[channel] = ce = {}
            h.execute(channel=channel)
            while 1:
                row = h.fetchone_dict()
                if not row:
                    break
                advisory_name = row['advisory_name']
                erratum_id = row['id']
                # normalize DB string to a numeric timestamp for comparison
                last_modified = rhnLib.timestamp(row['last_modified'])
                ce[advisory_name] = (erratum_id, last_modified)
        return db_channel_errata
1439
        """ Fetch the errata for this channel"""
        db_channel_errata = self._get_db_channel_errata()

        relevant = self._channel_req.get_requested_channels()

        # {channel: [(erratum_id, last_modified_or_None, advisory_or_None)]}
        channel_errata = {}
        for chn in relevant:
            db_ce = db_channel_errata[chn]
            timestamp = self._get_channel_timestamp(chn)

            channel_obj = self._channel_collection.get_channel(chn, timestamp)
            errata_timestamps = channel_obj['errata_timestamps']
            if errata_timestamps is None or self.forceAllErrata:
                # No per-erratum timestamps available (or forced): take all.
                erratum_ids = channel_obj['errata']
                errata = [(x, None, None) for x in erratum_ids]
                log(2, _("Grabbing all errata for channel %s") % chn)
            else:
                errata = []
                for erratum in errata_timestamps:
                    erratum_id = erratum['id']
                    last_modified = erratum['last_modified']
                    last_modified = rhnLib.timestamp(last_modified)
                    advisory_name = erratum['advisory_name']
                    if advisory_name in db_ce:
                        _foo, db_last_modified = db_ce[advisory_name]
                        if last_modified == db_last_modified:
                            # already in the DB at this revision; skip
                            continue
                    errata.append((erratum_id, last_modified, advisory_name))
            errata.sort()
            channel_errata[chn] = errata

        # De-duplicate across channels: each erratum tuple is processed for
        # the first channel that mentions it only.
        already_seen_errata = set()
        for channel, errata in channel_errata.items():
            uq_errata = set(errata) - already_seen_errata
            self._channel_errata[channel] = sorted(uq_errata)
            already_seen_errata.update(uq_errata)
1482
        """ Compute errata that are missing from the satellite
        Kind of similar to diff_errata, if we had the timestamp and advisory
        information available
        """
        errata_collection = sync_handlers.ErrataCollection()
        self._missing_channel_errata = missing_channel_errata = {}
        db_channel_errata = self._get_db_channel_errata()
        for channel, errata in self._channel_errata.items():
            ch_erratum_ids = missing_channel_errata[channel] = []
            for eid, timestamp, advisory_name in errata:
                if timestamp is not None:
                    # timestamp/advisory already known from the diff pass
                    ch_erratum_ids.append((eid, timestamp, advisory_name))
                    continue
                # Unknown revision: look it up in the cached collection.
                erratum = errata_collection.get_erratum(eid, timestamp)
                timestamp = rhnLib.timestamp(erratum['last_modified'])
                advisory_name = erratum['advisory_name']
                db_erratum = db_channel_errata[channel].get(advisory_name)
                # keep it if absent from the DB, out of date, or forced
                if db_erratum is None or db_erratum[1] != timestamp or \
                        self.forceAllErrata:
                    ch_erratum_ids.append((eid, timestamp, advisory_name))
1506
        # Download errata metadata for all requested channels, then
        # recompute the DB diff.
        log(1, ["", _("Downloading errata data")])
        if self.forceAllErrata:
            log(2, _("Forcing download of all errata data for requested channels."))
        self._diff_errata()
        not_cached_errata = self._compute_not_cached_errata()
        stream_loader = StreamProducer(
            sync_handlers.get_errata_handler(),
            self.xmlDataServer, 'getErrataXmlStream')

        sorted_channels = sorted(list(not_cached_errata.items()), key=lambda x: x[0])
        for channel, erratum_ids in sorted_channels:
            # erratum_ids[:] — pass a copy; the batch gets consumed
            self._process_batch(channel, erratum_ids[:], messages.erratum_parsing,
                                stream_loader.process)
        stream_loader.close()

        # refresh the satellite/DB errata diff now that data is cached
        self._diff_db_errata()
        log(1, _("Downloading errata data complete"))
1527
1528 - def _processWithProgressBar(self, batch, size,
1529 process_function,
1530 prompt=_('Downloading:'),
1531 nevermorethan=None,
1532 process_function_args=()):
1533 pb = ProgressBar(prompt=prompt, endTag=_(' - complete'),
1534 finalSize=size, finalBarLength=40, stream=sys.stdout)
1535 if CFG.DEBUG > 2:
1536 pb.redrawYN = 0
1537 pb.printAll(1)
1538
1539 ss = SequenceServer(batch, nevermorethan=(nevermorethan or self._batch_size))
1540 while not ss.doneYN():
1541 chunk = ss.getChunk()
1542 item_count = len(chunk)
1543 process_function(chunk, *process_function_args)
1544 ss.clearChunk()
1545 pb.addTo(item_count)
1546 pb.printIncrement()
1547 pb.printComplete()
1548
1549 - def _process_batch(self, channel, batch, log_msg,
1550 process_function,
1551 prompt=_('Downloading:'),
1552 process_function_args=(),
1553 nevermorethan=None,
1554 is_slow=False):
1555 count = len(batch)
1556 if log_msg:
1557 log(1, log_msg % (channel, count or _('NONE RELEVANT')))
1558 if not count:
1559 return
1560 if is_slow:
1561 log(1, messages.warning_slow)
1562 self._processWithProgressBar(batch, count, process_function,
1563 prompt, nevermorethan, process_function_args)
1564
1575
        # Import (source) package metadata for every channel that has
        # missing entries, then link packages into their channels.
        if sources:
            log(1, ["", _("Importing source package metadata")])
            missing_channel_items = self._missing_channel_src_packages
        else:
            log(1, ["", _("Importing package metadata")])
            missing_channel_items = self._missing_channel_packages

        sorted_channels = sorted(list(missing_channel_items.items()), key=lambda x: x[0])
        for channel, packages in sorted_channels:
            # packages[:] — pass a copy; the batch gets consumed
            self._process_batch(channel, packages[:],
                                messages.package_importing,
                                self._import_packages_process,
                                _('Importing: '),
                                [sources])
        return self._link_channel_packages()
1592
1639
1640 @staticmethod
1661
        # Import the errata previously computed as missing, channel by
        # channel, after rewriting their package lists via _fix_erratum.
        log(1, ["", _("Importing channel errata")])
        errata_collection = sync_handlers.ErrataCollection()

        sorted_channels = sorted(list(self._missing_channel_errata.items()), key=lambda x: x[0])
        for chn, errata in sorted_channels:
            log(2, _("Importing %s errata for channel %s.") % (len(errata), chn))
            batch = []
            for eid, timestamp, _advisory_name in errata:
                erratum = errata_collection.get_erratum(eid, timestamp)

                # a None erratum means it never made it into the cache; skip
                if erratum is not None:
                    self._fix_erratum(erratum)
                    batch.append(erratum)

            self._process_batch(chn, batch, messages.errata_importing,
                                sync_handlers.import_errata)
1680
    @staticmethod
    # NOTE(review): the `def` line of this static helper is not visible in
    # this chunk; the body below mutates a single `erratum` dict in place.
        """ Replace the list of packages with references to short packages"""
        if erratum['org_id'] is not None:
            # remap any non-null org to the sync target org
            erratum['org_id'] = OPTIONS.orgid or DEFAULT_ORG

        sp_coll = sync_handlers.ShortPackageCollection()
        pids = set(erratum['packages'] or [])

        packages = []

        for pid in pids:
            if not sp_coll.has_package(pid):
                # package not in the short-package cache; drop the reference
                continue
            package = sp_coll.get_package(pid)
            if package['org_id'] is not None:
                # keep the package's org consistent with the erratum's
                package['org_id'] = erratum['org_id']

            packages.append(package)

        erratum['packages'] = packages

        if OPTIONS.channel:
            # Syncing specific channels: restrict to channels already
            # carrying this advisory plus the explicitly requested ones.
            imported_channels = _getImportedChannels(withAdvisory=erratum)

            imported_channels += OPTIONS.channel
        else:
            imported_channels = _getImportedChannels()

        erratum['channels'] = [c for c in erratum['channels']
                               if c['label'] in imported_channels]

        # Attach the short-package object (or None) to each errata file.
        for errata_file in (erratum['files'] or []):
            errata_file_package = errata_file.get('package')
            errata_file_source_package = errata_file.get('source-package')
            if errata_file['file_type'] == 'RPM' and \
                    errata_file_package is not None:
                package = None
                if sp_coll.has_package(errata_file_package):
                    package = sp_coll.get_package(errata_file_package)
                errata_file['pkgobj'] = package
            elif errata_file['file_type'] == 'SRPM' and \
                    errata_file_source_package is not None:
                # source packages are not tracked in the short collection
                errata_file['pkgobj'] = None
1733
        # Download missing package files with 4 worker threads, consuming
        # progress reports from `out_queue` and logging a running ETA.
        short_package_collection = sync_handlers.ShortPackageCollection()
        if sources:
            package_collection = sync_handlers.SourcePackageCollection()
        else:
            package_collection = sync_handlers.PackageCollection()

        self._failed_fs_packages = Queue.Queue()
        self._extinct_packages = Queue.Queue()
        pkgs_total = len(missing_fs_packages)
        pkg_current = 0
        total_size = 0
        queue = Queue.Queue()
        out_queue = Queue.Queue()
        lock = threading.Lock()

        # seed the work queue and accumulate the total byte count
        for package_id, path in missing_fs_packages:
            package = package_collection.get_package(package_id)
            total_size = total_size + package['package_size']
            queue.put((package_id, path))

        log(1, messages.package_fetch_total_size %
            (self._bytes_to_fuzzy(total_size)))
        real_processed_size = processed_size = 0
        real_total_size = total_size
        start_time = round(time.time())

        all_threads = []
        for _thread in range(4):
            t = ThreadDownload(lock, queue, out_queue, short_package_collection, package_collection,
                               self, self._failed_fs_packages, self._extinct_packages, sources, channel)
            # NOTE(review): setDaemon()/isAlive() (below) were removed in
            # modern Python 3 (use t.daemon = True / t.is_alive()); fine for
            # the Python 2 era this code targets — TODO confirm on upgrade.
            t.setDaemon(True)
            t.start()
            all_threads.append(t)

        # consume results while any worker is alive and work remains
        while ([x for x in all_threads if x.isAlive()]
               and pkg_current < pkgs_total):
            try:
                # block=False: poll; the 0.1 timeout arg is ignored here
                (rpmManip, package, is_done) = out_queue.get(False, 0.1)
            except Queue.Empty:
                continue
            pkg_current = pkg_current + 1

            if not is_done:
                # skipped/failed package: count it as processed but drop it
                # from the "real" totals used for the ETA estimate
                real_total_size -= package['package_size']
                processed_size += package['package_size']
                try:
                    out_queue.task_done()
                except AttributeError:
                    # task_done() may not exist on very old Queue versions
                    pass
                continue

            filename = os.path.basename(rpmManip.relative_path)

            size = package['package_size']
            real_processed_size += size
            processed_size += size
            current_time = round(time.time())

            # ETA: elapsed * (remaining/processed), done in integer 1/10000ths
            remain_time = (datetime.timedelta(seconds=current_time - start_time)) * \
                ((real_total_size * 10000) / real_processed_size - 10000) / 10000
            # drop sub-second precision for display
            remain_time = datetime.timedelta(remain_time.days, remain_time.seconds)
            log(1, messages.package_fetch_remain_size_time %
                (self._bytes_to_fuzzy(processed_size), self._bytes_to_fuzzy(total_size), remain_time))

            log(1, messages.package_fetch_successful %
                (pkg_current, pkgs_total, filename, size))
            try:
                out_queue.task_done()
            except AttributeError:
                pass

        extinct_count = self._extinct_packages.qsize()
        failed_count = self._failed_fs_packages.qsize()

        # final per-channel summary
        log(2, messages.package_fetch_summary % channel, notimeYN=1)
        log(2, messages.package_fetch_summary_success %
            (pkgs_total - extinct_count - failed_count), notimeYN=1)
        log(2, messages.package_fetch_summary_failed % failed_count,
            notimeYN=1)
        log(2, messages.package_fetch_summary_extinct % extinct_count,
            notimeYN=1)
1822
1823
    @staticmethod
    # NOTE(review): the `def` line of this static helper is not visible in
    # this chunk; the body formats a byte count `b` as a human string.
        units = ['bytes', 'kiB', 'MiB', 'GiB', 'TiB', 'PiB']
        base = 1024
        fuzzy = b
        # divide down until the value fits under 1024 (or units run out)
        for unit in units:
            if fuzzy >= base:
                fuzzy = float(fuzzy) / base
            else:
                break
        int_len = len("%d" % fuzzy)
        # keep roughly 3 significant characters total
        fract_len = 3 - int_len

        return "%*.*f %s" % (int_len, fract_len, fuzzy, unit)
1838
        """ returns (filepath, stream), so in the case of a "wire source",
        the return value is, of course, (None, stream)
        """
        # Disk import: read the RPM straight off the mounted dump.
        if self.mountpoint:
            rpmFile = rpmsPath(package_id, self.mountpoint, sources)
            try:
                stream = open(rpmFile)
            except IOError:
                e = sys.exc_info()[1]
                if e.errno != 2:  # anything but ENOENT is unexpected
                    raise
                # file absent in the dump: caller treats None as "extinct"
                return (rpmFile, None)

            return (rpmFile, stream)

        # Wire import: pull from the ISS parent or the RPC wire source.
        if CFG.ISS_PARENT:
            stream = self.xmlDataServer.getRpm(nvrea, channel)
        else:
            rpmServer = xmlWireSource.RPCGetWireSource(self.systemid, self.sslYN,
                                                       self.xml_dump_version)
            stream = rpmServer.getPackageStream(channel, nvrea)

        return (None, stream)
1866
1869
1870 - def __init__(self, lock, queue, out_queue, short_package_collection, package_collection, syncer,
1871 failed_fs_packages, extinct_packages, sources, channel):
1872 threading.Thread.__init__(self)
1873 self.queue = queue
1874 self.out_queue = out_queue
1875 self.short_package_collection = short_package_collection
1876 self.package_collection = package_collection
1877 self.syncer = syncer
1878 self.failed_fs_packages = failed_fs_packages
1879 self.extinct_packages = extinct_packages
1880 self.sources = sources
1881 self.channel = channel
1882 self.lock = lock
1883
        # Worker loop: pop jobs until the queue is drained. For every job,
        # verify the on-disk file; if stale/missing, fetch it (with retries)
        # and report (rpmManip, package, success?) on out_queue.
        while not self.queue.empty():

            (package_id, path) = self.queue.get()
            package = self.package_collection.get_package(package_id)
            last_modified = package['last_modified']
            checksum_type = package['checksum_type']
            checksum = package['checksum']
            package_size = package['package_size']
            if not path:
                # no path recorded yet: derive the canonical relative path
                nevra = get_nevra(package)
                orgid = None
                if package['org_id']:
                    orgid = OPTIONS.orgid or DEFAULT_ORG
                path = self.syncer._get_rel_package_path(nevra, orgid, self.sources,
                                                         checksum_type, checksum)

                # persist the computed path back into the collection
                package['path'] = path
                self.package_collection.add_item(package)

            errcode = self.syncer._verify_file(path, rhnLib.timestamp(last_modified),
                                               package_size, checksum_type, checksum)
            if errcode == 0:
                # file already present and valid; nothing to download
                # (task_done() may not exist on very old Queue versions)
                try:
                    self.queue.task_done()
                except AttributeError:
                    pass
                self.out_queue.put((None, package, False))
                continue

            cfg = config.initUp2dateConfig()
            rpmManip = RpmManip(package, path)
            nvrea = rpmManip.nvrea()

            # retry the download up to networkRetries times
            for _try in range(cfg['networkRetries']):
                # the lock serializes access to the shared wire source
                self.lock.acquire()
                try:
                    rpmFile, stream = self.syncer._get_package_stream(self.channel,
                                                                      package_id, nvrea, self.sources)
                except:
                    # release before re-raising so other workers don't hang
                    self.lock.release()
                    raise
                self.lock.release()
                if stream is None:
                    # upstream no longer carries the package ("extinct")
                    self.extinct_packages.put(package_id)
                    log(1, messages.package_fetch_extinct %
                        (os.path.basename(path)))
                    break

                try:
                    rpmManip.write_file(stream)
                    break  # saved successfully; stop retrying
                except FileCreationError:
                    e = sys.exc_info()[1]
                    msg = e.args[0]
                    log2disk(-1, _("Unable to save file %s: %s") % (
                        rpmManip.full_path, msg))
                    # try again on the next iteration
                    continue

            else:
                # for-else: every retry failed without a break
                self.failed_fs_packages.put(package_id)
                log(1, messages.package_fetch_failed %
                    (os.path.basename(path)))

                try:
                    self.queue.task_done()
                except AttributeError:
                    pass
                self.out_queue.put((rpmManip, package, False))
                continue

            if stream is None:
                # broke out of the loop on the "extinct" branch above
                try:
                    self.queue.task_done()
                except AttributeError:
                    pass
                self.out_queue.put((rpmManip, package, False))
                continue

            if self.syncer.mountpoint and not self.syncer.keep_rpms:
                # disk import without --keep-rpms: remove the source copy
                # now that it has been imported
                assert(rpmFile is not None)
                try:
                    os.unlink(rpmFile)
                except (OSError, IOError):
                    pass

            # success
            try:
                self.queue.task_done()
            except AttributeError:
                pass
            self.out_queue.put((rpmManip, package, True))
1988
1991
1992 - def __init__(self, handler, data_source_class, source_func):
2000
2003
2006
        # Feed a batch of object ids through the configured handler, either
        # one-by-one from disk or as a single wire stream.
        if self.is_disk_loader:
            for oid in batch:
                self.loader.setID(oid)
                stream = self.loader.load()
                self.handler.process(stream)
        else:
            # wire loader: one call covers the whole batch
            args = self._args or (batch, )
            stream = self.loader(*args)
            self.handler.process(stream)
2018
2042
    """Validate the orgids associated with packages.
    If its redhat channel default to Null org
    If custom channel and org is specified use that.
    If custom and package org is not valid default to org 1
    """
    orgid = OPTIONS.orgid or None
    for pkg in batch:
        if not pkg['org_id'] or pkg['org_id'] == 'None':
            # Red Hat content: normalize falsy/"None" strings to a null org
            pkg['org_id'] = None
        elif orgid:
            # custom content with an explicit target org from --orgid
            pkg['org_id'] = orgid
        else:
            # custom content, no org given: fall back to the default org
            pkg['org_id'] = DEFAULT_ORG
2061
    "Retrieves the channels already imported in the satellite's database"

    query = "select distinct c.label from rhnChannel c"
    query_args = {}

    if withAdvisory:
        # restrict to channels already carrying this advisory
        query += """
            inner join rhnChannelErrata ce on c.id = ce.channel_id
            inner join rhnErrata e on ce.errata_id = e.id
                and e.advisory_name = :advisory
                and e.org_id = :org_id
        """
        query_args = {"advisory": withAdvisory["advisory_name"],
                      "org_id": withAdvisory["org_id"]}

    if not OPTIONS.include_custom_channels:
        # by default only vendor (null-org) channels are considered
        query += " where c.org_id is null"

    try:
        h = rhnSQL.prepare(query)
        h.execute(**query_args)
        return [x['label'] for x in h.fetchall_dict() or []]
    except (SQLError, SQLSchemaError, SQLConnectError):
        e = sys.exc_info()[1]
        # SQL failure here is fatal: exits with code 17
        exitWithTraceback(e, 'SQL ERROR during xml processing', 17)
    return []
2091
    # Return the label of the current ISS master, or None if not configured.
    sql = "select label from rhnISSMaster where is_current_master = 'Y'"
    h = rhnSQL.prepare(sql)
    h.execute()
    row = h.fetchone_dict()
    if not row:
        return None
    return row['label']
2101
    # Return the CA cert path recorded for the given ISS master, or None.
    sql = "select ca_cert from rhnISSMaster where label = :label"
    h = rhnSQL.prepare(sql)
    h.execute(label=master)
    row = h.fetchone_dict()
    if not row:
        return None
    return row['ca_cert']
2111
    "process the commandline, setting the OPTIONS object"

    log2disk(-1, _("Commandline: %s") % repr(sys.argv))
    # optparse table describing every supported commandline switch
    optionsTable = [
        Option('--batch-size', action='store',
               help=_('DEBUG ONLY: max. batch-size for XML/database-import processing (1..%s).'
                      + '"man satellite-sync" for more information.') % SequenceServer.NEVER_MORE_THAN),
        Option('--ca-cert', action='store',
               help=_('alternative SSL CA Cert (fullpath to cert file)')),
        Option('-c', '--channel', action='append',
               help=_('process data for this channel only')),
        Option('--consider-full', action='store_true',
               help=_('disk dump will be considered to be a full export; '
                      'see "man satellite-sync" for more information.')),
        Option('--include-custom-channels', action='store_true',
               help=_('existing custom channels will also be synced (unless -c is used)')),
        Option('--debug-level', action='store',
               help=_('override debug level set in /etc/rhn/rhn.conf (which is currently set at %s).') % CFG.DEBUG),
        Option('--dump-version', action='store',
               help=_("requested version of XML dump (default: %s)") % constants.PROTOCOL_VERSION),
        Option('--email', action='store_true',
               help=_('e-mail a report of what was synced/imported')),
        Option('--force-all-errata', action='store_true',
               help=_('forcibly process all (not a diff of) errata metadata')),
        Option('--ignore-proxy', action='store_true',
               help=_('Do not use an http proxy under any circumstances.')),
        Option('--http-proxy', action='store',
               help=_('alternative http proxy (hostname:port)')),
        Option('--http-proxy-username', action='store',
               help=_('alternative http proxy username')),
        Option('--http-proxy-password', action='store',
               help=_('alternative http proxy password')),
        Option('--iss-parent', action='store',
               help=_('parent satellite to import content from')),
        Option('-l', '--list-channels', action='store_true',
               help=_('list all available channels and exit')),
        Option('--list-error-codes', action='store_true',
               help=_("help on all error codes satellite-sync returns")),
        Option('-m', '--mount-point', action='store',
               help=_('source mount point for import - disk update only')),
        Option('--no-errata', action='store_true',
               help=_('do not process errata data')),
        Option('--no-kickstarts', action='store_true',
               help=_('do not process kickstart data (provisioning only)')),
        Option('--no-packages', action='store_true',
               help=_('do not process full package metadata')),
        Option('--no-rpms', action='store_true',
               help=_('do not download, or process any RPMs')),
        Option('--no-ssl', action='store_true',
               help=_('turn off SSL (not recommended)')),
        Option('--orgid', action='store',
               help=_('org to which the sync imports data. defaults to the admin account')),
        Option('-p', '--print-configuration', action='store_true',
               help=_('print the configuration and exit')),
        Option('-s', '--server', action='store',
               help=_('alternative server with which to connect (hostname)')),
        Option('--step', action='store',
               help=_('synchronize to this step (man satellite-sync for more info)')),
        Option('--sync-to-temp', action='store_true',
               help=_('write complete data to tempfile before streaming to remainder of app')),
        Option('--systemid', action='store',
               help=_("DEBUG ONLY: alternative path to digital system id")),
        Option('--traceback-mail', action='store',
               help=_('alternative email address(es) for sync output (--email option)')),
        Option('--keep-rpms', action='store_true',
               help=_('do not remove rpms when importing from local dump')),
        Option('--master', action='store',
               help=_('the fully qualified domain name of the master Satellite. '
                      'Valid with --mount-point only. '
                      'Required if you want to import org data and channel permissions.')),
    ]
    optionParser = OptionParser(option_list=optionsTable)
    global OPTIONS
    OPTIONS, args = optionParser.parse_args()

    # positional arguments are never valid for satellite-sync
    if args:
        msg = _("ERROR: these arguments make no sense in this context (try --help): %s") % repr(args)
        log2stderr(-1, msg, 1, 1)
        sys.exit(19)
2194
2195
2196
2197
    # Connect to the database early; everything below depends on it.
    try:
        rhnSQL.initDB()
        rhnSQL.clear_log_id()
        rhnSQL.set_log_auth_login('SETUP')
    except (SQLError, SQLSchemaError, SQLConnectError):
        e = sys.exc_info()[1]

        log(-1, _("ERROR: Can't connect to the database: %s") % e, stream=sys.stderr)
        log(-1, _("ERROR: Check if your database is running."), stream=sys.stderr)
        sys.exit(20)

    # Merge commandline overrides into the runtime configuration.
    CFG.set("ISS_Parent", getDbIssParent())
    CFG.set("TRACEBACK_MAIL", OPTIONS.traceback_mail or CFG.TRACEBACK_MAIL)
    CFG.set("RHN_PARENT", idn_ascii_to_puny(OPTIONS.iss_parent or OPTIONS.server or
                                            CFG.ISS_PARENT or CFG.RHN_PARENT))
    if OPTIONS.server and not OPTIONS.iss_parent:
        # explicit --server overrides any configured ISS parent
        CFG.set("ISS_PARENT", None)
    else:
        CFG.set("ISS_PARENT", idn_ascii_to_puny(OPTIONS.iss_parent or CFG.ISS_PARENT))
        CFG.set("ISS_CA_CHAIN", OPTIONS.ca_cert or getDbCaChain(CFG.RHN_PARENT)
                or CFG.CA_CHAIN)

    if not OPTIONS.ignore_proxy:
        CFG.set("HTTP_PROXY", idn_ascii_to_puny(OPTIONS.http_proxy or CFG.HTTP_PROXY))
        CFG.set("HTTP_PROXY_USERNAME", OPTIONS.http_proxy_username or CFG.HTTP_PROXY_USERNAME)
        CFG.set("HTTP_PROXY_PASSWORD", OPTIONS.http_proxy_password or CFG.HTTP_PROXY_PASSWORD)
    CFG.set("CA_CHAIN", OPTIONS.ca_cert or CFG.CA_CHAIN)

    CFG.set("SYNC_TO_TEMP", OPTIONS.sync_to_temp or CFG.SYNC_TO_TEMP)

    # Validate --debug-level (0..6); any parse/range failure prints usage.
    if OPTIONS.debug_level:
        debugRange = 6
        try:
            debugLevel = int(OPTIONS.debug_level)
            if not (0 <= debugLevel <= debugRange):
                # deliberately raised so the generic handler below fires
                usix.raise_with_tb(RhnSyncException("exception will be caught"), sys.exc_info()[2])
        except KeyboardInterrupt:
            e = sys.exc_info()[1]
            raise

        except Exception:
            msg = [_("ERROR: --debug-level takes an in integer value within the range %s.")
                   % repr(tuple(range(debugRange + 1))),
                   _(" 0 - little logging/messaging."),
                   _(" 1 - minimal logging/messaging."),
                   _(" 2 - normal level of logging/messaging."),
                   _(" 3 - lots of logging/messaging."),
                   _(" 4+ - excessive logging/messaging.")]
            log(-1, msg, 1, 1, sys.stderr)
            sys.exit(21)
        else:
            CFG.set('DEBUG', debugLevel)
            initLOG(CFG.LOG_FILE, debugLevel)

    if OPTIONS.print_configuration:
        CFG.show()
        sys.exit(0)

    if OPTIONS.master:
        # --master only makes sense when importing from a disk dump
        if not OPTIONS.mount_point:
            msg = _("ERROR: The --master option is only valid with the --mount-point option")
            log2stderr(-1, msg, cleanYN=1)
            sys.exit(28)
    elif CFG.ISS_PARENT:
        OPTIONS.master = CFG.ISS_PARENT

    if OPTIONS.orgid:
        # verify the requested org actually exists
        orgs = [a['id'] for a in satCerts.get_all_orgs()]
        if int(OPTIONS.orgid) not in orgs:
            msg = _("ERROR: Unable to lookup Org Id %s") % OPTIONS.orgid
            log2stderr(-1, msg, cleanYN=1)
            sys.exit(27)
2273
2274
    # actionDict maps step/flag names to 0/1: what this run will perform.
    actionDict = {}

    if OPTIONS.list_channels:
        if OPTIONS.step:
            log(-1, _("WARNING: --list-channels option overrides any --step option. --step ignored."))
        OPTIONS.step = 'channels'
        actionDict['list-channels'] = 1
    else:
        actionDict['list-channels'] = 0

    # Steps run in a fixed order; --step names the last one to execute.
    stepHierarchy = Runner.step_hierarchy

    if not OPTIONS.step:
        OPTIONS.step = stepHierarchy[-1]

    if OPTIONS.step not in stepHierarchy:
        log2stderr(-1, _("ERROR: '%s' is not a valid step. See 'man satellite-sync' for more detail.")
                   % OPTIONS.step, 1, 1)
        sys.exit(22)

    # enable every step up to and including the requested one ...
    for step in stepHierarchy:
        actionDict[step] = 1
        if step == OPTIONS.step:
            break

    # ... and force the remainder to False
    for step in stepHierarchy:
        actionDict[step] = step in actionDict

    channels = OPTIONS.channel or []
    if OPTIONS.list_channels:
        actionDict['channels'] = 1
        actionDict['arches'] = 0
        actionDict['channel-families'] = 1
        channels = []

    # default to the channels already imported when none were requested
    if not channels:
        channels = _getImportedChannels()

    if not channels:
        if actionDict['channels'] and not actionDict['list-channels']:
            msg = _("ERROR: No channels currently imported; try satellite-sync --list-channels; "
                    + "then satellite-sync -c chn0 -c chn1...")
            log2disk(-1, msg)
            log2stderr(-1, msg, cleanYN=1)
            sys.exit(0)

    # map OPTIONS attribute names to their actionDict keys
    otherActions = {"no_rpms": 'no-rpms',
                    "no_packages": 'no-packages',
                    "no_errata": 'no-errata',
                    "no_kickstarts": 'no-kickstarts',
                    "force_all_errata": 'force-all-errata',
                    'no_ssl': 'no-ssl'}

    for oa in otherActions:
        if getattr(OPTIONS, oa):
            actionDict[otherActions[oa]] = 1
        else:
            actionDict[otherActions[oa]] = 0

    # the no-* switches disable their corresponding positive steps
    if actionDict['no-kickstarts']:
        actionDict['kickstarts'] = 0

    if actionDict['no-errata']:
        actionDict['errata'] = 0

    # source packages are never synced by this path
    actionDict['source-packages'] = 0

    if actionDict['no-packages']:
        actionDict['packages'] = 0
        actionDict['short'] = 0
        actionDict['download-packages'] = 0
        actionDict['rpms'] = 0

    if actionDict['no-rpms']:
        actionDict['rpms'] = 0

    actionDict['srpms'] = 0

    # org data is only imported when a master is known
    if not OPTIONS.master:
        actionDict['orgs'] = 0

    if OPTIONS.batch_size:
        try:
            OPTIONS.batch_size = int(OPTIONS.batch_size)
            if OPTIONS.batch_size not in range(1, 51):
                raise ValueError(_("ERROR: --batch-size must have a value within the range: 1..50"))
        except (ValueError, TypeError):
            # re-raise with the original traceback attached
            usix.raise_with_tb(ValueError(_("ERROR: --batch-size must have a value within the range: 1..50")),
                               sys.exc_info()[2])
2382
    # normalize user-supplied paths to absolute form
    OPTIONS.mount_point = fileutils.cleanupAbsPath(OPTIONS.mount_point)
    OPTIONS.systemid = fileutils.cleanupAbsPath(OPTIONS.systemid)

    if OPTIONS.mount_point:
        if not os.path.isdir(OPTIONS.mount_point):
            msg = _("ERROR: no such directory %s") % OPTIONS.mount_point
            log2stderr(-1, msg, cleanYN=1)
            sys.exit(25)

    # --list-error-codes: print the exit-code legend and quit
    if OPTIONS.list_error_codes:
        msg = [_("Error Codes: Returned codes means:"),
               _(" -1 - Could not lock file or KeyboardInterrupt or SystemExit"),
               _(" 0 - User interrupted or intentional exit"),
               _(" 1 - attempting to run more than one instance of satellite-sync."),
               _(" 2 - Unable to find synchronization tools."),
               _(" 3 - a general socket exception occurred"),
               _(" 4 - an SSL error occurred. Recheck your SSL settings."),
               _(" 5 - RHN error"),
               _(" 6 - unhandled exception occurred"),
               _(" 7 - unknown sync error"),
               _(" 8 - ERROR: must be root to execute"),
               _(" 9 - rpclib fault during synchronization init"),
               _(" 10 - synchronization init error"),
               _(" 11 - Error parsing XML stream"),
               _(" 12 - Channel do not exist"),
               _(" 13 - SQL error during importing package metadata"),
               _(" 14 - SQL error during linking channel packages"),
               _(" 15 - SQL error during xml processing"),
               _(" 16 - server.mount_point not set in the configuration file"),
               _(" 17 - SQL error during retrieving the channels already imported in the satellite's database"),
               _(" 18 - Wrong db connection string in rhn.conf"),
               _(" 19 - Bad arguments"),
               _(" 20 - Could not connect to db."),
               _(" 21 - Bad debug level"),
               _(" 22 - Not valid step"),
               _(" 24 - no such file"),
               _(" 25 - no such directory"),
               _(" 26 - mount_point does not exist"),
               _(" 27 - No such org"),
               _(" 28 - error: --master is only valid with --mount-point"), ]
        log(-1, msg, 1, 1, sys.stderr)
        sys.exit(0)

    # validate the requested dump protocol version against the known set
    if OPTIONS.dump_version:
        OPTIONS.dump_version = str(OPTIONS.dump_version)
        if OPTIONS.dump_version not in constants.ALLOWED_SYNC_PROTOCOL_VERSIONS:
            msg = _("ERROR: unknown dump version, try one of %s") % \
                constants.ALLOWED_SYNC_PROTOCOL_VERSIONS
            log2stderr(-1, msg, cleanYN=1)
            sys.exit(19)

    # the caller receives what to do and for which channels
    return actionDict, channels
2436
2443
2444
if __name__ == '__main__':
    # Direct invocation is for testing; production runs go through the
    # satellite-sync wrapper.
    sys.stderr.write("!!! running this directly is advisable *ONLY* for testing"
                     " purposes !!!\n")
    try:
        sys.exit(Runner().main() or 0)
    except (KeyboardInterrupt, SystemExit):
        # propagate the interrupt/exit, preserving its code
        exc = sys.exc_info()[1]
        sys.exit(exc)
    except Exception:
        # anything else: record the traceback everywhere, mail it, exit -1
        trace = 'TRACEBACK: ' + fetchTraceback(with_locals=1)
        log2disk(-1, trace)
        log2email(-1, trace)
        sendMail()
        sys.exit(-1)