Package rhnpush :: Module connection
[hide private]
[frames] | no frames]

Source Code for Module rhnpush.connection

  1  # 
  2  # Copyright (c) 2008--2018 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU General Public License, 
  5  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  6  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  8  # along with this software; if not, see 
  9  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 10  # 
 11  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 12  # granted to use or replicate Red Hat trademarks that are incorporated 
 13  # in this software or its documentation. 
 14  # 
 15   
 16  import socket 
 17  import base64 
 18  import sys 
 19  # pylint: disable=F0401,E0611,W0632 
 20  from rhn import connections, rpclib 
 21   
 22  from rhnpush.utils import tupleify_urlparse 
 23  from spacewalk.common.usix import ListType, TupleType, IntType 
 24  from spacewalk.common.rhn_pkg import InvalidPackageError, package_from_filename 
 25  from spacewalk.common.usix import raise_with_tb 
 26   
 27  if sys.version_info[0] == 3: 
 28      from urllib.parse import splitport 
 29      from urllib.parse import urlparse 
 30  else: 
 31      from urlparse import urlparse 
 32      from urllib import splitport # pylint: disable=C0412 
 33   
 34  # pylint: disable=W0622 
class ConnectionError(Exception):
    """Raised when connecting to the server or sending data to it fails.

    This module wraps low-level socket / I/O errors in this exception,
    passing a short description and the stringified original error as
    the exception arguments.

    Note: the name intentionally shadows the Python 3 builtin of the same
    name inside this module (hence the pylint W0622 suppression).
    """
37 38 # pylint: disable=R0902 39 40
class BaseConnection:
    """Thin wrapper around the ``rhn.connections`` HTTP(S) connection classes.

    The target URI is parsed once at construction time.  The actual
    connection object is created lazily by :meth:`connect`; any attribute
    that is not defined on this wrapper is looked up on that underlying
    connection object.
    """

    def __init__(self, uri, proxy=None):
        """Parse *uri* and remember the optional *proxy* settings.

        uri   -- URL of the server endpoint; parsed with ``parse_url``.
        proxy -- proxy specification handed to ``rpclib.get_proxy_info``,
                 or a false value for a direct connection.
        """
        self._scheme, (self._host, self._port), self._path = parse_url(uri)[:3]

        # Always define every proxy attribute so that reading one of them
        # on a proxy-less instance yields None instead of falling through
        # to __getattr__.
        self._proxy_host = None
        self._proxy_port = None
        self._proxy_username = None
        self._proxy_password = None
        if proxy:
            arr = rpclib.get_proxy_info(proxy)
            self._proxy_host = arr[0]
            self._proxy_port = arr[1]
            self._proxy_username = arr[2]
            self._proxy_password = arr[3]

        self._trusted_certs = None
        self._connection = None
        self._timeout = None

    def set_timeout(self, timeout):
        """Set the timeout passed to connections created from now on."""
        self._timeout = timeout

    def get_connection(self):
        """Build a new, not yet connected, connection object.

        Returns one of the four ``rhn.connections`` classes depending on
        the URL scheme and on whether a proxy host is configured.

        Raises ValueError if the scheme is neither ``http`` nor ``https``.
        """
        if self._scheme not in ['http', 'https']:
            raise ValueError("Unsupported scheme", self._scheme)

        params = {}
        if self._timeout is not None:
            params['timeout'] = self._timeout

        secure = self._scheme == 'https'

        if self._proxy_host:
            # The proxy classes take the target host/port as keywords.
            params.update({
                'host': self._host,
                'port': self._port,
                'proxy': "%s:%s" % (self._proxy_host, self._proxy_port),
                'username': self._proxy_username,
                'password': self._proxy_password,
            })
            if not secure:
                return connections.HTTPProxyConnection(**params)
            params['trusted_certs'] = self._trusted_certs
            return connections.HTTPSProxyConnection(**params)

        if not secure:
            return connections.HTTPConnection(self._host, self._port, **params)
        params['trusted_certs'] = self._trusted_certs
        return connections.HTTPSConnection(self._host, self._port, **params)

    def connect(self):
        """Create a fresh connection object and open it."""
        self._connection = self.get_connection()
        self._connection.connect()

    def putrequest(self, method, url=None, skip_host=0):
        """Start a request on the underlying connection.

        When *url* is None the path parsed from the constructor URI is used.
        """
        if url is None:
            url = self._path
        return self._connection.putrequest(method, url=url,
                                           skip_host=skip_host)

    def __getattr__(self, name):
        """Delegate unknown attributes to the underlying connection.

        Raises AttributeError when there is no underlying connection yet.
        """
        # __getattr__ only runs after normal lookup failed.  Read the
        # connection straight from the instance dict: going through
        # ``self._connection`` would re-enter __getattr__ and recurse
        # forever whenever that attribute has not been set.
        connection = self.__dict__.get('_connection')
        if connection is None:
            raise AttributeError(
                "%s has no attribute %r (no underlying connection)"
                % (self.__class__.__name__, name))
        return getattr(connection, name)
class PackageUpload:
    """Upload one package file to the server with a single HTTP POST.

    Package metadata and the checksum are sent as custom request headers
    whose names start with ``header_prefix``; the package file itself is
    the request body.
    """
    header_prefix = "X-RHN-Upload"
    user_agent = "rhn-package-upload"

    def __init__(self, url, proxy=None):
        """Create the uploader for server *url*, optionally via *proxy*."""
        self.connection = BaseConnection(url, proxy)
        # Maps header name -> list of values (see set_header).
        self.headers = {}
        self.package_name = None
        self.package_epoch = None
        self.package_version = None
        self.package_release = None
        self.package_arch = None
        self.checksum = None
        self.checksum_type = None
        self.nvra = None
        self._resp_headers = None
        self.packaging = None
        self._response = None

    def set_header(self, name, value):
        """Append *value* to the list of values sent for header *name*."""
        if name not in self.headers:
            values = []
        else:
            values = self.headers[name]
            if isinstance(values, TupleType):
                # Tuples cannot be appended to.
                values = list(values)
            elif not isinstance(values, ListType):
                # A scalar that was stored directly in self.headers.
                values = [values]
        values.append(value)
        # Store the list back: when the previous value was a scalar or a
        # tuple, ``values`` is a new object and would otherwise be lost.
        self.headers[name] = values

    def send_http_headers(self, method, content_length=None):
        """Open the connection and send the request line and all headers.

        A ``Content-Length`` header is added from *content_length* unless
        one is already set or *content_length* is None.

        Raises ConnectionError if the connection cannot be established.
        """
        try:
            self.connection.connect()
        except socket.error:
            e = sys.exc_info()[1]
            raise_with_tb(ConnectionError("Error connecting", str(e)), sys.exc_info()[2])

        # Add content_length
        if 'Content-Length' not in self.headers and \
                content_length is not None:
            self.set_header('Content-Length', content_length)
        self.connection.putrequest(method)

        # Additional headers
        for hname, hval in self.headers.items():
            if not isinstance(hval, (ListType, TupleType)):
                hval = [hval]

            for v in hval:
                self.connection.putheader(str(hname), str(v))

        self.connection.endheaders()

    def send_http_body(self, stream_body):
        """Send *stream_body* (a seekable file-like object) in 16 KiB chunks.

        Does nothing when *stream_body* is None.
        Raises ConnectionError if sending a chunk fails with IOError.
        """
        if stream_body is None:
            return
        stream_body.seek(0, 0)
        buffer_size = 16384
        while True:
            buf = stream_body.read(buffer_size)
            if not buf:
                break
            try:
                self.connection.send(buf)
            except IOError:
                e = sys.exc_info()[1]
                raise_with_tb(ConnectionError("Error sending body", str(e)), sys.exc_info()[2])

    def send_http(self, method, stream_body=None):
        """Send a complete request and return the response object.

        The content length is determined by seeking to the end of
        *stream_body*; it is 0 when there is no body.
        """
        if stream_body is None:
            content_length = 0
        else:
            stream_body.seek(0, 2)
            content_length = stream_body.tell()
        self.send_http_headers(method, content_length=content_length)
        self.send_http_body(stream_body)
        self._response = self.connection.getresponse()
        self._resp_headers = self._response.msg

        return self._response

    def upload(self, filename, fileChecksumType, fileChecksum):
        """
        Uploads a file.
        Returns (http_error_code, error_message); the code is -1 when the
        file is rejected locally, before anything is sent.
        Sets:
            self.package_name
            self.package_epoch
            self.package_version
            self.package_release
            self.package_arch
        """
        try:
            a_pkg = package_from_filename(filename)
            a_pkg.read_header()
        except InvalidPackageError:
            return -1, "Not an RPM: %s" % filename

        # From here on the package's input stream is closed on every exit,
        # including the early return and any exception raised while sending.
        try:
            # Set some package data members
            self.package_name = a_pkg.header['name']
            self.package_epoch = a_pkg.header['epoch']
            self.package_version = a_pkg.header['version']
            self.package_release = a_pkg.header['release']
            if a_pkg.header.is_source:
                # Header tag 1051 present -> 'nosrc'
                # (presumably RPMTAG_NOSOURCE -- TODO confirm).
                if 1051 in a_pkg.header.keys():
                    self.package_arch = 'nosrc'
                else:
                    self.package_arch = 'src'
            else:
                self.package_arch = a_pkg.header['arch']
            self.packaging = a_pkg.header.packaging

            nvra = [self.package_name, self.package_version,
                    self.package_release, self.package_arch]

            if isinstance(nvra[3], IntType):
                # Old rpm format
                return -1, "Deprecated RPM format: %s" % filename

            self.nvra = nvra

            # use the precomputed passed checksum
            self.checksum_type = fileChecksumType
            self.checksum = fileChecksum

            # Set headers
            self.set_header("Content-Type", "application/x-rpm")
            self.set_header("User-Agent", self.user_agent)
            # Custom RHN headers
            prefix = self.header_prefix
            self.set_header("%s-%s" % (prefix, "Package-Name"), nvra[0])
            self.set_header("%s-%s" % (prefix, "Package-Version"), nvra[1])
            self.set_header("%s-%s" % (prefix, "Package-Release"), nvra[2])
            self.set_header("%s-%s" % (prefix, "Package-Arch"), nvra[3])
            self.set_header("%s-%s" % (prefix, "Packaging"), self.packaging)
            if self.checksum_type == 'md5':
                self.set_header("%s-%s" % (prefix, "File-MD5sum"), self.checksum)
            else:
                self.set_header("%s-%s" % (prefix, "File-Checksum-Type"), self.checksum_type)
                self.set_header("%s-%s" % (prefix, "File-Checksum"), self.checksum)

            a_pkg.input_stream.seek(0, 0)
            self._response = self.send_http('POST', stream_body=a_pkg.input_stream)
            retval = self.process_response()
        finally:
            a_pkg.input_stream.close()

        self.connection.close()
        return retval

    def process_response(self):
        """Translate the stored HTTP response into (status, message)."""
        status = self._response.status
        reason = self._response.reason
        if status == 200:
            # OK
            return status, "OK"
        if status == 201:
            # Created
            return (status, "%s %s: %s-%s-%s.%s.rpm already uploaded" % (
                self.checksum_type, self.checksum,
                self.nvra[0], self.nvra[1], self.nvra[2], self.nvra[3]))
        if status in (404, 409):
            # Conflict
            errstring = self.get_error_message(self._resp_headers)
            return status, errstring
        data = self._response.read()
        if status == 403:
            # In this case Authentication is no longer valid on server
            # client needs to re-authenticate itself.
            errstring = self.get_error_message(self._resp_headers)
            return status, errstring
        if status == 500:
            print("Internal server error", status, reason)
            errstring = self.get_error_message(self._resp_headers)
            return status, data + errstring

        return status, data

    def get_error_message(self, headers):
        """Return the decoded ``<header_prefix>-Error-String`` header value.

        headers -- the response header object: a Python 2
                   ``mimetools.Message`` or a Python 3
                   ``email.message.Message`` (``http.client.HTTPMessage``).

        Returns the base64-decoded message as a byte string (``str`` on
        Python 2, ``bytes`` on Python 3); empty if the header is absent.
        """
        name = self.header_prefix + '-Error' + '-String'
        if hasattr(headers, 'getaddrlist'):
            # Python 2 header object.
            chunks = [pair[1] for pair in headers.getaddrlist(name)]
        else:
            # Python 3 header object has no getaddrlist(); get_all()
            # returns None when the header is missing.
            chunks = headers.get_all(name) or []

        # The header may occur several times; join before decoding.
        encoded = '\n'.join(chunks)
        if not isinstance(encoded, bytes):
            # Python 3: the base64 decoder only accepts bytes.
            encoded = encoded.encode('ascii')

        # base64.decodestring was removed in Python 3.9; decodebytes does
        # not exist on Python 2.
        # pylint: disable=W1505
        decode = getattr(base64, 'decodebytes', None) or base64.decodestring
        return decode(encoded)
def parse_url(url, scheme="http", path='/'):
    """Split *url* into its components, filling in defaults.

    url    -- the URL to parse; the scheme may be omitted
              (e.g. ``"host:8080/APP"``).
    scheme -- scheme assumed when *url* has no network location as given.
    path   -- path used when *url* has none.

    Returns ``(scheme, (host, port), path, params, query, fragment)``,
    where ``host`` and ``port`` are whatever ``splitport`` yields for the
    network location.

    Raises ValueError if no network location can be found, even after
    the default scheme has been prepended.
    """
    _scheme, netloc, _path, params, query, fragment = tupleify_urlparse(
        urlparse(url))
    if not netloc:
        # No network location as given - retry with the default scheme
        # prepended.
        _scheme, netloc, _path, params, query, fragment = tupleify_urlparse(
            urlparse(scheme + "://" + url))

    if not netloc:
        # ValueError is still caught by callers that catch Exception.
        raise ValueError("Unable to determine the host from URL %r" % (url,))

    (host, port) = splitport(netloc)

    if not _path:
        _path = path

    return (_scheme, (host, port), _path, params, query, fragment)