Package backend :: Package cdn_tools :: Module candlepin_api
[hide private]
[frames | no frames]

Source Code for Module backend.cdn_tools.candlepin_api

  1  # Copyright (c) 2017 Red Hat, Inc. 
  2  # 
  3  # This software is licensed to you under the GNU General Public License, 
  4  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  5  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  6  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  7  # along with this software; if not, see 
  8  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
  9  # 
 10  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 11  # granted to use or replicate Red Hat trademarks that are incorporated 
 12  # in this software or its documentation. 
 13  # 
 14   
 15  import os 
 16  import sys 
 17  import tempfile 
 18   
 19  import requests 
 20   
 21  from spacewalk.cdn_tools.constants import CA_CERT_PATH 
 22  from spacewalk.common.cli import getUsernamePassword 
 23  from spacewalk.common.rhnConfig import CFG 
 24  from spacewalk.satellite_tools.syncLib import log, log2 
class CandlepinApi(object):
    """Class used to communicate with Candlepin API.

    Requests are authenticated either with a username and password (HTTP
    basic auth) or with the upstream consumer certificate taken from the
    current manifest.
    """

    def __init__(self, current_manifest=None, username=None, password=None,
                 http_proxy=None, http_proxy_username=None, http_proxy_password=None):
        """Resolve the API base URL and decide how requests are authenticated.

        :param current_manifest: manifest object providing the API URL, the
            consumer credentials and the default export parameters; may be
            None when CFG.CANDLEPIN_SERVER_API is set.
        :param username: login name; when empty and a manifest is available
            over https, the consumer certificate is used instead.
        :param password: password passed to getUsernamePassword together
            with the username.
        :param http_proxy: proxy as "host:port", or None for no proxy.
        :param http_proxy_username: optional proxy user name.
        :param http_proxy_password: optional proxy password.
        :raises ValueError: if no base URL is available or the URL does not
            start with http/https.
        """
        # The manifest is optional, so only ask it for the URL when present.
        base_url = None
        if current_manifest is not None:
            base_url = current_manifest.get_api_url()
        if CFG.CANDLEPIN_SERVER_API:
            log(0, "Overriding Candlepin server to: '%s'" % CFG.CANDLEPIN_SERVER_API)
            base_url = CFG.CANDLEPIN_SERVER_API
        if base_url is None:
            raise ValueError("Candlepin API URL is not known.")
        self.base_url = base_url

        # 'https' must be tested first because it also starts with 'http'.
        if self.base_url.startswith('https'):
            self.protocol = 'https'
        elif self.base_url.startswith('http'):
            self.protocol = 'http'
        else:
            raise ValueError("Invalid protocol in URL: '%s'" % self.base_url)

        if not self.base_url.endswith('/'):
            self.base_url += '/'

        self.current_manifest = current_manifest

        # Authentication with upstream consumer certificate or with username and password
        if self.current_manifest and self.protocol == 'https' and not username:
            self.username = self.password = None
        else:
            log(0, "Candlepin login:")
            self.username, self.password = getUsernamePassword(username, password)

        self.http_proxy = http_proxy
        self.http_proxy_username = http_proxy_username
        self.http_proxy_password = http_proxy_password

    def _get_proxies(self):
        """Return the proxies dict for requests; empty when no proxy is set."""
        proxies = {}
        if self.http_proxy:
            auth = ""
            if self.http_proxy_username and self.http_proxy_password:
                auth = "%s:%s@" % (self.http_proxy_username, self.http_proxy_password)
            proxy_string = "http://%s%s" % (auth, self.http_proxy)
            # The same proxy URL is used for both plain and TLS traffic.
            proxies["http"] = proxy_string
            proxies["https"] = proxy_string
        return proxies

    @staticmethod
    def _write_temp_file(prefix, suffix, chunks):
        """Write an iterable of byte chunks to a new file in /tmp and return its path.

        The file is removed again if writing fails, so no partial file is
        left behind.
        """
        fd, path = tempfile.mkstemp(prefix=prefix, suffix=suffix, dir="/tmp")
        written = False
        try:
            fo = os.fdopen(fd, 'wb')
            try:
                for chunk in chunks:
                    fo.write(chunk)
            finally:
                fo.close()
            written = True
        finally:
            if not written:
                os.unlink(path)
        return path

    def _write_cert(self):
        """Store the consumer certificate and key of the manifest in temporary files.

        :returns: tuple (cert_file, key_file), or None when there is no
            current manifest.
        """
        if not self.current_manifest:
            return None
        credentials = self.current_manifest.get_consumer_credentials()
        cert_file = self._write_temp_file("cert-", "", [credentials.get_cert()])
        key_file = None
        try:
            key_file = self._write_temp_file("key-", "", [credentials.get_key()])
        finally:
            # Do not leave the certificate behind when the key could not be written.
            if key_file is None:
                self._delete_cert((cert_file, None))
        return (cert_file, key_file)

    @staticmethod
    def _delete_cert(cert):
        """Remove the temporary files returned by _write_cert.

        Both files are always attempted, so a problem with the certificate
        file cannot leave the private key on disk. Files that are already
        gone are ignored; the first other OSError is re-raised once both
        deletions were tried.

        :param cert: tuple (cert_file, key_file) or None.
        """
        import errno

        if cert is None:
            return
        cert_file, key_file = cert
        failure = None
        for path in (cert_file, key_file):
            if not path:
                continue
            try:
                os.unlink(path)
            except OSError:
                e = sys.exc_info()[1]
                if e.errno != errno.ENOENT and failure is None:
                    failure = e
        if failure is not None:
            raise failure

    def _call_api(self, url, params=None, method="get"):
        """Send one request to the API and return the response.

        :param url: full URL of the API endpoint.
        :param params: optional query parameters.
        :param method: "get" or "put".
        :returns: the requests response, or None when the request failed
            with requests.RequestException (the error is logged).
        :raises ValueError: for any other method.
        """
        # Reject unsupported methods before any credentials touch the disk.
        if method == "get":
            send = requests.get
        elif method == "put":
            send = requests.put
        else:
            raise ValueError("Unsupported method: '%s'" % method)

        if self.protocol == 'https':
            verify = CA_CERT_PATH
        else:
            verify = False

        request_args = {"params": params, "proxies": self._get_proxies(), "verify": verify}
        cert = None
        if self.username is not None and self.password is not None:
            request_args["auth"] = (self.username, self.password)
        else:
            cert = self._write_cert()
            request_args["cert"] = cert

        response = None
        try:
            try:
                response = send(url, **request_args)
            except requests.RequestException:
                e = sys.exc_info()[1]
                log2(0, 0, "ERROR: %s" % str(e), stream=sys.stderr)
        finally:
            # No-op for basic auth, where cert is None.
            self._delete_cert(cert)
        return response

    def _from_manifest(self, value, getter_name, error_message):
        """Return value, or the result of the named manifest getter when value is None.

        :raises ValueError: with error_message when value is None and there
            is no current manifest.
        """
        if value is not None:
            return value
        if self.current_manifest:
            return getattr(self.current_manifest, getter_name)()
        raise ValueError(error_message)

    def export_manifest(self, uuid=None, ownerid=None, satellite_version=None):
        """Performs export request to Candlepin API and saves exported manifest to target file.
        Can take required parameters from current manifest or override them with parameters of this method.

        :returns: path of the downloaded manifest zip file, or None when the
            request failed or the status code was not OK.
        :raises ValueError: if a required value is neither passed nor
            available from a current manifest.
        """
        uuid = self._from_manifest(uuid, "get_uuid", "Uuid is not known.")
        ownerid = self._from_manifest(ownerid, "get_ownerid", "Ownerid is not known.")
        satellite_version = self._from_manifest(satellite_version, "get_satellite_version",
                                                "Satellite version is not known.")

        url = "%s%s/export" % (self.base_url, uuid)
        params = {"ext": ["ownerid:%s" % ownerid, "version:%s" % satellite_version]}

        log(1, "URL: '%s'" % url)
        log(1, "Parameters: '%s'" % str(params))

        response = self._call_api(url, params=params, method="get")
        if response is None:
            return None

        # pylint: disable=E1101
        if response.status_code == requests.codes.ok:
            # Iterating the response yields the body in chunks.
            return self._write_temp_file("manifest-", ".zip", response)

        log2(0, 0, "Status code: %s" % response.status_code, stream=sys.stderr)
        log2(0, 0, "Message: '%s'" % response.text, stream=sys.stderr)
        return None

    def refresh_manifest(self, uuid=None):
        """Performs refresh request (PUT on the certificates endpoint) to Candlepin API.

        :param uuid: consumer uuid; taken from the current manifest when None.
        :returns: True when the API answered with OK or No Content, False
            otherwise.
        :raises ValueError: if uuid is None and there is no current manifest.
        """
        uuid = self._from_manifest(uuid, "get_uuid", "Uuid is not known.")

        url = "%s%s/certificates" % (self.base_url, uuid)

        log(1, "URL: '%s'" % url)

        response = self._call_api(url, method="put")
        if response is None:
            return False

        # pylint: disable=E1101
        if response.status_code in (requests.codes.ok, requests.codes.no_content):
            return True

        log2(0, 0, "Status code: %s" % response.status_code, stream=sys.stderr)
        log2(0, 0, "Message: '%s'" % response.text, stream=sys.stderr)
        return False
202