1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 import time
17 import gzip
18 import sys
19 import tempfile
20 try:
21
22 from cStringIO import StringIO
23 except ImportError:
24
25 from io import StringIO
26
27 from spacewalk.common.usix import raise_with_tb, ListType
28 from spacewalk.common import rhnCache, rhnLib, rhnFlags
29 from spacewalk.common.rhnLog import log_debug, log_error
30 from spacewalk.common.rhnConfig import CFG
31 from spacewalk.common.rhnException import rhnFault
32 from spacewalk.server import rhnSQL
33 from spacewalk.satellite_tools import constants
34 from spacewalk.satellite_tools.exporter import exportLib, xmlWriter
35 from string_buffer import StringBuffer
39
41 self.compress_level = 5
42 self.llimit = None
43 self.ulimit = None
44 self._channel_family_query = """
45 select cf.id channel_family_id, to_number(null, null) quantity
46 from rhnChannelFamily cf
47 """
48 self.channel_ids = []
49 self.channel_ids_for_families = []
50 self.exportable_orgs = 'null'
51 self._raw_stream = None
52
53 - def send(self, data):
56
60
62 query = """
63 select cf.*, scf.quantity max_members
64 from rhnChannelFamily cf,
65 (%s
66 ) scf
67 where scf.channel_family_id = cf.id
68 """ % self._channel_family_query
69 return rhnSQL.prepare(query)
70
71 @staticmethod
73 query = """
74 select wc.id, wc.name
75 from web_customer wc
76 where wc.id in (%s)
77 """ % org_ids
78 return rhnSQL.prepare(query)
79
80 @staticmethod
82
83 args = {
84 'ch_ids': cids
85 }
86
87 query = """
88 select distinct cf.*, to_number(null, null) max_members
89 from rhnchannelfamily cf, rhnchannelfamilymembers cfm
90 where cf.id = cfm.channel_family_id and cfm.channel_id in ( %(ch_ids)s )
91 """
92 return rhnSQL.prepare(query % args)
93
94 @staticmethod
96 if cids:
97 query = """
98 select distinct pn.label, pn.name
99 from rhnchannel c, rhnproductname pn
100 where c.product_name_id = pn.id and c.id in ( %s )
101 """ % cids
102 else:
103 query = """
104 select label, name from rhnproductname
105 """
106 return rhnSQL.prepare(query)
107
109 query = """
110 select c.id channel_id, c.label,
111 ct.label as checksum_type,
112 TO_CHAR(c.last_modified, 'YYYYMMDDHH24MISS') last_modified
113 from rhnChannel c left outer join rhnChecksumType ct on c.checksum_type_id = ct.id,
114 rhnChannelFamilyMembers cfm,
115 (%s
116 ) scf
117 where scf.channel_family_id = cfm.channel_family_id
118 and cfm.channel_id = c.id
119 """ % self._channel_family_query
120 return rhnSQL.prepare(query)
121
123 query = """
124 select p.id package_id,
125 TO_CHAR(p.last_modified, 'YYYYMMDDHH24MISS') last_modified
126 from rhnChannelPackage cp, rhnPackage p,
127 rhnChannelFamilyMembers cfm,
128 (%s
129 ) scf
130 where scf.channel_family_id = cfm.channel_family_id
131 and cfm.channel_id = cp.channel_id
132 and cp.package_id = :package_id
133 and p.id = :package_id
134 """ % self._channel_family_query
135 return rhnSQL.prepare(query)
136
138 query = """
139 select ps.id package_id,
140 TO_CHAR(ps.last_modified, 'YYYYMMDDHH24MISS') last_modified
141 from rhnChannelPackage cp, rhnPackage p, rhnPackageSource ps,
142 rhnChannelFamilyMembers cfm,
143 (%s
144 ) scf
145 where scf.channel_family_id = cfm.channel_family_id
146 and cfm.channel_id = cp.channel_id
147 and cp.package_id = p.id
148 and p.source_rpm_id = ps.source_rpm_id
149 and ((p.org_id is null and ps.org_id is null) or
150 p.org_id = ps.org_id)
151 and ps.id = :package_id
152 """ % self._channel_family_query
153 return rhnSQL.prepare(query)
154
156 query = """
157 select e.id errata_id,
158 TO_CHAR(e.last_modified, 'YYYYMMDDHH24MISS') last_modified
159 from rhnChannelErrata ce, rhnErrata e,
160 rhnChannelFamilyMembers cfm,
161 (%s
162 ) scf
163 where scf.channel_family_id = cfm.channel_family_id
164 and cfm.channel_id = ce.channel_id
165 and ce.errata_id = :errata_id
166 and e.id = :errata_id
167 """ % self._channel_family_query
168 return rhnSQL.prepare(query)
169
172
179
180
185
187 log_debug(2)
188 writer = self._get_xml_writer()
189 dumper = SatelliteDumper(
190 writer,
191 exportLib.ChannelArchesDumper(writer, rpm_arch_type_only=rpm_arch_type_only),
192 exportLib.PackageArchesDumper(writer, rpm_arch_type_only=rpm_arch_type_only),
193 exportLib.ServerArchesDumper(writer, rpm_arch_type_only=rpm_arch_type_only),
194 exportLib.CPUArchesDumper(writer),
195 exportLib.ServerPackageArchCompatDumper(writer, rpm_arch_type_only=rpm_arch_type_only),
196 exportLib.ServerChannelArchCompatDumper(writer, rpm_arch_type_only=rpm_arch_type_only),
197 exportLib.ChannelPackageArchCompatDumper(writer, rpm_arch_type_only=rpm_arch_type_only))
198 dumper.dump()
199 writer.flush()
200 log_debug(4, "OK")
201 self.close()
202 return 0
203
214
221
233
235 if not org_list:
236 self.exportable_orgs = 'null'
237 elif isinstance(org_list, type('')):
238 self.exportable_orgs = org_list
239 else:
240 self.exportable_orgs = ', '.join([str(x) for x in org_list])
241
250
251 - def dump_channels(self, channel_labels=None, start_date=None, end_date=None, use_rhn_date=True, whole_errata=False):
259
261 """to be overwritten in subclass"""
262 pass
263
def dump_channel_packages_short(self, channel_label, last_modified, filepath=None,
                                validate_channels=False, send_headers=False,
                                open_stream=True):
    """Write the short package listing of one channel to the raw stream.

    The listing is looked up in rhnCache first; on a miss it is produced
    (and cached) by self._cache_channel_packages_short().

    channel_label     -- channel label when validate_channels is true.
                         Otherwise the value is used directly as the channel
                         record and must support ['last_modified'] and
                         ['channel_id'].
    last_modified     -- version requested by the caller; converted with
                         rhnLib.timestamp() and compared with the channel's
                         own last_modified.
    filepath          -- optional; when set it replaces the default cache key
                         and, if open_stream is true, is also the path of the
                         file that is opened for writing.
    validate_channels -- resolve channel_label through
                         self._validate_channels() first.
    send_headers      -- call self._send_headers(init_compressed_stream=0)
                         before any data is written.
    open_stream       -- open `key` for writing and use it as
                         self._raw_stream; when false the existing
                         self._raw_stream is written to and left open.

    Returns 0.
    Raises rhnFault(3013) when the requested version differs from the
    channel's version, and ClosedConnectionError when writing to the raw
    stream fails with IOError.
    """
    log_debug(2, channel_label)
    if validate_channels:
        channels = self._validate_channels(channel_labels=[channel_label])
        channel_obj = channels[channel_label]
    else:
        # No lookup requested: the argument itself is treated as the record.
        channel_obj = channel_label
    db_last_modified = int(rhnLib.timestamp(channel_obj['last_modified']))
    last_modified = int(rhnLib.timestamp(last_modified))
    log_debug(3, "last modified", last_modified, "db last modified",
              db_last_modified)
    if last_modified != db_last_modified:
        raise rhnFault(3013, "The requested channel version does not match"
                       " the upstream version", explain=0)
    channel_id = channel_obj['channel_id']
    if filepath:
        key = filepath
    else:
        key = "xml-channel-packages/rhn-channel-%d.data" % channel_id

    val = rhnCache.get(key, compressed=0, raw=1, modified=last_modified)
    if val is None:
        log_debug(4, "Cache MISS for %s (%s)" % (channel_label,
                                                 channel_id))
        stream = self._cache_channel_packages_short(channel_id, key,
                                                    last_modified)
    else:
        log_debug(4, "Cache HIT for %s (%s)" % (channel_label,
                                                channel_id))
        # Spool the cached payload into a file object so both branches
        # hand the copy loop something with read().
        temp_stream = tempfile.TemporaryFile()
        temp_stream.write(val)
        temp_stream.flush()
        stream = self._normalize_compressed_stream(temp_stream)

    buffer_size = 16384

    if send_headers:
        self._send_headers(init_compressed_stream=0)

    # Keep our own handle on the file we open: self.close() in the error
    # path may rebind or clear self._raw_stream before cleanup runs.
    opened = None
    if open_stream:
        # The payload is raw bytes (gzip data or its decompressed form), so
        # the file must be binary; text mode rejects bytes on Python 3.
        opened = open(key, "wb")
        self._raw_stream = opened
    try:
        while True:
            buff = stream.read(buffer_size)
            if not buff:
                break
            try:
                self._raw_stream.write(buff)
            except IOError:
                log_error("Client disconnected prematurely")
                self.close()
                raise_with_tb(ClosedConnectionError, sys.exc_info()[2])
    finally:
        # Only close what this call opened; closing twice is harmless.
        if opened is not None:
            opened.close()
    return 0
326
327 _query_get_channel_packages = rhnSQL.Statement("""
328 select cp.package_id,
329 TO_CHAR(p.last_modified, 'YYYYMMDDHH24MISS') last_modified
330 from rhnChannelPackage cp,
331 rhnPackage p
332 where cp.channel_id = :channel_id
333 and cp.package_id = p.id
334 """)
335
337 """ Caches the short package entries for channel_id """
338
339 temp_stream = tempfile.TemporaryFile()
340
341 compress_level = 5
342 stream = gzip.GzipFile(None, "wb", compress_level, temp_stream)
343 writer = xmlWriter.XMLWriter(stream=stream)
344
345
346 h = rhnSQL.prepare(self._query_get_channel_packages)
347 h.execute(channel_id=channel_id)
348 package_ids = h.fetchall_dict() or []
349
350 package_ids.sort(lambda a, b: cmp(a['package_id'], b['package_id']))
351
352 dumper = SatelliteDumper(writer,
353 ShortPackagesDumper(writer, package_ids))
354 dumper.dump()
355 writer.flush()
356
357 stream.close()
358 del stream
359 temp_stream.seek(0, 0)
360
361
362 rhnCache.set(key, temp_stream.read(), modified=last_modified,
363 compressed=0, raw=1)
364 return self._normalize_compressed_stream(temp_stream)
365
367 """ Given a compressed stream, will either return the stream, or will
368 decompress it and return it, depending on the compression level
369 self.compress_level
370 """
371 stream.seek(0, 0)
372 if self.compress_level:
373
374 return stream
375
376 return gzip.GzipFile(None, "rb", 0, stream)
377
382
387
392
393 @staticmethod
403
404 - def _packages(self, packages, prefix, dump_class, sources=0,
405 verify_packages=False):
432
434 log_debug(2)
435
436 errata_hash = {}
437 if verify_errata:
438 h = self.get_errata_statement()
439 for erratum in errata:
440 errata_id = self._get_item_id('rhn-erratum-', str(erratum),
441 3004, "Wrong erratum name %s")
442 if errata_id in errata_hash:
443
444 continue
445 h.execute(errata_id=errata_id)
446 row = h.fetchone_dict()
447 if not row:
448
449 raise rhnFault(3005, "No such erratum %s" % erratum)
450
451 errata_hash[errata_id] = row
452 else:
453 for erratum in errata:
454 errata_hash[erratum['errata_id']] = erratum
455
456 self._write_dump(ErrataDumper, params=list(errata_hash.values()))
457 return 0
458
468
470 log_debug(4)
471
472 if channel_labels:
473 if not isinstance(channel_labels, ListType):
474 raise rhnFault(3000,
475 "Expected list of channels, got %s" % type(channel_labels))
476
477 h = self.get_channels_statement()
478 h.execute()
479
480 all_channels_hash = {}
481 while 1:
482 row = h.fetchone_dict()
483 if not row:
484 break
485 all_channels_hash[row['label']] = row
486
487
488 iss_slave_sha256_capable = (float(rhnFlags.get('X-RHN-Satellite-XML-Dump-Version'))
489 >= constants.SHA256_SUPPORTED_VERSION)
490
491 if not channel_labels:
492 channels = all_channels_hash
493 else:
494 channels = {}
495 for label in channel_labels:
496 if label not in all_channels_hash:
497 raise rhnFault(3001, "Could not retrieve channel %s" %
498 label)
499 if not (iss_slave_sha256_capable
500 or all_channels_hash[label]['checksum_type'] in [None, 'sha1']):
501 raise rhnFault(3001,
502 ("Channel %s has incompatible rpm checksum (%s). Please contact\n"
503 + "Red Hat support for information about upgrade to newer version\n"
504 + "of Satellite Server which supports it.") %
505 (label, all_channels_hash[label]['checksum_type']))
506 channels[label] = all_channels_hash[label]
507
508 return channels
509
510 _query_validate_kickstarts = rhnSQL.Statement("""
511 select kt.label kickstart_label,
512 TO_CHAR(kt.modified, 'YYYYMMDDHH24MISS') last_modified
513 from rhnKickstartableTree kt
514 where kt.channel_id = :channel_id
515 and kt.org_id is null
516 """)
517
554
564
567
568 """ A query iterator successively applies the list of params as execute() to the
569 statement that was passed in, and presents the union of the result sets as a
570 single result set.
571 Params is a list of dictionaries that would fill the named bound variables
572 from the statement.
573 """
574
576 self._statement = statement
577 self._params = params
578
579 self._params_pos = -1
580 self._result_set_exhausted = 1
581
583 log_debug(4)
584 while 1:
585 if self._result_set_exhausted:
586
587 pos = self._params_pos
588 pos = pos + 1
589 self._params_pos = pos
590 if pos == len(self._params):
591
592 return None
593
594 log_debug(5, "Using param", pos, self._params[pos])
595 self._statement.execute(**self._params[pos])
596 self._result_set_exhausted = 0
597
598 continue
599
600
601 row = self._statement.fetchone_dict()
602 if row:
603 return row
604
605 self._result_set_exhausted = 1
606
609
610 """ This class will attempt to retrieve information, either from the database or
611 from a local cache.
612
613 Note that we expect at most one result set per database query - this can be
614 easily fixed if we need more.
615 """
616
def __init__(self, statement, params, cache_get):
    """Store the query, its parameter sets and the cache lookup callable.

    statement -- query handle, kept as given
    params    -- collection of parameter sets, kept as given
    cache_get -- cache lookup callable, kept as given
    """
    # Index into params; starts at the first parameter set.
    self._params_pos = 0
    self._statement, self._params = statement, params
    self._cache_get = cache_get
625
627 log_debug(4)
628 while 1:
629 if self._params_pos == len(self._params):
630 log_debug(4, "End of iteration")
631 self.close()
632 return None
633 log_debug(4, "Fetching set for param", self._params_pos)
634
635 params = self._params[self._params_pos]
636 self._params_pos = self._params_pos + 1
637
638
639 val = self._cache_get(params)
640 if val is not None:
641
642 log_debug(2, "Cache HIT for %s" % params)
643 return val
644
645 log_debug(4, "Cache MISS for %s" % params)
646 start = time.time()
647 self._execute(params)
648 row = self._statement.fetchone_dict()
649
650 if row:
651 log_debug(5, "Timer: %.2f" % (time.time() - start))
652 return (params, row)
653
654
655 return None
656
660
662 """ Make sure we remove references to these objects, or circular
663 references can occur.
664 """
665 log_debug(3, "Closing the iterator")
666 self._statement = None
667 self._cache_get = None
668 self._params = None
669
734
737 _query_list_channels = rhnSQL.Statement("""
738 select c.id, c.org_id,
739 c.label, ca.label channel_arch, c.basedir, c.name,
740 c.summary, c.description, c.gpg_key_url,
741 ct.label checksum_type,
742 TO_CHAR(c.last_modified, 'YYYYMMDDHH24MISS') last_modified,
743 pc.label parent_channel, c.channel_access
744 from rhnChannel c left outer join rhnChannel pc on c.parent_channel = pc.id
745 left outer join rhnChecksumType ct on c.checksum_type_id = ct.id, rhnChannelArch ca
746 where c.id = :channel_id
747 and c.channel_arch_id = ca.id
748 """)
749
def __init__(self, writer, channels=(), start_date=None, end_date=None, use_rhn_date=True, whole_errata=False):
    """Initialise the base channels dumper, then record the export options.

    writer and channels are passed straight to
    exportLib.ChannelsDumper.__init__; the remaining arguments are stored
    unchanged on the instance under the same names.
    """
    exportLib.ChannelsDumper.__init__(self, writer, channels)
    (self.start_date,
     self.end_date,
     self.use_rhn_date,
     self.whole_errata) = (start_date, end_date, use_rhn_date, whole_errata)
756
764
772
775 iterator_query = rhnSQL.Statement("""
776 select c.id, c.label, ca.label channel_arch, c.basedir, c.name,
777 c.summary, c.description, c.gpg_key_url, c.org_id,
778 TO_CHAR(c.last_modified, 'YYYYMMDDHH24MISS') last_modified,
779 c.channel_product_id,
780 pc.label parent_channel,
781 cp.product channel_product,
782 cp.version channel_product_version,
783 cp.beta channel_product_beta,
784 c.receiving_updates,
785 ct.label checksum_type,
786 c.channel_access
787 from rhnChannel c left outer join rhnChannel pc on c.parent_channel = pc.id
788 left outer join rhnChannelProduct cp on c.channel_product_id = cp.id
789 left outer join rhnChecksumType ct on c.checksum_type_id = ct.id,
790 rhnChannelArch ca
791 where c.id = :channel_id
792 and c.channel_arch_id = ca.id
793 """)
794
796 channel_id = params['channel_id']
797 return "xml-channels/rhn-channel-%d.xml" % channel_id
798
801 iterator_query = rhnSQL.Statement("""
802 select
803 p.id,
804 p.org_id,
805 pn.name,
806 (pe.evr).version as version,
807 (pe.evr).release as release,
808 (pe.evr).epoch as epoch,
809 pa.label as package_arch,
810 c.checksum_type,
811 c.checksum,
812 p.package_size,
813 TO_CHAR(p.last_modified, 'YYYYMMDDHH24MISS') as last_modified
814 from rhnPackage p, rhnPackageName pn, rhnPackageEVR pe,
815 rhnPackageArch pa, rhnChecksumView c
816 where p.id = :package_id
817 and p.name_id = pn.id
818 and p.evr_id = pe.id
819 and p.package_arch_id = pa.id
820 and p.checksum_id = c.id
821 """)
822 item_id_key = 'package_id'
823 hash_factor = 2
824 key_template = 'xml-short-packages/%s/rhn-package-short-%s.xml'
825
828 iterator_query = rhnSQL.Statement("""
829 select
830 p.id,
831 p.org_id,
832 pn.name,
833 (pe.evr).version as version,
834 (pe.evr).release as release,
835 (pe.evr).epoch as epoch,
836 pa.label as package_arch,
837 pg.name as package_group,
838 p.rpm_version,
839 p.description,
840 p.summary,
841 p.package_size,
842 p.payload_size,
843 p.installed_size,
844 p.build_host,
845 TO_CHAR(p.build_time, 'YYYYMMDDHH24MISS') as build_time,
846 sr.name as source_rpm,
847 c.checksum_type,
848 c.checksum,
849 p.vendor,
850 p.payload_format,
851 p.compat,
852 p.header_sig,
853 p.header_start,
854 p.header_end,
855 p.copyright,
856 p.cookie,
857 TO_CHAR(p.last_modified, 'YYYYMMDDHH24MISS') as last_modified
858 from rhnPackage p, rhnPackageName pn, rhnPackageEVR pe,
859 rhnPackageArch pa, rhnPackageGroup pg, rhnSourceRPM sr,
860 rhnChecksumView c
861 where p.id = :package_id
862 and p.name_id = pn.id
863 and p.evr_id = pe.id
864 and p.package_arch_id = pa.id
865 and p.package_group = pg.id
866 and p.source_rpm_id = sr.id
867 and p.checksum_id = c.id
868 """)
869 item_id_key = 'package_id'
870 hash_factor = 2
871 key_template = 'xml-packages/%s/rhn-package-%s.xml'
872
875 iterator_query = rhnSQL.Statement("""
876 select
877 ps.id,
878 sr.name source_rpm,
879 pg.name package_group,
880 ps.rpm_version,
881 ps.payload_size,
882 ps.build_host,
883 TO_CHAR(ps.build_time, 'YYYYMMDDHH24MISS') build_time,
884 sig.checksum sigchecksum,
885 sig.checksum_type sigchecksum_type,
886 ps.vendor,
887 ps.cookie,
888 ps.package_size,
889 c.checksum_type,
890 c.checksum,
891 TO_CHAR(ps.last_modified, 'YYYYMMDDHH24MISS') last_modified
892 from rhnPackageSource ps, rhnPackageGroup pg, rhnSourceRPM sr,
893 rhnChecksumView c, rhnChecksumView sig
894 where ps.id = :package_id
895 and ps.package_group = pg.id
896 and ps.source_rpm_id = sr.id
897 and ps.checksum_id = c.id
898 and ps.sigchecksum_id = sig.id
899 """)
900 item_id_key = 'package_id'
901 hash_factor = 2
902 key_template = 'xml-packages/%s/rhn-source-package-%s.xml'
903
906 iterator_query = rhnSQL.Statement("""
907 select
908 e.id,
909 e.org_id,
910 e.advisory_name,
911 e.advisory,
912 e.advisory_type,
913 e.advisory_rel,
914 e.product,
915 e.description,
916 e.synopsis,
917 e.topic,
918 e.solution,
919 TO_CHAR(e.issue_date, 'YYYYMMDDHH24MISS') issue_date,
920 TO_CHAR(e.update_date, 'YYYYMMDDHH24MISS') update_date,
921 TO_CHAR(e.last_modified, 'YYYYMMDDHH24MISS') last_modified,
922 e.refers_to,
923 e.notes,
924 e.errata_from,
925 e.severity_id
926 from rhnErrata e
927 where e.id = :errata_id
928 """)
929
934
937 iterator_query = rhnSQL.Statement("""
938 select kt.id,
939 c.label channel,
940 kt.base_path "base-path",
941 kt.label,
942 kt.boot_image "boot-image",
943 ktt.name "kstree-type-name",
944 ktt.label "kstree-type-label",
945 kit.name "install-type-name",
946 kit.label "install-type-label",
947 TO_CHAR(kt.last_modified, 'YYYYMMDDHH24MISS') "last-modified"
948 from rhnKickstartableTree kt,
949 rhnKSTreeType ktt,
950 rhnKSInstallType kit,
951 rhnChannel c
952 where kt.channel_id = c.id
953 and ktt.id = kt.kstree_type
954 and kit.id = kt.install_type
955 and kt.org_id is NULL
956 and kt.label = :kickstart_label
957 """)
958
960 kickstart_label = params['kickstart_label']
961 return "xml-kickstartable-tree/%s.xml" % kickstart_label
962
966
969
970 """Writes to multiple streams at the same time"""
971
973 self.streams = streams
974
979