Package backend :: Package satellite_tools :: Module download
[hide private]
[frames] | [no frames]

Source Code for Module backend.satellite_tools.download

  1  # 
  2  # Copyright (c) 2018 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU General Public License, 
  5  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  6  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  8  # along with this software; if not, see 
  9  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 10  # 
 11  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 12  # granted to use or replicate Red Hat trademarks that are incorporated 
 13  # in this software or its documentation. 
 14  # 
 15   
 16  import os 
 17  import sys 
 18  import re 
 19  import time 
 20  from Queue import Queue, Empty 
 21  from threading import Thread, Lock 
 22  try: 
 23      #  python 2 
 24      import urlparse 
 25  except ImportError: 
 26      #  python3 
 27      import urllib.parse as urlparse # pylint: disable=F0401,E0611 
 28  from urllib import quote 
 29  import pycurl 
 30  from urlgrabber.grabber import URLGrabberOptions, PyCurlFileObject, URLGrabError 
 31  from spacewalk.common.checksum import getFileChecksum 
 32  from spacewalk.common.rhnConfig import CFG, initCFG 
 33  from spacewalk.satellite_tools.syncLib import log, log2 
class ProgressBarLogger:
    """Thread-safe progress logger that renders a console progress bar.

    An instance can be passed to ThreadedDownloader as log_obj; every
    finished download calls log(), which advances the bar by one step.
    """

    def __init__(self, msg, total):
        # msg: prefix text printed in front of the bar
        # total: expected number of log() calls (the 100% mark)
        self.msg = msg
        self.total = total
        self.status = 0
        self.lock = Lock()

    def log(self, *_):
        """Advance the bar by one step; called concurrently from worker threads.

        Extra positional arguments (success flag, file name) are accepted for
        interface compatibility with TextLogger but ignored.
        """
        # 'with' guarantees the lock is released even if writing to stdout
        # raises (plain acquire()/release() would leave it held forever).
        with self.lock:
            self.status += 1
            self._print_progress_bar(self.status, self.total, prefix=self.msg, bar_length=50)

    # from here http://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console
    # Print iterations progress
    @staticmethod
    def _print_progress_bar(iteration, total, prefix='', suffix='', decimals=2, bar_length=100):
        """
        Call in a loop to create terminal progress bar
        @params:
            iteration   - Required  : current iteration (Int)
            total       - Required  : total iterations (Int)
            prefix      - Optional  : prefix string (Str)
            suffix      - Optional  : suffix string (Str)
            decimals    - Optional  : number of decimals in percent complete (Int)
            bar_length  - Optional  : character length of bar (Int)
        """
        filled_length = int(round(bar_length * iteration / float(total)))
        percents = round(100.00 * (iteration / float(total)), decimals)
        bar_char = '#' * filled_length + '-' * (bar_length - filled_length)
        # '\r' rewrites the current terminal line in place
        sys.stdout.write('\r%s |%s| %s%s %s' % (prefix, bar_char, percents, '%', suffix))
        sys.stdout.flush()
        if iteration == total:
            sys.stdout.write('\n')
            sys.stdout.flush()
71
class TextLogger:
    """Thread-safe per-file download logger writing plain text lines.

    Non-interactive alternative to ProgressBarLogger; an instance can be
    passed to ThreadedDownloader as log_obj.
    """

    def __init__(self, _, total):
        # First argument (the message prefix) is accepted only for interface
        # compatibility with ProgressBarLogger and is unused.
        self.total = total
        self.status = 0
        self.lock = Lock()

    def log(self, success, param):
        """Log one finished download; called concurrently from worker threads.

        success - True for a completed download, False for a failure
        param   - identifier printed in the message (e.g. the file name)
        """
        # 'with' guarantees the lock is released even if log()/log2() raises
        # (plain acquire()/release() would leave it held forever).
        with self.lock:
            self.status += 1
            if success:
                log(0, "    %d/%d : %s" % (self.status, self.total, str(param)))
            else:
                log2(0, 0, "    %d/%d : %s (failed)" % (self.status, self.total, str(param)), stream=sys.stderr)
87
# Older versions of urlgrabber don't allow to set proxy parameters separately
# Simplified version from yumRepository class
def get_proxies(proxy, user, password):
    """Build an urlgrabber-style proxies dict from separate proxy parameters.

    :param proxy: proxy URL (e.g. 'http://proxy.example.com:3128'); a falsy
                  value means "no proxy" and an empty dict is returned
    :param user: optional proxy username, URL-quoted into the proxy string
    :param password: optional proxy password (only used together with user)
    :return: dict mapping 'http', 'https' and 'ftp' to the proxy URL with
             credentials embedded ('scheme://user:pass@host:port')
    """
    if not proxy:
        return {}
    proxy_string = proxy
    if user:
        auth = quote(user)
        if password:
            auth += ':' + quote(password)
        match = re.match(r'(\w+://)(.+)', proxy_string)
        if match:
            proto, rest = match.groups()
            proxy_string = "%s%s@%s" % (proto, auth, rest)
        else:
            # Proxy string without a scheme: don't crash with AttributeError
            # on the failed match, just prepend the credentials.
            proxy_string = "%s@%s" % (auth, proxy_string)
    return {'http': proxy_string, 'https': proxy_string, 'ftp': proxy_string}
103
class PyCurlFileObjectThread(PyCurlFileObject):
    """PyCurlFileObject variant that re-uses one pycurl handle per thread.

    Each DownloadThread owns a single long-lived pycurl.Curl instance
    (curl_cache) that is reset and re-used for every file the thread
    downloads, instead of creating a fresh handle per request.
    """

    def __init__(self, url, filename, opts, curl_cache, parent):
        # curl_cache: the owning thread's long-lived pycurl.Curl handle
        # parent: the ThreadedDownloader (provides first_in_queue_* state)
        self.curl_cache = curl_cache
        self.parent = parent
        PyCurlFileObject.__init__(self, url, filename, opts)

    def _do_open(self):
        # Re-use the cached handle; reset() clears options left over from the
        # previous transfer before _set_opts() applies the current ones.
        self.curl_obj = self.curl_cache
        self.curl_obj.reset()
        self._set_opts()
        self._do_grab()
        return self.fo

    def _do_perform(self):
        # WORKAROUND - BZ #1439758 - ensure first item in queue is performed alone to properly setup NSS
        if not self.parent.first_in_queue_done:
            self.parent.first_in_queue_lock.acquire()
            # If some other thread was faster, no need to block anymore
            if self.parent.first_in_queue_done:
                self.parent.first_in_queue_lock.release()
        try:
            PyCurlFileObject._do_perform(self)
        finally:
            # First transfer finished (or failed): flag it done and release
            # the lock so the remaining threads may proceed in parallel.
            if not self.parent.first_in_queue_done:
                self.parent.first_in_queue_done = True
                self.parent.first_in_queue_lock.release()

    def _set_opts(self, opts=None):
        # opts=None default avoids sharing a mutable default dict
        if not opts:
            opts = {}
        PyCurlFileObject._set_opts(self, opts=opts)
        # Keep the connection open so this thread's next download can re-use it
        self.curl_obj.setopt(pycurl.FORBID_REUSE, 0)  # pylint: disable=E1101
137
class FailedDownloadError(Exception):
    """Raised when a downloaded file does not pass its checksum validation."""
141
class DownloadThread(Thread):
    """Worker thread consuming download jobs from a shared queue.

    Each thread owns one pycurl.Curl handle that is re-used (via
    PyCurlFileObjectThread) for all of its downloads. 'parent' is the
    ThreadedDownloader that spawned the thread; it supplies configuration
    (retries, force, log_obj) and the shared failure state.
    """

    def __init__(self, parent, queue):
        # parent: owning ThreadedDownloader
        # queue: Queue of params dicts describing files to download
        Thread.__init__(self)
        self.parent = parent
        self.queue = queue
        # pylint: disable=E1101
        # Long-lived curl handle shared by all downloads of this thread
        self.curl = pycurl.Curl()
        # Index into params['urls'] of the mirror currently being tried
        self.mirror = 0

    @staticmethod
    def __is_file_done(local_path=None, file_obj=None, checksum_type=None, checksum=None):
        # Returns True when the file (given as a path or an open file object)
        # exists and, if a checksum is supplied, matches it.
        if checksum_type and checksum:
            if local_path and os.path.isfile(local_path):
                return getFileChecksum(checksum_type, filename=local_path) == checksum
            elif file_obj:
                return getFileChecksum(checksum_type, file_obj=file_obj) == checksum
        # No checksum to verify - mere existence counts as done
        if local_path and os.path.isfile(local_path):
            return True
        elif file_obj:
            return True
        return False

    def __can_retry(self, retry, mirrors, opts, url, e):
        # Decide whether the failed download of 'url' should be retried.
        #   retry   - zero-based count of attempts already made
        #   mirrors - number of available mirror URLs
        #   opts    - URLGrabberOptions (provides opts.retrycodes)
        #   e       - the exception that caused the failure
        # Logs the error and returns True to retry, False to give up.
        retrycode = getattr(e, 'errno', None)
        code = getattr(e, 'code', None)
        if retry < (self.parent.retries - 1):
            # No codes at all or some specified codes
            # 58, 77 - Couple of curl error codes observed in multithreading on RHEL 7 - probably a bug
            if (retrycode is None and code is None) or (retrycode in opts.retrycodes or code in [58, 77]):
                log2(0, 2, "ERROR: Download failed: %s - %s. Retrying..." % (url, sys.exc_info()[1]),
                     stream=sys.stderr)
                return True

        # 14 - HTTP Error
        if retry < (mirrors - 1) and retrycode == 14:
            log2(0, 2, "ERROR: Download failed: %s - %s. Trying next mirror..." % (url, sys.exc_info()[1]),
                 stream=sys.stderr)
            return True

        log2(0, 1, "ERROR: Download failed: %s - %s." % (url, sys.exc_info()[1]),
             stream=sys.stderr)
        return False

    def __next_mirror(self, total):
        # Advance to the next mirror index, wrapping around to 0 at the end.
        if self.mirror < (total - 1):
            self.mirror += 1
        else:
            self.mirror = 0

    def __fetch_url(self, params):
        # Download one file described by the params dict (keys used here:
        # target_file, checksum_type, checksum, urls, relative_path,
        # ssl_ca_cert, ssl_client_cert, ssl_client_key, bytes_range, proxy,
        # proxy_username, proxy_password, proxies, http_headers, authtoken).
        # Returns True on success (or already-valid local file), False on
        # a failure that exhausted all retries/mirrors.
        # Skip existing file if exists and matches checksum
        if not self.parent.force:
            if self.__is_file_done(local_path=params['target_file'], checksum_type=params['checksum_type'],
                                   checksum=params['checksum']):
                return True

        opts = URLGrabberOptions(ssl_ca_cert=params['ssl_ca_cert'], ssl_cert=params['ssl_client_cert'],
                                 ssl_key=params['ssl_client_key'], range=params['bytes_range'],
                                 proxy=params['proxy'], username=params['proxy_username'],
                                 password=params['proxy_password'], proxies=params['proxies'],
                                 http_headers=tuple(params['http_headers'].items()))
        mirrors = len(params['urls'])
        # Enough attempts to honour both the retry count and one try per mirror
        for retry in range(max(self.parent.retries, mirrors)):
            fo = None
            url = urlparse.urljoin(params['urls'][self.mirror], params['relative_path'])
            ## BEWARE: This hack is introduced in order to support SUSE SCC channels
            ## This also needs a patched urlgrabber AFAIK
            if 'authtoken' in params and params['authtoken']:
                (scheme, netloc, path, query, _) = urlparse.urlsplit(params['urls'][self.mirror])
                url = "%s://%s%s/%s?%s" % (scheme,netloc,path,params['relative_path'],query.rstrip('/'))
            try:
                try:
                    fo = PyCurlFileObjectThread(url, params['target_file'], opts, self.curl, self.parent)
                    # Check target file
                    if not self.__is_file_done(file_obj=fo, checksum_type=params['checksum_type'],
                                               checksum=params['checksum']):
                        raise FailedDownloadError("Target file isn't valid. Checksum should be %s (%s)."
                                                  % (params['checksum'], params['checksum_type']))
                    break
                except (FailedDownloadError, URLGrabError):
                    e = sys.exc_info()[1]
                    # urlgrabber-3.10.1-9 throws URLGrabError for both
                    # 'HTTP Error 404 - Not Found' and 'No space left on device', so
                    # workaround for this is check error message:
                    if 'No space left on device' in str(e):
                        self.parent.fail_download(e)
                        return False

                    if not self.__can_retry(retry, mirrors, opts, url, e):
                        return False
                    self.__next_mirror(mirrors)
                # RHEL 6 urlgrabber raises KeyboardInterrupt for example when there is no space left
                # but handle also other fatal exceptions
                except (KeyboardInterrupt, Exception):  # pylint: disable=W0703
                    e = sys.exc_info()[1]
                    self.parent.fail_download(e)
                    return False
            finally:
                if fo:
                    fo.close()
                # Delete failed download file
                elif os.path.isfile(params['target_file']):
                    os.unlink(params['target_file'])

        return True

    def run(self):
        # Main thread loop: pull jobs until the queue is drained or another
        # thread recorded a fatal failure (parent.can_continue() is False).
        while not self.queue.empty() and self.parent.can_continue():
            try:
                params = self.queue.get(block=False)
            except Empty:
                break
            # Every file starts from the first mirror again
            self.mirror = 0
            success = self.__fetch_url(params)
            if self.parent.log_obj:
                # log_obj must be thread-safe
                self.parent.log_obj.log(success, os.path.basename(params['relative_path']))
            self.queue.task_done()
        # Release the curl handle once this thread is done downloading
        self.curl.close()
262
class ThreadedDownloader:
    """Multi-threaded file downloader used by repo sync.

    Files are grouped into one queue per SSL certificate set (add()); run()
    then processes each queue with a pool of DownloadThread workers. The
    first exception reported by any worker (fail_download()) stops the
    remaining work and is re-raised at the end of run().
    """

    def __init__(self, retries=3, log_obj=None, force=False):
        # Queues keyed by (ssl_ca_cert, ssl_client_cert, ssl_client_key)
        self.queues = {}
        initCFG('server.satellite')
        try:
            self.threads = int(CFG.REPOSYNC_DOWNLOAD_THREADS)
        except ValueError:
            raise ValueError("Number of threads expected, found: '%s'" % CFG.REPOSYNC_DOWNLOAD_THREADS)
        if self.threads < 1:
            raise ValueError("Invalid number of threads: %d" % self.threads)
        self.retries = retries
        self.log_obj = log_obj
        self.force = force
        # Protects self.exception, which is written from worker threads
        self.lock = Lock()
        self.exception = None
        # WORKAROUND - BZ #1439758 - ensure first item in queue is performed alone to properly setup NSS
        self.first_in_queue_done = False
        self.first_in_queue_lock = Lock()

    def set_log_obj(self, log_obj):
        # log_obj must be thread-safe (e.g. ProgressBarLogger / TextLogger)
        self.log_obj = log_obj

    def set_force(self, force):
        # force=True re-downloads files even if a valid local copy exists
        self.force = force

    @staticmethod
    def _validate(ssl_set):
        """Return True when every configured certificate file in ssl_set exists."""
        ssl_ca_cert, ssl_cert, ssl_key = ssl_set
        for certificate_file in (ssl_ca_cert, ssl_cert, ssl_key):
            if certificate_file and not os.path.isfile(certificate_file):
                log2(0, 0, "ERROR: Certificate file not found: %s" % certificate_file, stream=sys.stderr)
                return False
        return True

    def add(self, params):
        """Queue one download job (params dict) under its SSL certificate set.

        Jobs with missing certificate files are rejected (logged, not queued).
        """
        ssl_set = (params['ssl_ca_cert'], params['ssl_client_cert'], params['ssl_client_key'])
        if self._validate(ssl_set):
            if ssl_set not in self.queues:
                self.queues[ssl_set] = Queue()
            queue = self.queues[ssl_set]
            queue.put(params)

    def run(self):
        """Process all queued downloads; re-raise the first worker exception."""
        size = sum(queue.qsize() for queue in self.queues.values())
        if size <= 0:
            return
        log(1, "Downloading total %d files from %d queues." % (size, len(self.queues)))

        for index, queue in enumerate(self.queues.values()):
            log(2, "Downloading %d files from queue #%d." % (queue.qsize(), index))
            # WORKAROUND - BZ #1439758 - reset per queue so the first transfer
            # of each queue again runs alone
            self.first_in_queue_done = False
            started_threads = []
            for _ in range(self.threads):
                thread = DownloadThread(self, queue)
                # Daemon threads don't block interpreter exit on Ctrl+C.
                # Attribute assignment instead of setDaemon(): the camelCase
                # alias was removed in Python 3.9.
                thread.daemon = True
                thread.start()
                started_threads.append(thread)

            # wait to finish (is_alive() - isAlive() was removed in Python 3.9)
            try:
                while any(t.is_alive() for t in started_threads):
                    time.sleep(1)
            except KeyboardInterrupt:
                e = sys.exc_info()[1]
                # Record the interrupt so workers stop, then wait for them
                self.fail_download(e)
                while any(t.is_alive() for t in started_threads):
                    time.sleep(1)
                break

        # raise first detected exception if any
        if self.exception:
            raise self.exception  # pylint: disable=E0702

    def can_continue(self):
        """Return True while no worker has reported a fatal failure."""
        # 'with' guarantees the lock is released even on error
        with self.lock:
            return self.exception is None

    def fail_download(self, exception):
        """Record a fatal failure; only the first reported exception is kept."""
        with self.lock:
            if not self.exception:
                self.exception = exception
350