Package backend :: Package common :: Module rhn_deb
[hide private]
[frames] | no frames]

Source Code for Module backend.common.rhn_deb

  1  # 
  2  # Copyright (c) 2010--2016 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU General Public License, 
  5  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  6  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  8  # along with this software; if not, see 
  9  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 10  # 
 11  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 12  # granted to use or replicate Red Hat trademarks that are incorporated 
 13  # in this software or its documentation. 
 14  # 
 15  # 
 16  # Meta-package manager 
 17  # 
 18  # Author: Lukas Durfina <lukas.durfina@gmail.com> 
 19   
 20  import sys 
 21  import tempfile 
 22   
 23  from debian import debfile 
 24   
 25  from spacewalk.common.usix import raise_with_tb 
 26  from spacewalk.common import checksum 
 27  from spacewalk.common.rhn_pkg import A_Package, InvalidPackageError 
 28   
 29  # bare-except and broad-except 
 30  # pylint: disable=W0702,W0703 
 31   
 32  DEB_CHECKSUM_TYPE = 'sha256'       # FIXME: this should be a configuration option 
33 34 35 -class deb_Header:
36 37 "Wrapper class for an deb header - we need to store a flag is_source" 38
39 - def __init__(self, stream):
40 self.packaging = 'deb' 41 self.signatures = [] 42 self.is_source = 0 43 self.deb = None 44 45 try: 46 self.deb = debfile.DebFile(stream.name) 47 except Exception: 48 e = sys.exc_info()[1] 49 raise_with_tb(InvalidPackageError(e), sys.exc_info()[2]) 50 51 try: 52 # Fill info about package 53 debcontrol = self.deb.debcontrol() 54 self.hdr = { 55 'name': debcontrol.get_as_string('Package'), 56 'arch': debcontrol.get_as_string('Architecture') + '-deb', 57 'summary': debcontrol.get_as_string('Description').splitlines()[0], 58 'vendor': debcontrol.get_as_string('Maintainer'), 59 'package_group': debcontrol.get_as_string('Section'), 60 'epoch': '', 61 'version': 0, 62 'release': 0, 63 'description': debcontrol.get_as_string('Description'), 64 } 65 for hdr_k, deb_k in [('requires', 'Depends'), 66 ('provides', 'Provides'), 67 ('conflicts', 'Conflicts'), 68 ('obsoletes', 'Replaces'), 69 ('recommends', 'Recommends'), 70 ('suggests', 'Suggests'), 71 ('breaks', 'Breaks'), 72 ('predepends', 'Pre-Depends'), 73 ('payload_size', 'Installed-Size')]: 74 if deb_k in debcontrol: 75 self.hdr[hdr_k] = debcontrol.get_as_string(deb_k) 76 for k in debcontrol.keys(): 77 if k not in self.hdr: 78 self.hdr[k] = debcontrol.get_as_string(k) 79 80 version = debcontrol.get_as_string('Version') 81 if version.find(':') != -1: 82 self.hdr['epoch'], version = version.split(':') 83 self.hdr['version'] = version 84 if version.find('-') != -1: 85 version_tmpArr = version.split('-') 86 self.hdr['version'] = '-'.join(version_tmpArr[:-1]) 87 self.hdr['release'] = version_tmpArr[-1] 88 else: 89 self.hdr['version'] = version 90 self.hdr['release'] = 'X' 91 except Exception: 92 e = sys.exc_info()[1] 93 raise_with_tb(InvalidPackageError(e), sys.exc_info()[2])
94 95 @staticmethod
96 - def checksum_type():
97 return DEB_CHECKSUM_TYPE
98 99 @staticmethod
100 - def is_signed():
101 return 0
102
103 - def __getitem__(self, name):
104 return self.hdr.get(str(name))
105
106 - def __setitem__(self, name, item):
107 self.hdr[name] = item
108
109 - def __delitem__(self, name):
110 del self.hdr[name]
111
112 - def __getattr__(self, name):
113 return getattr(self.hdr, name)
114
115 - def __len__(self):
116 return len(self.hdr)
117
118 119 -class DEB_Package(A_Package):
120
121 - def __init__(self, input_stream=None):
122 A_Package.__init__(self, input_stream) 123 self.header_data = tempfile.NamedTemporaryFile() 124 self.checksum_type = DEB_CHECKSUM_TYPE
125
126 - def read_header(self):
127 self._stream_copy(self.input_stream, self.header_data) 128 self.header_end = self.header_data.tell() 129 try: 130 self.header_data.seek(0, 0) 131 self.header = deb_Header(self.header_data) 132 except: 133 raise_with_tb(InvalidPackageError, sys.exc_info()[2])
134
135 - def save_payload(self, output_stream):
136 c_hash = checksum.getHashlibInstance(self.checksum_type, False) 137 if output_stream: 138 output_start = output_stream.tell() 139 self._stream_copy(self.header_data, output_stream, c_hash) 140 self.checksum = c_hash.hexdigest() 141 if output_stream: 142 self.payload_stream = output_stream 143 self.payload_size = output_stream.tell() - output_start
144