
Source Code for Module backend.satellite_tools.disk_dumper.dumper

#
# Copyright (c) 2008--2018 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
#

import time
import gzip
import sys
import tempfile
try:
    # python 2
    from cStringIO import StringIO
except ImportError:
    # python 3
    from io import StringIO

from spacewalk.common.usix import raise_with_tb, ListType
from spacewalk.common import rhnCache, rhnLib, rhnFlags
from spacewalk.common.rhnLog import log_debug, log_error
from spacewalk.common.rhnConfig import CFG
from spacewalk.common.rhnException import rhnFault
from spacewalk.server import rhnSQL
from spacewalk.satellite_tools import constants
from spacewalk.satellite_tools.exporter import exportLib, xmlWriter
from string_buffer import StringBuffer


class XML_Dumper:

    def __init__(self):
        self.compress_level = 5
        self.llimit = None
        self.ulimit = None
        self._channel_family_query = """
            select cf.id channel_family_id, to_number(null, null) quantity
              from rhnChannelFamily cf
        """
        self.channel_ids = []
        self.channel_ids_for_families = []
        self.exportable_orgs = 'null'
        self._raw_stream = None

    def send(self, data):
        # to be overwritten in subclass
        pass

    def close(self):
        # to be overwritten in subclass
        pass

    def get_channel_families_statement(self):
        query = """
            select cf.*, scf.quantity max_members
              from rhnChannelFamily cf,
                   (%s
                   ) scf
             where scf.channel_family_id = cf.id
        """ % self._channel_family_query
        return rhnSQL.prepare(query)

    @staticmethod
    def get_orgs_statement(org_ids):
        query = """
            select wc.id, wc.name
              from web_customer wc
             where wc.id in (%s)
        """ % org_ids
        return rhnSQL.prepare(query)

    @staticmethod
    def get_channel_families_statement_new(cids):

        args = {
            'ch_ids': cids
        }

        query = """
            select distinct cf.*, to_number(null, null) max_members
              from rhnchannelfamily cf, rhnchannelfamilymembers cfm
             where cf.id = cfm.channel_family_id and cfm.channel_id in ( %(ch_ids)s )
        """
        return rhnSQL.prepare(query % args)

    @staticmethod
    def get_product_names_statement(cids):
        if cids:
            query = """
                select distinct pn.label, pn.name
                  from rhnchannel c, rhnproductname pn
                 where c.product_name_id = pn.id and c.id in ( %s )
            """ % cids
        else:
            query = """
                select label, name from rhnproductname
            """
        return rhnSQL.prepare(query)

    def get_channels_statement(self):
        query = """
            select c.id channel_id, c.label,
                   ct.label as checksum_type,
                   TO_CHAR(c.last_modified, 'YYYYMMDDHH24MISS') last_modified
              from rhnChannel c left outer join rhnChecksumType ct on c.checksum_type_id = ct.id,
                   rhnChannelFamilyMembers cfm,
                   (%s
                   ) scf
             where scf.channel_family_id = cfm.channel_family_id
               and cfm.channel_id = c.id
        """ % self._channel_family_query
        return rhnSQL.prepare(query)

    def get_packages_statement(self):
        query = """
            select p.id package_id,
                   TO_CHAR(p.last_modified, 'YYYYMMDDHH24MISS') last_modified
              from rhnChannelPackage cp, rhnPackage p,
                   rhnChannelFamilyMembers cfm,
                   (%s
                   ) scf
             where scf.channel_family_id = cfm.channel_family_id
               and cfm.channel_id = cp.channel_id
               and cp.package_id = :package_id
               and p.id = :package_id
        """ % self._channel_family_query
        return rhnSQL.prepare(query)

    def get_source_packages_statement(self):
        query = """
            select ps.id package_id,
                   TO_CHAR(ps.last_modified, 'YYYYMMDDHH24MISS') last_modified
              from rhnChannelPackage cp, rhnPackage p, rhnPackageSource ps,
                   rhnChannelFamilyMembers cfm,
                   (%s
                   ) scf
             where scf.channel_family_id = cfm.channel_family_id
               and cfm.channel_id = cp.channel_id
               and cp.package_id = p.id
               and p.source_rpm_id = ps.source_rpm_id
               and ((p.org_id is null and ps.org_id is null) or
                   p.org_id = ps.org_id)
               and ps.id = :package_id
        """ % self._channel_family_query
        return rhnSQL.prepare(query)

    def get_errata_statement(self):
        query = """
            select e.id errata_id,
                   TO_CHAR(e.last_modified, 'YYYYMMDDHH24MISS') last_modified
              from rhnChannelErrata ce, rhnErrata e,
                   rhnChannelFamilyMembers cfm,
                   (%s
                   ) scf
             where scf.channel_family_id = cfm.channel_family_id
               and cfm.channel_id = ce.channel_id
               and ce.errata_id = :errata_id
               and e.id = :errata_id
        """ % self._channel_family_query
        return rhnSQL.prepare(query)

    def _get_xml_writer(self):
        return xmlWriter.XMLWriter(stream=StringBuffer(self))

    def _write_dump(self, item_dumper_class, **kwargs):
        writer = self._get_xml_writer()
        dumper = SatelliteDumper(writer, item_dumper_class(writer, **kwargs))
        dumper.dump()
        writer.flush()
        log_debug(4, "OK")
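
    # Editor's note: a hedged sketch, not part of the original module.  The
    # dump_* methods below all funnel through _write_dump(), which chains
    # StringBuffer(self) -> XMLWriter -> SatelliteDumper -> item dumper, so
    # the XML the dumpers emit is routed back to this object's send().  A
    # concrete subclass (hypothetical names below) therefore only has to
    # provide send() and close():
    #
    #     class StreamingXMLDumper(XML_Dumper):
    #         def __init__(self, stream):
    #             XML_Dumper.__init__(self)
    #             self._stream = stream
    #         def send(self, data):
    #             self._stream.write(data)
    #         def close(self):
    #             self._stream.close()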

    # Dumper functions here
    def dump_blacklist_obsoletes(self):
        log_debug(2)
        self._write_dump(exportLib.BlacklistObsoletesDumper)
        return 0

    def dump_arches(self, rpm_arch_type_only=0):
        log_debug(2)
        writer = self._get_xml_writer()
        dumper = SatelliteDumper(
            writer,
            exportLib.ChannelArchesDumper(writer, rpm_arch_type_only=rpm_arch_type_only),
            exportLib.PackageArchesDumper(writer, rpm_arch_type_only=rpm_arch_type_only),
            exportLib.ServerArchesDumper(writer, rpm_arch_type_only=rpm_arch_type_only),
            exportLib.CPUArchesDumper(writer),
            exportLib.ServerPackageArchCompatDumper(writer, rpm_arch_type_only=rpm_arch_type_only),
            exportLib.ServerChannelArchCompatDumper(writer, rpm_arch_type_only=rpm_arch_type_only),
            exportLib.ChannelPackageArchCompatDumper(writer, rpm_arch_type_only=rpm_arch_type_only))
        dumper.dump()
        writer.flush()
        log_debug(4, "OK")
        self.close()
        return 0

    def dump_product_names(self):
        log_debug(2)

        # Export only product names for relevant channels
        cids = ','.join([str(x['channel_id']) for x in self.channel_ids + self.channel_ids_for_families])
        h = self.get_product_names_statement(cids)
        h.execute()

        self._write_dump(exportLib.ProductNamesDumper, data_iterator=h)
        return 0

    def dump_server_group_type_server_arches(self, rpm_arch_type_only=0,
                                             virt_filter=0):
        log_debug(2)
        self._write_dump(exportLib.ServerGroupTypeServerArchCompatDumper,
                         rpm_arch_type_only=rpm_arch_type_only, virt_filter=virt_filter)
        return 0

    def dump_channel_families(self):
        log_debug(2)

        cids = ','.join([str(x['channel_id']) for x in self.channel_ids + self.channel_ids_for_families])

        h = self.get_channel_families_statement_new(cids)
        h.execute()

        self._write_dump(exportLib.ChannelFamiliesDumper,
                         data_iterator=h, null_max_members=0)
        return 0

    def set_exportable_orgs(self, org_list):
        if not org_list:
            self.exportable_orgs = 'null'
        elif isinstance(org_list, type('')):
            self.exportable_orgs = org_list
        else:
            self.exportable_orgs = ', '.join([str(x) for x in org_list])

    def dump_orgs(self):
        log_debug(2)

        h = self.get_orgs_statement(self.exportable_orgs)
        h.execute()

        self._write_dump(exportLib.OrgsDumper, data_iterator=h)
        return 0

    def dump_channels(self, channel_labels=None, start_date=None, end_date=None, use_rhn_date=True, whole_errata=False):
        log_debug(2)
        # channels = self._validate_channels(channel_labels=channel_labels)

        self._write_dump(ChannelsDumper,
                         channels=channel_labels, start_date=start_date, end_date=end_date,
                         use_rhn_date=use_rhn_date, whole_errata=whole_errata)
        return 0

    def _send_headers(self, error=0, init_compressed_stream=1):
        """to be overwritten in subclass"""
        pass

    def dump_channel_packages_short(self, channel_label, last_modified, filepath=None,
                                    validate_channels=False, send_headers=False,
                                    open_stream=True):
        log_debug(2, channel_label)
        if validate_channels:
            channels = self._validate_channels(channel_labels=[channel_label])
            channel_obj = channels[channel_label]
        else:
            channels = channel_label
            channel_obj = channels
        db_last_modified = int(rhnLib.timestamp(channel_obj['last_modified']))
        last_modified = int(rhnLib.timestamp(last_modified))
        log_debug(3, "last modified", last_modified, "db last modified",
                  db_last_modified)
        if last_modified != db_last_modified:
            raise rhnFault(3013, "The requested channel version does not match"
                                 " the upstream version", explain=0)
        channel_id = channel_obj['channel_id']
        if filepath:
            key = filepath
        else:
            key = "xml-channel-packages/rhn-channel-%d.data" % channel_id
        # Try to get everything off of the cache
        val = rhnCache.get(key, compressed=0, raw=1, modified=last_modified)
        if val is None:
            # Not generated yet
            log_debug(4, "Cache MISS for %s (%s)" % (channel_label,
                                                     channel_id))
            stream = self._cache_channel_packages_short(channel_id, key,
                                                        last_modified)
        else:
            log_debug(4, "Cache HIT for %s (%s)" % (channel_label,
                                                    channel_id))
            temp_stream = tempfile.TemporaryFile()
            temp_stream.write(val)
            temp_stream.flush()
            stream = self._normalize_compressed_stream(temp_stream)

        # Copy the results to the output stream
        # They should be already compressed if they were requested to be
        # compressed
        buffer_size = 16384
        # Send the HTTP headers - but don't init the compressed stream since
        # we send the data ourselves
        if send_headers:
            self._send_headers(init_compressed_stream=0)
        if open_stream:
            # binary mode: the stream holds gzipped data
            self._raw_stream = open(key, "wb")
        while 1:
            buff = stream.read(buffer_size)
            if not buff:
                break
            try:
                self._raw_stream.write(buff)
            except IOError:
                log_error("Client disconnected prematurely")
                self.close()
                raise_with_tb(ClosedConnectionError, sys.exc_info()[2])
        # We're done
        if open_stream:
            self._raw_stream.close()
        return 0

    _query_get_channel_packages = rhnSQL.Statement("""
        select cp.package_id,
               TO_CHAR(p.last_modified, 'YYYYMMDDHH24MISS') last_modified
          from rhnChannelPackage cp,
               rhnPackage p
         where cp.channel_id = :channel_id
           and cp.package_id = p.id
    """)

    def _cache_channel_packages_short(self, channel_id, key, last_modified):
        """ Caches the short package entries for channel_id """
        # Create a temporary file
        temp_stream = tempfile.TemporaryFile()
        # Always compress the result
        compress_level = 5
        stream = gzip.GzipFile(None, "wb", compress_level, temp_stream)
        writer = xmlWriter.XMLWriter(stream=stream)

        # Fetch packages
        h = rhnSQL.prepare(self._query_get_channel_packages)
        h.execute(channel_id=channel_id)
        package_ids = h.fetchall_dict() or []
        # Sort packages (key= form works on both Python 2 and 3, unlike the
        # old cmp-based sort)
        package_ids.sort(key=lambda p: p['package_id'])

        dumper = SatelliteDumper(writer,
                                 ShortPackagesDumper(writer, package_ids))
        dumper.dump()
        writer.flush()
        # We're done with the stream object
        stream.close()
        del stream
        temp_stream.seek(0, 0)
        # Set the value in the cache. We don't recompress the result since
        # it's already compressed
        rhnCache.set(key, temp_stream.read(), modified=last_modified,
                     compressed=0, raw=1)
        return self._normalize_compressed_stream(temp_stream)

    def _normalize_compressed_stream(self, stream):
        """ Given a compressed stream, will either return the stream, or will
            decompress it and return it, depending on the compression level
            self.compress_level
        """
        stream.seek(0, 0)
        if self.compress_level:
            # Output should be compressed; nothing else to do
            return stream
        # Argh, have to decompress
        return gzip.GzipFile(None, "rb", 0, stream)
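
    # Editor's note: a hedged summary sketch, not in the original source.
    # The two helpers above form a write-through cache keyed on the channel's
    # last_modified timestamp: the entry is always stored gzipped
    # (compressed=0, raw=1 tell rhnCache not to recompress), and
    # _normalize_compressed_stream() either hands the gzipped stream back
    # (self.compress_level truthy) or wraps it in a decompressing GzipFile
    # (compress_level == 0), so callers always read the format they asked for:
    #
    #     stream = self._cache_channel_packages_short(channel_id, key, ts)
    #     data = stream.read()   # gzipped iff self.compress_level is set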

    def dump_packages(self, packages):
        log_debug(2)
        return self._packages(packages, prefix='rhn-package-',
                              dump_class=PackagesDumper)

    def dump_packages_short(self, packages):
        log_debug(2)
        return self._packages(packages, prefix='rhn-package-',
                              dump_class=ShortPackagesDumper)

    def dump_source_packages(self, packages):
        log_debug(2)
        return self._packages(packages, prefix='rhn-source-package-',
                              dump_class=SourcePackagesDumper, sources=1)

    @staticmethod
    def _get_item_id(prefix, name, errnum, errmsg):
        prefix_len = len(prefix)
        if name[:prefix_len] != prefix:
            raise rhnFault(errnum, errmsg % name)
        try:
            uuid = int(name[prefix_len:])
        except ValueError:
            raise_with_tb(rhnFault(errnum, errmsg % name), sys.exc_info()[2])
        return uuid
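
    # Editor's note: illustration only, not in the original source.
    # _get_item_id() strips a known prefix and parses the remainder as an
    # integer id; anything else raises rhnFault(errnum, errmsg % name):
    #
    #     _get_item_id('rhn-package-', 'rhn-package-1234', 3002,
    #                  'Invalid package name %s')   # -> 1234
    #     _get_item_id('rhn-package-', 'rhn-package-abc', 3002,
    #                  'Invalid package name %s')   # -> raises rhnFault 3002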

    def _packages(self, packages, prefix, dump_class, sources=0,
                  verify_packages=False):
        packages_hash = {}
        if verify_packages:
            if sources:
                h = self.get_source_packages_statement()
            else:
                h = self.get_packages_statement()

            for package in packages:
                package_id = self._get_item_id(prefix, str(package),
                                               3002, 'Invalid package name %s')
                if package_id in packages_hash:
                    # Already verified
                    continue
                h.execute(package_id=package_id)
                row = h.fetchone_dict()
                if not row:
                    # XXX Silently ignore it?
                    raise rhnFault(3003, "No such package %s" % package)
                # Saving the row, it's handy later when we create the iterator
                packages_hash[package_id] = row
        else:
            for package in packages:
                packages_hash[package['package_id']] = package

        self._write_dump(dump_class, params=list(packages_hash.values()))
        return 0

    def dump_errata(self, errata, verify_errata=False):
        log_debug(2)

        errata_hash = {}
        if verify_errata:
            h = self.get_errata_statement()
            for erratum in errata:
                errata_id = self._get_item_id('rhn-erratum-', str(erratum),
                                              3004, "Wrong erratum name %s")
                if errata_id in errata_hash:
                    # Already verified
                    continue
                h.execute(errata_id=errata_id)
                row = h.fetchone_dict()
                if not row:
                    # XXX Silently ignore it?
                    raise rhnFault(3005, "No such erratum %s" % erratum)
                # Saving the row, it's handy later when we create the iterator
                errata_hash[errata_id] = row
        else:
            for erratum in errata:
                errata_hash[erratum['errata_id']] = erratum

        self._write_dump(ErrataDumper, params=list(errata_hash.values()))
        return 0

    def dump_kickstartable_trees(self, kickstart_labels=None,
                                 validate_kickstarts=False):
        log_debug(2)
        if validate_kickstarts:
            kickstart_labels = self._validate_kickstarts(
                kickstart_labels=kickstart_labels)

        self._write_dump(KickstartableTreesDumper, params=kickstart_labels)
        return 0

    def _validate_channels(self, channel_labels=None):
        log_debug(4)
        # Sanity check
        if channel_labels:
            if not isinstance(channel_labels, ListType):
                raise rhnFault(3000,
                               "Expected list of channels, got %s" % type(channel_labels))

        h = self.get_channels_statement()
        h.execute()
        # Hash the list of all available channels based on the label
        all_channels_hash = {}
        while 1:
            row = h.fetchone_dict()
            if not row:
                break
            all_channels_hash[row['label']] = row

        # Intersect the list of channels they've sent to us
        iss_slave_sha256_capable = (float(rhnFlags.get('X-RHN-Satellite-XML-Dump-Version'))
                                    >= constants.SHA256_SUPPORTED_VERSION)

        if not channel_labels:
            channels = all_channels_hash
        else:
            channels = {}
            for label in channel_labels:
                if label not in all_channels_hash:
                    raise rhnFault(3001, "Could not retrieve channel %s" %
                                   label)
                if not (iss_slave_sha256_capable
                        or all_channels_hash[label]['checksum_type'] in [None, 'sha1']):
                    raise rhnFault(3001,
                                   ("Channel %s has incompatible rpm checksum (%s). Please contact\n"
                                    + "Red Hat support for information about upgrade to newer version\n"
                                    + "of Satellite Server which supports it.") %
                                   (label, all_channels_hash[label]['checksum_type']))
                channels[label] = all_channels_hash[label]

        return channels

    _query_validate_kickstarts = rhnSQL.Statement("""
        select kt.label kickstart_label,
               TO_CHAR(kt.modified, 'YYYYMMDDHH24MISS') last_modified
          from rhnKickstartableTree kt
         where kt.channel_id = :channel_id
           and kt.org_id is null
    """)

    def _validate_kickstarts(self, kickstart_labels):
        log_debug(4)
        # Sanity check
        if kickstart_labels:
            if not isinstance(kickstart_labels, ListType):
                raise rhnFault(3000,
                               "Expected list of kickstart labels, got %s" %
                               type(kickstart_labels))

        all_ks_hash = {}

        h = self.get_channels_statement()
        h.execute()

        hk = rhnSQL.prepare(self._query_validate_kickstarts)
        while 1:
            channel = h.fetchone_dict()
            if not channel:
                break

            hk.execute(channel_id=channel['channel_id'])
            while 1:
                row = hk.fetchone_dict()
                if not row:
                    break
                all_ks_hash[row['kickstart_label']] = row

        if not kickstart_labels:
            return list(all_ks_hash.values())

        result = []
        for l in kickstart_labels:
            if l in all_ks_hash:
                result.append(all_ks_hash[l])

        return result


class SatelliteDumper(exportLib.SatelliteDumper):

    def set_attributes(self):
        """ Overriding with our own version """
        attributes = exportLib.SatelliteDumper.set_attributes(self)
        attributes['version'] = constants.PROTOCOL_VERSION
        attributes['generation'] = CFG.SAT_CERT_GENERATION
        return attributes


class QueryIterator:

    """ A query iterator successively applies the list of params as execute() to the
        statement that was passed in, and presents the union of the result sets as a
        single result set.
        Params is a list of dictionaries that would fill the named bound variables
        from the statement.
    """

    def __init__(self, statement, params):
        self._statement = statement
        self._params = params
        # Position in the params list
        self._params_pos = -1
        self._result_set_exhausted = 1

    def fetchone_dict(self):
        log_debug(4)
        while 1:
            if self._result_set_exhausted:
                # Nothing to do here, move to the next set of params
                pos = self._params_pos
                pos = pos + 1
                self._params_pos = pos
                if pos == len(self._params):
                    # End of the list, we're done
                    return None
                # Execute the statement
                log_debug(5, "Using param", pos, self._params[pos])
                self._statement.execute(**self._params[pos])
                self._result_set_exhausted = 0
                # Go back into the loop
                continue

            # Result set not exhausted yet
            row = self._statement.fetchone_dict()
            if row:
                return row

            self._result_set_exhausted = 1
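
# Editor's note: a hypothetical usage sketch, not part of the original
# module.  QueryIterator re-executes one prepared statement per params dict
# and presents the union of the result sets behind the usual
# fetchone_dict() cursor interface; the query below is made up.
def _example_query_iterator(errata_ids):
    """Illustration: yield one row per erratum id, as if from one cursor."""
    h = rhnSQL.prepare("select id from rhnErrata where id = :errata_id")
    iterator = QueryIterator(h, [{'errata_id': i} for i in errata_ids])
    while 1:
        row = iterator.fetchone_dict()
        if not row:
            break
        yield row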

class CachedQueryIterator:

    """ This class will attempt to retrieve information, either from the database or
        from a local cache.

        Note that we expect at most one result set per database query - this can be
        easily fixed if we need more.
    """

    def __init__(self, statement, params, cache_get):
        self._statement = statement
        # XXX params has to be a list of hashes, containing at least a
        # last_modified - which is stripped before the execution of the
        # statement
        self._params = params
        self._params_pos = 0
        self._cache_get = cache_get

    def fetchone_dict(self):
        log_debug(4)
        while 1:
            if self._params_pos == len(self._params):
                log_debug(4, "End of iteration")
                self.close()
                return None
            log_debug(4, "Fetching set for param", self._params_pos)
            # Get the last modified attribute
            params = self._params[self._params_pos]
            self._params_pos = self._params_pos + 1

            # Look up the object in the cache
            val = self._cache_get(params)
            if val is not None:
                # Entry is cached
                log_debug(2, "Cache HIT for %s" % params)
                return val

            log_debug(4, "Cache MISS for %s" % params)
            start = time.time()
            self._execute(params)
            row = self._statement.fetchone_dict()

            if row:
                log_debug(5, "Timer: %.2f" % (time.time() - start))
                return (params, row)

        # Dummy return
        return None

    def _execute(self, params):
        log_debug(4, params)
        self._statement.execute(**params)

    def close(self):
        """ Make sure we remove references to these objects, or circular
            references can occur.
        """
        log_debug(3, "Closing the iterator")
        self._statement = None
        self._cache_get = None
        self._params = None


class CachedDumper(exportLib.BaseDumper):
    iterator_query = None
    item_id_key = 'id'
    hash_factor = 1
    key_template = 'dump/%s/dump-%s.xml'

    def __init__(self, writer, params):
        statement = rhnSQL.prepare(self.iterator_query)
        iterator = CachedQueryIterator(statement, params,
                                       cache_get=self.cache_get)
        exportLib.BaseDumper.__init__(self, writer, data_iterator=iterator)
        self.non_cached_class = self.__class__.__bases__[1]

    @staticmethod
    def _get_last_modified(params):
        """ To be overwritten. """
        return params['last_modified']

    def _get_key(self, params):
        item_id = str(params[self.item_id_key])
        hash_val = rhnLib.hash_object_id(item_id, self.hash_factor)
        return self.key_template % (hash_val, item_id)

    def cache_get(self, params):
        log_debug(4, params)
        key = self._get_key(params)
        last_modified = self._get_last_modified(params)
        return rhnCache.get(key, modified=last_modified, raw=1)

    def cache_set(self, params, value):
        log_debug(4, params)
        last_modified = self._get_last_modified(params)
        key = self._get_key(params)
        user = 'apache'
        group = 'apache'
        if rhnLib.isSUSE():
            user = 'wwwrun'
            group = 'www'
        return rhnCache.set(key, value, modified=last_modified,
                            raw=1, user=user, group=group, mode=int('0755', 8))

    def dump_subelement(self, data):
        log_debug(2)
        # CachedQueryIterator returns (params, row) as data
        params, row = data
        s = StringIO()
        # Back up the old writer and replace it with a StringIO-based one
        ow = self.get_writer()
        # Write into a tee stream (which writes to both streams at the same
        # time)
        tee_stream = TeeStream(s, ow.stream)
        self.set_writer(xmlWriter.XMLWriter(stream=tee_stream, skip_xml_decl=1))

        start = time.time()
        # call dump_subelement() from original (non-cached) class
        self.non_cached_class.dump_subelement(self, row)
        log_debug(5,
                  "Timer for _dump_subelement: %.2f" % (time.time() - start))

        # Restore the old writer
        self.set_writer(ow)

        self.cache_set(params, s.getvalue())
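
# Editor's note: a hypothetical sketch, not part of the original module,
# showing the pattern every concrete dumper below follows: CachedDumper is
# mixed in first (cache plumbing), the matching exportLib dumper second
# (the actual XML output), and the class attributes tell CachedDumper how to
# build cache keys.  rhnLib.hash_object_id() buckets on the trailing digits
# of the id, so with hash_factor = 2 an id of 1234 would land under
# 'xml-example/34/example-1234.xml' here.  Query and key layout are
# illustrative only.
class _ExampleCachedDumper(CachedDumper, exportLib.ChannelsDumper):
    iterator_query = rhnSQL.Statement("""
        select c.id channel_id,
               TO_CHAR(c.last_modified, 'YYYYMMDDHH24MISS') last_modified
          from rhnChannel c
         where c.id = :channel_id
    """)
    item_id_key = 'channel_id'
    hash_factor = 2
    key_template = 'xml-example/%s/example-%s.xml'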

class ChannelsDumper(exportLib.ChannelsDumper):
    _query_list_channels = rhnSQL.Statement("""
        select c.id, c.org_id,
               c.label, ca.label channel_arch, c.basedir, c.name,
               c.summary, c.description, c.gpg_key_url,
               ct.label checksum_type,
               TO_CHAR(c.last_modified, 'YYYYMMDDHH24MISS') last_modified,
               pc.label parent_channel, c.channel_access
          from rhnChannel c left outer join rhnChannel pc on c.parent_channel = pc.id
               left outer join rhnChecksumType ct on c.checksum_type_id = ct.id, rhnChannelArch ca
         where c.id = :channel_id
           and c.channel_arch_id = ca.id
    """)

    def __init__(self, writer, channels=(), start_date=None, end_date=None, use_rhn_date=True, whole_errata=False):
        exportLib.ChannelsDumper.__init__(self, writer, channels)
        self.start_date = start_date
        self.end_date = end_date
        self.use_rhn_date = use_rhn_date
        self.whole_errata = whole_errata

    def dump_subelement(self, data):
        log_debug(6, data)
        # return exportLib.ChannelsDumper.dump_subelement(self, data)
        # pylint: disable=W0212
        c = exportLib._ChannelDumper(self._writer, data, self.start_date, self.end_date,
                                     self.use_rhn_date, self.whole_errata)
        c.dump()

    def set_iterator(self):
        if not self._channels:
            # Nothing to do
            return []

        h = rhnSQL.prepare(self._query_list_channels)
        return QueryIterator(statement=h, params=self._channels)


class ChannelsDumperEx(CachedDumper, exportLib.ChannelsDumper):
    iterator_query = rhnSQL.Statement("""
        select c.id, c.label, ca.label channel_arch, c.basedir, c.name,
               c.summary, c.description, c.gpg_key_url, c.org_id,
               TO_CHAR(c.last_modified, 'YYYYMMDDHH24MISS') last_modified,
               c.channel_product_id,
               pc.label parent_channel,
               cp.product channel_product,
               cp.version channel_product_version,
               cp.beta channel_product_beta,
               c.receiving_updates,
               ct.label checksum_type,
               c.channel_access
          from rhnChannel c left outer join rhnChannel pc on c.parent_channel = pc.id
               left outer join rhnChannelProduct cp on c.channel_product_id = cp.id
               left outer join rhnChecksumType ct on c.checksum_type_id = ct.id,
               rhnChannelArch ca
         where c.id = :channel_id
           and c.channel_arch_id = ca.id
    """)

    def _get_key(self, params):
        channel_id = params['channel_id']
        return "xml-channels/rhn-channel-%d.xml" % channel_id


class ShortPackagesDumper(CachedDumper, exportLib.ShortPackagesDumper):
    iterator_query = rhnSQL.Statement("""
        select
            p.id,
            p.org_id,
            pn.name,
            (pe.evr).version as version,
            (pe.evr).release as release,
            (pe.evr).epoch as epoch,
            pa.label as package_arch,
            c.checksum_type,
            c.checksum,
            p.package_size,
            TO_CHAR(p.last_modified, 'YYYYMMDDHH24MISS') as last_modified
        from rhnPackage p, rhnPackageName pn, rhnPackageEVR pe,
             rhnPackageArch pa, rhnChecksumView c
        where p.id = :package_id
          and p.name_id = pn.id
          and p.evr_id = pe.id
          and p.package_arch_id = pa.id
          and p.checksum_id = c.id
    """)
    item_id_key = 'package_id'
    hash_factor = 2
    key_template = 'xml-short-packages/%s/rhn-package-short-%s.xml'


class PackagesDumper(CachedDumper, exportLib.PackagesDumper):
    iterator_query = rhnSQL.Statement("""
        select
            p.id,
            p.org_id,
            pn.name,
            (pe.evr).version as version,
            (pe.evr).release as release,
            (pe.evr).epoch as epoch,
            pa.label as package_arch,
            pg.name as package_group,
            p.rpm_version,
            p.description,
            p.summary,
            p.package_size,
            p.payload_size,
            p.installed_size,
            p.build_host,
            TO_CHAR(p.build_time, 'YYYYMMDDHH24MISS') as build_time,
            sr.name as source_rpm,
            c.checksum_type,
            c.checksum,
            p.vendor,
            p.payload_format,
            p.compat,
            p.header_sig,
            p.header_start,
            p.header_end,
            p.copyright,
            p.cookie,
            TO_CHAR(p.last_modified, 'YYYYMMDDHH24MISS') as last_modified
        from rhnPackage p, rhnPackageName pn, rhnPackageEVR pe,
             rhnPackageArch pa, rhnPackageGroup pg, rhnSourceRPM sr,
             rhnChecksumView c
        where p.id = :package_id
          and p.name_id = pn.id
          and p.evr_id = pe.id
          and p.package_arch_id = pa.id
          and p.package_group = pg.id
          and p.source_rpm_id = sr.id
          and p.checksum_id = c.id
    """)
    item_id_key = 'package_id'
    hash_factor = 2
    key_template = 'xml-packages/%s/rhn-package-%s.xml'


class SourcePackagesDumper(CachedDumper, exportLib.SourcePackagesDumper):
    iterator_query = rhnSQL.Statement("""
        select
            ps.id,
            sr.name source_rpm,
            pg.name package_group,
            ps.rpm_version,
            ps.payload_size,
            ps.build_host,
            TO_CHAR(ps.build_time, 'YYYYMMDDHH24MISS') build_time,
            sig.checksum sigchecksum,
            sig.checksum_type sigchecksum_type,
            ps.vendor,
            ps.cookie,
            ps.package_size,
            c.checksum_type,
            c.checksum,
            TO_CHAR(ps.last_modified, 'YYYYMMDDHH24MISS') last_modified
        from rhnPackageSource ps, rhnPackageGroup pg, rhnSourceRPM sr,
             rhnChecksumView c, rhnChecksumView sig
        where ps.id = :package_id
          and ps.package_group = pg.id
          and ps.source_rpm_id = sr.id
          and ps.checksum_id = c.id
          and ps.sigchecksum_id = sig.id
    """)
    item_id_key = 'package_id'
    hash_factor = 2
    key_template = 'xml-packages/%s/rhn-source-package-%s.xml'


class ErrataDumper(exportLib.ErrataDumper):
    iterator_query = rhnSQL.Statement("""
        select
            e.id,
            e.org_id,
            e.advisory_name,
            e.advisory,
            e.advisory_type,
            e.advisory_rel,
            e.product,
            e.description,
            e.synopsis,
            e.topic,
            e.solution,
            TO_CHAR(e.issue_date, 'YYYYMMDDHH24MISS') issue_date,
            TO_CHAR(e.update_date, 'YYYYMMDDHH24MISS') update_date,
            TO_CHAR(e.last_modified, 'YYYYMMDDHH24MISS') last_modified,
            e.refers_to,
            e.notes,
            e.errata_from,
            e.severity_id
        from rhnErrata e
        where e.id = :errata_id
    """)

    def __init__(self, writer, params):
        statement = rhnSQL.prepare(self.iterator_query)
        iterator = QueryIterator(statement, params)
        exportLib.ErrataDumper.__init__(self, writer, iterator)


class KickstartableTreesDumper(CachedDumper, exportLib.KickstartableTreesDumper):
    iterator_query = rhnSQL.Statement("""
        select kt.id,
               c.label channel,
               kt.base_path "base-path",
               kt.label,
               kt.boot_image "boot-image",
               ktt.name "kstree-type-name",
               ktt.label "kstree-type-label",
               kit.name "install-type-name",
               kit.label "install-type-label",
               TO_CHAR(kt.last_modified, 'YYYYMMDDHH24MISS') "last-modified"
          from rhnKickstartableTree kt,
               rhnKSTreeType ktt,
               rhnKSInstallType kit,
               rhnChannel c
         where kt.channel_id = c.id
           and ktt.id = kt.kstree_type
           and kit.id = kt.install_type
           and kt.org_id is NULL
           and kt.label = :kickstart_label
    """)

    def _get_key(self, params):
        kickstart_label = params['kickstart_label']
        return "xml-kickstartable-tree/%s.xml" % kickstart_label


class ClosedConnectionError(Exception):
    pass


class TeeStream:

    """Writes to multiple streams at the same time"""

    def __init__(self, *streams):
        self.streams = streams

    def write(self, data):
        log_debug(6, "Writing %s bytes" % len(data))
        for stream in self.streams:
            stream.write(data)
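
# Editor's note: a hypothetical usage sketch, not part of the original
# module.  CachedDumper.dump_subelement() points its XML writer at a
# TeeStream so each item's XML reaches the real output and an in-memory
# buffer in a single pass; the buffered copy is what cache_set() stores:
def _example_tee_stream():
    """Illustration: one write() lands in both underlying streams."""
    buf, out = StringIO(), StringIO()  # out stands in for the real stream
    tee = TeeStream(buf, out)
    tee.write("<rhn-package/>")
    return buf.getvalue() == out.getvalue() == "<rhn-package/>"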