Package backend :: Package common :: Module rhn_pkg
[hide private]
[frames] | no frames]

Source Code for Module backend.common.rhn_pkg

  1  # 
  2  # Copyright (c) 2008--2016 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU General Public License, 
  5  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  6  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  8  # along with this software; if not, see 
  9  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 10  # 
 11  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 12  # granted to use or replicate Red Hat trademarks that are incorporated 
 13  # in this software or its documentation. 
 14  # 
 15   
 16  import os 
 17  from rhn.i18n import bstr 
 18  from spacewalk.common import checksum 
19 20 -def get_package_header(filename=None, file_obj=None, fd=None):
21 # pylint: disable=E1103 22 if filename is not None: 23 stream = open(filename, mode='rb') 24 need_close = True 25 elif file_obj is not None: 26 stream = file_obj 27 else: 28 stream = os.fdopen(os.dup(fd), "r") 29 need_close = True 30 31 if stream.name.endswith('.deb'): 32 packaging = 'deb' 33 elif stream.name.endswith('.rpm'): 34 packaging = 'rpm' 35 else: 36 packaging = 'mpm' 37 38 a_pkg = package_from_stream(stream, packaging) 39 a_pkg.read_header() 40 if need_close: 41 stream.close() 42 return a_pkg.header
43
44 45 -def package_from_stream(stream, packaging):
46 if packaging == 'deb': 47 from spacewalk.common import rhn_deb 48 a_pkg = rhn_deb.DEB_Package(stream) 49 elif packaging == 'rpm': 50 from spacewalk.common import rhn_rpm 51 a_pkg = rhn_rpm.RPM_Package(stream) 52 elif packaging == 'mpm': 53 from spacewalk.common import rhn_mpm 54 a_pkg = rhn_mpm.MPM_Package(stream) 55 else: 56 a_pkg = None 57 return a_pkg
58
59 60 -def package_from_filename(filename):
61 if filename.endswith('.deb'): 62 packaging = 'deb' 63 elif filename.endswith('.rpm') or filename.endswith('.hdr'): 64 packaging = 'rpm' 65 else: 66 packaging = 'mpm' 67 stream = open(filename, mode='rb') 68 return package_from_stream(stream, packaging)
69 70 BUFFER_SIZE = 16384 71 DEFAULT_CHECKSUM_TYPE = 'md5'
72 73 74 -class A_Package:
75 76 """virtual class that implements shared methods for RPM/MPM/DEB package object""" 77 # pylint: disable=R0902 78
79 - def __init__(self, input_stream=None):
80 self.header = None 81 self.header_start = 0 82 self.header_end = 0 83 self.input_stream = input_stream 84 self.checksum_type = DEFAULT_CHECKSUM_TYPE 85 self.checksum = None 86 self.payload_stream = None 87 self.payload_size = None
88
89 - def read_header(self):
90 """reads header from self.input_file""" 91 pass
92
93 - def save_payload(self, output_stream):
94 """saves payload to output_stream""" 95 c_hash = checksum.getHashlibInstance(self.checksum_type, False) 96 if output_stream: 97 output_start = output_stream.tell() 98 self._stream_copy(self.input_stream, output_stream, c_hash) 99 self.checksum = c_hash.hexdigest() 100 if output_stream: 101 self.payload_stream = output_stream 102 self.payload_size = output_stream.tell() - output_start
103
104 - def payload_checksum(self):
105 # just read and compute checksum 106 start = self.input_stream.tell() 107 self.save_payload(None) 108 self.payload_size = self.input_stream.tell() - start + self.header_end 109 self.payload_stream = self.input_stream
110 111 @staticmethod
112 - def _stream_copy(source, dest, c_hash=None):
113 """copies data from the source stream to the destination stream""" 114 while True: 115 buf = source.read(BUFFER_SIZE) 116 if not buf: 117 break 118 if dest: 119 dest.write(buf) 120 if c_hash: 121 c_hash.update(buf)
122 123 @staticmethod
124 - def _read_bytes(stream, amt):
125 ret = bstr('') 126 while amt: 127 buf = stream.read(min(amt, BUFFER_SIZE)) 128 if not buf: 129 return ret 130 ret = ret + buf 131 amt = amt - len(buf) 132 return ret
133
134 135 -class InvalidPackageError(Exception):
136 pass
137