Package backend :: Package satellite_exporter :: Package handlers :: Module non_auth_dumper
[hide private]
[frames] | no frames]

Source Code for Module backend.satellite_exporter.handlers.non_auth_dumper

  1  # Copyright (c) 2008--2018 Red Hat, Inc. 
  2  # 
  3  # This software is licensed to you under the GNU General Public License, 
  4  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  5  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  6  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  7  # along with this software; if not, see 
  8  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
  9  # 
 10  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 11  # granted to use or replicate Red Hat trademarks that are incorporated 
 12  # in this software or its documentation. 
 13  # 
 14   
 15  """ 
 16  Non-authenticated dumper 
 17  """ 
 18   
 19  import os 
 20  try: 
 21      #  python 2 
 22      import xmlrpclib 
 23  except ImportError: 
 24      #  python3 
 25      import xmlrpc.client as xmlrpclib 
 26  import gzip 
 27  import sys 
 28   
 29  from rhn.UserDictCase import UserDictCase 
 30  from spacewalk.common.usix import raise_with_tb 
 31  from spacewalk.common.rhnLog import log_debug, log_error 
 32  from spacewalk.common.rhnConfig import CFG 
 33  from spacewalk.server import rhnSQL, rhnLib 
 34  from spacewalk.server.rhnHandler import rhnHandler 
 35  from spacewalk.server.importlib.backendLib import localtime 
 36  from spacewalk.common.rhnException import rhnFault 
 37   
 38  from spacewalk.common.rhnTranslate import _ 
 39   
 40  from spacewalk.satellite_tools.exporter import exportLib 
 41  from spacewalk.satellite_tools.disk_dumper import dumper 
 42   
 43   
44 -class InvalidPackageError(Exception):
45 pass
46 47
48 -class NullPathPackageError(Exception):
49 pass
50 51
52 -class MissingPackageError(Exception):
53 pass
54 55
56 -class NonAuthenticatedDumper(rhnHandler, dumper.XML_Dumper):
57 # pylint: disable=E1101,W0102,W0613,R0902,R0904 58
59 - def __init__(self, req):
60 rhnHandler.__init__(self) 61 dumper.XML_Dumper.__init__(self) 62 self.headers_out = UserDictCase() 63 self._raw_stream = req 64 self._raw_stream.content_type = 'application/octet-stream' 65 self.compress_level = 0 66 # State machine 67 self._headers_sent = 0 68 self._is_closed = 0 69 self._compressed_stream = None 70 71 self.functions = [ 72 'arches', 73 'arches_extra', 74 'channel_families', 75 'channels', 76 'get_comps', 77 'get_modules', 78 'channel_packages_short', 79 'packages_short', 80 'packages', 81 'source_packages', 82 'errata', 83 'blacklist_obsoletes', 84 'product_names', 85 'get_rpm', 86 'kickstartable_trees', 87 'get_ks_file', 88 'orgs', 89 ] 90 91 self.system_id = None 92 self._channel_family_query_template = """ 93 select cfm.channel_family_id, 0 quantity 94 from rhnChannelFamilyMembers cfm, 95 rhnChannel c, rhnChannelFamily cf 96 where cfm.channel_id = c.id 97 and c.label in (%s) 98 and cfm.channel_family_id = cf.id 99 and cf.label != 'rh-public' 100 and (cf.org_id in (%s) 101 or cf.org_id is null) 102 union 103 select id channel_family_id, NULL quantity 104 from rhnChannelFamily 105 where label = 'rh-public' 106 """ 107 self._channel_family_query_public = """ 108 select id channel_family_id, 0 quantity 109 from rhnChannelFamily 110 where org_id in (%s) 111 or org_id is null 112 """ 113 self._channel_family_query = None
114
115 - def _send_headers(self, error=0, init_compressed_stream=1):
116 log_debug(4, "is_closed", self._is_closed) 117 if self._is_closed: 118 raise Exception("Trying to write to a closed connection") 119 if self._headers_sent: 120 return 121 self._headers_sent = 1 122 if self.compress_level: 123 self.headers_out['Content-Encoding'] = 'gzip' 124 # Send the headers 125 if error: 126 # No compression 127 self.compress_level = 0 128 self._raw_stream.content_type = 'text/xml' 129 for h, v in self.headers_out.items(): 130 self._raw_stream.headers_out[h] = str(v) 131 self._raw_stream.send_http_header() 132 # If need be, start gzipping 133 if self.compress_level and init_compressed_stream: 134 log_debug(4, "Compressing with factor %s" % self.compress_level) 135 self._compressed_stream = gzip.GzipFile(None, "wb", 136 self.compress_level, self._raw_stream)
137
138 - def send(self, data):
139 log_debug(3, "Sending %d bytes" % len(data)) 140 try: 141 self._send_headers() 142 if self._compressed_stream: 143 log_debug(4, "Sending through a compressed stream") 144 self._compressed_stream.write(data) 145 else: 146 self._raw_stream.write(data) 147 except IOError: 148 log_error("Client appears to have closed connection") 149 self.close() 150 raise_with_tb(dumper.ClosedConnectionError, sys.exc_info()[2]) 151 log_debug(5, "Bytes sent", len(data))
152 153 write = send 154
155 - def close(self):
156 log_debug(2, "Closing") 157 if self._is_closed: 158 log_debug(3, "Already closed") 159 return 160 161 if self._compressed_stream: 162 log_debug(5, "Closing a compressed stream") 163 try: 164 self._compressed_stream.close() 165 except IOError: 166 e = sys.exc_info()[1] 167 # Remote end has closed connection already 168 log_error("Error closing the stream", str(e)) 169 170 self._compressed_stream = None 171 self._is_closed = 1 172 log_debug(3, "Closed")
173
174 - def set_channel_family_query(self, channel_labels=[]):
175 if not channel_labels: 176 # All null-pwned channel families 177 self._channel_family_query = self._channel_family_query_public % self.exportable_orgs 178 return self 179 180 self._channel_family_query = self._channel_family_query_template % ( 181 ', '.join(["'%s'" % x for x in channel_labels]), 182 self.exportable_orgs) 183 return self
184
185 - def _get_channel_data(self, channels):
186 writer = ContainerWriter() 187 d = ChannelsDumper(writer, params=list(channels.values())) 188 d.dump() 189 data = writer.get_data() 190 # We don't care about <rhn-channels> here 191 channel_data = self._cleanse_channels(data[2]) 192 return channel_data
193
194 - def _cleanse_channels(channels_dom):
195 channels = {} 196 for dummy, attributes, child_elements in channels_dom: 197 channel_label = attributes['label'] 198 channels[channel_label] = channel_entry = {} 199 200 packages = attributes['packages'].split() 201 del attributes['packages'] 202 203 # Get dir of the prefix 204 prefix = "rhn-package-" 205 prefix_len = len(prefix) 206 packages = [int(x[prefix_len:]) for x in packages] 207 208 channel_entry['packages'] = packages 209 210 ks_trees = attributes['kickstartable-trees'].split() 211 212 channel_entry['ks_trees'] = ks_trees 213 214 # Clean up to reduce memory footprint if possible 215 attributes.clear() 216 217 # tag name to object prefix 218 maps = { 219 'source-packages': ('source_packages', 'rhn-source-package-'), 220 'rhn-channel-errata': ('errata', 'rhn-erratum-'), 221 } 222 # Now look for package sources 223 for tag_name, dummy, celem in child_elements: 224 if tag_name not in maps: 225 continue 226 field, prefix = maps[tag_name] 227 prefix_len = len(prefix) 228 # Hmm. x[1] is the attributes hash; we fetch the id and we get 229 # rid of te prefix, then we run that through int() 230 objects = [] 231 for dummy, ceattr, dummy in celem: 232 obj_id = ceattr['id'] 233 obj_id = int(obj_id[prefix_len:]) 234 last_modified = localtime(ceattr['last-modified']) 235 objects.append((obj_id, last_modified)) 236 channel_entry[field] = objects 237 238 # Clean up to reduce memory footprint if possible 239 del child_elements[:] 240 241 return channels
242 243 _cleanse_channels = staticmethod(_cleanse_channels) 244 245 # Dumper functions here
246 - def dump_channel_families(self):
247 log_debug(2) 248 249 h = self.get_channel_families_statement() 250 h.execute() 251 252 writer = self._get_xml_writer() 253 d = dumper.SatelliteDumper(writer, 254 exportLib.ChannelFamiliesDumper(writer, 255 data_iterator=h, null_max_members=0,),) 256 d.dump() 257 writer.flush() 258 log_debug(4, "OK") 259 self.close() 260 return 0
261
262 - def dump_channels(self, channel_labels=None):
263 log_debug(2) 264 channels = self._validate_channels(channel_labels=channel_labels) 265 266 writer = self._get_xml_writer() 267 d = dumper.SatelliteDumper(writer, dumper.ChannelsDumperEx(writer, 268 params=list(channels.values()))) 269 d.dump() 270 writer.flush() 271 log_debug(4, "OK") 272 self.close() 273 return 0
274
275 - def dump_channel_packages_short(self, channel_label, last_modified):
276 return dumper.XML_Dumper.dump_channel_packages_short( 277 self, channel_label, last_modified, filepath=None, 278 validate_channels=True, send_headers=True, open_stream=False)
279
280 - def _packages(self, packages, prefix, dump_class, sources=0):
281 return dumper.XML_Dumper._packages(self, packages, prefix, dump_class, sources, 282 verify_packages=True)
283
284 - def dump_errata(self, errata):
285 return dumper.XML_Dumper.dump_errata(self, errata, verify_errata=True)
286
287 - def dump_kickstartable_trees(self, kickstart_labels=None):
288 return dumper.XML_Dumper.dump_kickstartable_trees(self, kickstart_labels, 289 validate_kickstarts=True)
290
291 - def dump_product_names(self):
293
294 - def arches(self):
295 return self.dump_arches(rpm_arch_type_only=1)
296
297 - def arches_extra(self):
298 return self.dump_server_group_type_server_arches(rpm_arch_type_only=1)
299
300 - def blacklist_obsoletes(self):
301 return self.dump_blacklist_obsoletes()
302
303 - def product_names(self):
304 return self.dump_product_names()
305
306 - def channel_families(self, channel_labels=[]):
307 self.set_channel_family_query() 308 return self.dump_channel_families()
309
310 - def channels(self, channel_labels, flags={}):
311 if not channel_labels: 312 channel_labels = [] 313 self.set_channel_family_query(channel_labels=channel_labels) 314 return self.dump_channels(channel_labels=channel_labels)
315
316 - def get_comps(self, channel):
317 return self.get_repomd_file(channel, 1)
318
319 - def get_modules(self, channel):
320 return self.get_repomd_file(channel, 2)
321
322 - def channel_packages_short(self, channel_label, last_modified):
323 self.set_channel_family_query(channel_labels=[channel_label]) 324 return self.dump_channel_packages_short(channel_label, last_modified)
325
326 - def packages(self, packages=[]):
329
330 - def packages_short(self, packages=[]):
333
334 - def source_packages(self, packages=[]):
337
338 - def errata(self, errata=[]):
339 self.set_channel_family_query() 340 return self.dump_errata(errata=errata)
341
342 - def orgs(self):
343 return self.dump_orgs()
344
345 - def kickstartable_trees(self, kickstart_labels=[]):
346 self.set_channel_family_query() 347 return self.dump_kickstartable_trees(kickstart_labels=kickstart_labels)
348
349 - def get_rpm(self, package, channel):
352
353 - def get_repomd_file(self, channel, comps_type_id):
354 comps_query = """ 355 select relative_filename 356 from rhnChannelComps 357 where channel_id = ( 358 select id 359 from rhnChannel 360 where label = :channel_label 361 and comps_type_id = :ctype_id 362 ) 363 order by id desc 364 """ 365 channel_comps_sth = rhnSQL.prepare(comps_query) 366 channel_comps_sth.execute(channel_label=channel, ctype_id=comps_type_id) 367 row = channel_comps_sth.fetchone_dict() 368 if not row: 369 raise rhnFault(3015, "No comps/modules file for channel [%s]" % channel) 370 path = os.path.join(CFG.MOUNT_POINT, row['relative_filename']) 371 if not os.path.exists(path): 372 log_error("Missing comps/modules file [%s] for channel [%s]" % (path, channel)) 373 raise rhnFault(3016, "Unable to retrieve comps/modules file for channel [%s]" % channel) 374 return self._send_stream(path)
375
376 - def get_ks_file(self, ks_label, relative_path):
377 log_debug(1, ks_label, relative_path) 378 h = rhnSQL.prepare(""" 379 select base_path 380 from rhnKickstartableTree 381 where label = :ks_label 382 and org_id is null 383 """) 384 h.execute(ks_label=ks_label) 385 row = h.fetchone_dict() 386 if not row: 387 raise rhnFault(3003, "No such file %s in tree %s" % 388 (relative_path, ks_label)) 389 path = os.path.join(CFG.MOUNT_POINT, row['base_path'], relative_path) 390 if not os.path.exists(path): 391 log_error("Missing file for satellite dumper: %s" % path) 392 raise rhnFault(3007, "Unable to retrieve file %s in tree %s" % 393 (relative_path, ks_label)) 394 return self._send_stream(path)
395 396 # Sends a package over the wire 397 # prefix is whatever we prepend to the package id (rhn-package- or 398 # rhn-source-package-)
399 - def _send_package_stream(self, package, channel):
400 log_debug(3, package, channel) 401 path, dummy = self.get_package_path_by_filename(package, channel) 402 403 log_debug(3, "Package path", path) 404 if not os.path.exists(path): 405 log_error("Missing package (satellite dumper): %s" % path) 406 raise rhnFault(3007, "Unable to retrieve package %s" % package) 407 return self._send_stream(path)
408 409 # This query is similar to the one aove, except that we have already 410 # authorized this channel (so no need for server_id) 411 _query_get_package_path_by_nvra = rhnSQL.Statement(""" 412 select distinct 413 p.id, p.path 414 from rhnPackage p, 415 rhnChannelPackage cp, 416 rhnChannel c, 417 rhnPackageArch pa 418 where c.label = :channel 419 and cp.channel_id = c.id 420 and cp.package_id = p.id 421 and p.name_id = LOOKUP_PACKAGE_NAME(:name) 422 and p.evr_id = LOOKUP_EVR(:epoch, :version, :release) 423 and p.package_arch_id = pa.id 424 and pa.label = :arch 425 """) 426
427 - def get_package_path_by_filename(self, fileName, channel):
428 log_debug(3, fileName, channel) 429 fileName = str(fileName) 430 n, e, v, r, a = rhnLib.parseRPMFilename(fileName) 431 432 h = rhnSQL.prepare(self._query_get_package_path_by_nvra) 433 h.execute(name=n, version=v, release=r, epoch=e, arch=a, channel=channel) 434 try: 435 return _get_path_from_cursor(h) 436 except InvalidPackageError: 437 log_debug(4, "Error", "Non-existent package requested", fileName) 438 raise_with_tb(rhnFault(17, _("Invalid RPM package %s requested") % fileName), sys.exc_info()[2]) 439 except NullPathPackageError: 440 e = sys.exc_info()[1] 441 package_id = e[0] 442 log_error("Package path null for package id", package_id) 443 raise_with_tb(rhnFault(17, _("Invalid RPM package %s requested") % fileName), sys.exc_info()[2]) 444 except MissingPackageError: 445 e = sys.exc_info()[1] 446 filePath = e[0] 447 log_error("Package not found", filePath) 448 raise_with_tb(rhnFault(17, _("Package not found")), sys.exc_info()[2])
449 450 # Opens the file and sends the stream
451 - def _send_stream(self, path):
452 try: 453 stream = open(path) 454 except IOError: 455 e = sys.exc_info()[1] 456 if e.errno == 2: 457 raise_with_tb(rhnFault(3007, "Missing file %s" % path), sys.exc_info()[2]) 458 # Let it flow so we can find it later 459 raise 460 461 stream.seek(0, 2) 462 file_size = stream.tell() 463 stream.seek(0, 0) 464 log_debug(3, "Package size", file_size) 465 self.headers_out['Content-Length'] = file_size 466 self.compress_level = 0 467 self._raw_stream.content_type = 'application/x-rpm' 468 self._send_headers() 469 self.send_rpm(stream) 470 return 0
471
472 - def send_rpm(self, stream):
473 buffer_size = 65536 474 while 1: 475 buf = stream.read(buffer_size) 476 if not buf: 477 break 478 try: 479 self._raw_stream.write(buf) 480 except IOError: 481 # client closed the connection? 482 log_error("Client appears to have closed connection") 483 self.close_rpm() 484 raise_with_tb(dumper.ClosedConnectionError, sys.exc_info()[2]) 485 self.close_rpm()
486
487 - def close_rpm(self):
488 self._is_closed = 1
489
490 - def _respond_xmlrpc(self, data):
491 # Marshal 492 s = xmlrpclib.dumps((data, )) 493 494 self.headers_out['Content-Length'] = len(s) 495 self._raw_stream.content_type = 'text/xml' 496 for h, v in self.headers_out.items(): 497 self._raw_stream.headers_out[h] = str(v) 498 self._raw_stream.send_http_header() 499 self._raw_stream.write(s) 500 return 0
501 502
503 -class ContainerWriter:
504 # Same interface as an XML writer, but collects data in a hash instead 505
506 - def __init__(self):
507 self._tag_stack = [] 508 self._cdata = [] 509 self._root = None
510
511 - def open_tag(self, tag_name, attributes=None):
512 # print "+++", tag_name, len(self._tag_stack) 513 if not attributes: 514 attributes = {} 515 self._cdata = [] 516 self._tag_stack.append((tag_name, attributes, self._cdata))
517
518 - def data(self, astring):
519 self._cdata.append(astring)
520
521 - def close_tag(self, tag_name):
522 # print "---", tag_name, len(self._tag_stack) 523 # Extract the current item from the stack 524 tag_name, attributes, cdata = self._tag_stack.pop() 525 526 return self._add_node(tag_name, attributes, cdata)
527
528 - def empty_tag(self, tag_name, attributes=None):
529 # print "+++---", tag_name, len(self._tag_stack) 530 if not attributes: 531 attributes = {} 532 return self._add_node(tag_name, attributes, [])
533
534 - def _add_node(self, tag_name, attributes, cdata):
535 node = (tag_name, attributes, cdata) 536 if not self._tag_stack: 537 # Parent 538 self._root = node 539 return self._root 540 541 # Fetch the parent 542 parent = self._tag_stack[-1] 543 # Add this node as a child 544 parent[2].append(node) 545 return parent
546
547 - def get_data(self):
548 assert self._root is not None 549 return self._root
550 551 # Overwrite the ChannelsDumper class to filter packages/source packages/errata 552 # based on the creation date 553 # XXX No caching for now 554 555
556 -class ChannelsDumper(dumper.ChannelsDumper):
557
558 - def dump_subelement(self, data):
559 c = exportLib.ChannelDumper(self._writer, data) 560 c.dump()
561 562
563 -def _get_path_from_cursor(h):
564 # Function shared between other retrieval functions 565 rs = h.fetchall_dict() 566 if not rs: 567 raise InvalidPackageError 568 569 max_row = rs[0] 570 571 if max_row['path'] is None: 572 573 raise NullPathPackageError(max_row['id']) 574 filePath = "%s/%s" % (CFG.MOUNT_POINT, max_row['path']) 575 pkgId = max_row['id'] 576 if not os.access(filePath, os.R_OK): 577 # Package not found on the filesystem 578 raise MissingPackageError(filePath) 579 return filePath, pkgId
580 581 rpcClasses = { 582 'dump': NonAuthenticatedDumper, 583 } 584