Package backend :: Package satellite_tools :: Module connection
[hide private]
[frames | no frames]

Source Code for Module backend.satellite_tools.connection

  1  # 
  2  # Module that provides the client-side functionality for an XML importer 
  3  # 
  4  # Copyright (c) 2008--2016 Red Hat, Inc. 
  5  # 
  6  # This software is licensed to you under the GNU General Public License, 
  7  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  8  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  9  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
 10  # along with this software; if not, see 
 11  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 12  # 
 13  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 14  # granted to use or replicate Red Hat trademarks that are incorporated 
 15  # in this software or its documentation. 
 16  # 
 17  # 
 18   
 19  import gzipstream 
 20   
 21  from rhn import rpclib 
 22  from spacewalk.satellite_tools import constants 
 23   
 24  __version__ = "0.1" 
 25   
 26   
class Transport(rpclib.transports.Transport):
    """Plain-HTTP transport used by the satellite sync client.

    Extends the rpclib transport by asking the server for gzip-encoded
    replies and by dispatching each response on its Content-Type: text/xml
    payloads are parsed as XML-RPC responses, while XML dumps and binary
    payloads are handed back to the caller as a file-like object.
    """
    user_agent = "satellite-sync/%s" % __version__

    def __init__(self, timeout=None):
        """Initialise the base transport and advertise gzip support.

        timeout -- forwarded to the base class only when truthy; otherwise
                   the base class is left to apply its own default.
        """
        if timeout:
            rpclib.transports.Transport.__init__(self, timeout=timeout)
        else:
            rpclib.transports.Transport.__init__(self)
        self.add_header("Accept-Encoding", "gzip")

    def _process_response(self, fd, connection):
        """Turn the raw HTTP response into a parsed value or a file object.

        fd         -- file-like object carrying the response body
        connection -- the HTTP connection the response arrived on

        Returns the parsed XML-RPC value for text/xml responses, or an
        rpclib.transports.File wrapping the (possibly gunzipped) body for
        application/xml, application/octet-stream and application/x-rpm.
        Raises Exception for any other content type; in that case both the
        response and the connection are closed first.
        """
        # Content-Type defaults to text/xml
        content_type = self.headers_in.get('Content-Type', 'text/xml')
        content_encoding = self.headers_in.get('Content-Encoding')

        # Dispatch on the bare media type: it is case-insensitive and may be
        # followed by parameters such as "; charset=UTF-8".
        media_type = content_type.split(';', 1)[0].strip().lower()

        if content_encoding == 'gzip':
            # Un-gzipstream it
            # NOTE: if data expected to get bigger than ~2.5Gb in size
            #       then use GzipStreamXL instead (it's slower though)
            fd = CompressedStream(fd)

        if media_type == 'text/xml':
            # XML-RPC response; whether parsing succeeds or fails, the file
            # descriptors must be closed before control leaves this method.
            try:
                return self.parse_response(fd)
            finally:
                fd.close()
                connection.close()

        # XXX application/octet-stream should go away
        if media_type in ('application/xml', 'application/octet-stream',
                          'application/x-rpm'):
            f = rpclib.transports.File(fd)
            # Explanation copied from the base class' method (rhn.transports):
            # Set the File's close method to the connection's
            # Note that calling the HTTPResponse's close() is not enough,
            # since the main socket would remain open, and this is
            # particularly bad with SSL
            f.close = connection.close
            return f

        # Unsupported payload: release the response as well as the
        # connection, mirroring the cleanup done for text/xml above.
        try:
            fd.close()
        finally:
            connection.close()
        raise Exception("Unknown response type: " + content_type)
75 76
class SafeTransport(rpclib.transports.SafeTransport, Transport):
    """rpclib's SafeTransport combined with this module's Transport.

    Registered below as the transport class for https connections.
    """
    # Pin the response handler to this module's implementation explicitly.
    # NOTE(review): presumably needed so the rpclib base class's own handler
    # is not resolved first - confirm against rpclib's class hierarchy.
    _process_response = Transport._process_response
79 80
class ProxyTransport(rpclib.transports.ProxyTransport, Transport):
    """rpclib's ProxyTransport combined with this module's Transport.

    Registered below as the transport class for proxied connections.
    """
    # Pin the response handler to this module's implementation explicitly.
    # NOTE(review): presumably needed so the rpclib base class's own handler
    # is not resolved first - confirm against rpclib's class hierarchy.
    _process_response = Transport._process_response
83 84
class SafeProxyTransport(rpclib.transports.SafeProxyTransport, Transport):
    """rpclib's SafeProxyTransport combined with this module's Transport.

    Registered below as the transport class for proxied https connections.
    """
    # Pin the response handler to this module's implementation explicitly.
    # NOTE(review): presumably needed so the rpclib base class's own handler
    # is not resolved first - confirm against rpclib's class hierarchy.
    _process_response = Transport._process_response
87 88
class _Server(rpclib.Server):
    """rpclib.Server that uses this module's transport classes."""

    # One transport class per connection flavour
    _transport_class = Transport
    _transport_class_proxy = ProxyTransport
    _transport_class_https = SafeTransport
    _transport_class_https_proxy = SafeProxyTransport

    def use_CA_chain(self, ca_chain=None):
        """Deliberate no-op: the ca_chain argument is ignored."""
97 98
class StreamConnection(_Server):
    """_Server that announces the XML dump protocol version on every request."""

    def __init__(self, uri, proxy=None, username=None, password=None,
                 refreshCallback=None, xml_dump_version=constants.PROTOCOL_VERSION,
                 timeout=None):
        """Set up the server connection and register the dump-version header.

        xml_dump_version -- value sent in the X-RHN-Satellite-XML-Dump-Version
                            header; defaults to constants.PROTOCOL_VERSION.
        All other arguments are handed to _Server.__init__ unchanged.
        """
        server_options = dict(
            proxy=proxy,
            username=username,
            password=password,
            refreshCallback=refreshCallback,
            timeout=timeout,
        )
        _Server.__init__(self, uri, **server_options)
        self.add_header("X-RHN-Satellite-XML-Dump-Version", xml_dump_version)
107 108
class GETServer(rpclib.GETServer):

    """ class rpclib.GETServer with overridden default transport classes """

    # One transport class per connection flavour
    _transport_class = Transport
    _transport_class_proxy = ProxyTransport
    _transport_class_https = SafeTransport
    _transport_class_https_proxy = SafeProxyTransport

    def __init__(self, uri, transport=None, proxy=None, username=None,
                 password=None, client_version=2, headers=None, refreshCallback=None,
                 progressCallback=None, xml_dump_version=constants.PROTOCOL_VERSION,
                 timeout=None):
        """Set up the GET server and register the dump-version header.

        headers          -- extra request headers; a fresh dict is used
                            when None is given.
        progressCallback -- accepted for signature compatibility but not
                            forwarded to rpclib.GETServer.__init__.
        xml_dump_version -- value sent in the X-RHN-Satellite-XML-Dump-Version
                            header; defaults to constants.PROTOCOL_VERSION.
        The remaining arguments are handed to rpclib.GETServer.__init__.
        """
        base_options = dict(
            transport=transport,
            proxy=proxy,
            username=username,
            password=password,
            client_version=client_version,
            headers={} if headers is None else headers,
            refreshCallback=refreshCallback,
            timeout=timeout,
        )
        rpclib.GETServer.__init__(self, uri, **base_options)
        self.add_header("X-RHN-Satellite-XML-Dump-Version", xml_dump_version)

    def use_CA_chain(self, ca_chain=None):
        """Deliberate no-op: the ca_chain argument is ignored."""
136 137
class CompressedStream:

    """
    GzipStream will not close the connection by itself, so we have to keep the
    underlying stream around

    Attribute access that is not satisfied by this wrapper is delegated to
    the decompressing stream, so the object can be read like a regular
    file; close() releases both the decompressor and the underlying stream.
    """

    def __init__(self, stream):
        """Wrap stream in a GzipStream unless it already is one.

        stream -- file-like object with the gzip-compressed data; it is
                  given a no-op flush attribute as a side effect.
        """
        def noop():
            pass

        self._real_stream = stream
        # gzipstream tries to flush stuff; add a noop function
        self._real_stream.flush = noop
        self.stream = self._real_stream
        if not isinstance(self._real_stream, gzipstream.GzipStream):
            self.stream = gzipstream.GzipStream(stream=self._real_stream, mode="r")

    def close(self):
        """Close the decompressor and the underlying stream.

        Safe to call more than once. The underlying stream is closed even
        if closing the decompressor raises; that exception still propagates.
        """
        # Detach both streams first so a repeated close() is a no-op
        outer, self.stream = self.stream, None
        inner, self._real_stream = self._real_stream, None
        try:
            if outer is not None:
                outer.close()
        finally:
            if inner is not None:
                inner.close()

    def __getattr__(self, name):
        """Delegate unknown attributes to the decompressing stream."""
        # __getattr__ only runs when normal lookup failed. If one of our own
        # attributes is missing (instance not initialised yet, e.g. while
        # being copied), fail cleanly instead of recursing through
        # self.stream forever.
        if name in ('stream', '_real_stream'):
            raise AttributeError(name)
        return getattr(self.stream, name)

    def __repr__(self):
        return "<_CompressedStream at %s>" % id(self)
if __name__ == '__main__':
    # Nothing happens when the module is executed as a script.
    pass