1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 import os
17 import sys
18 import re
19 import time
20 from Queue import Queue, Empty
21 from threading import Thread, Lock
22 try:
23
24 import urlparse
25 except ImportError:
26
27 import urllib.parse as urlparse
28 from urllib import quote
29 import pycurl
30 from urlgrabber.grabber import URLGrabberOptions, PyCurlFileObject, URLGrabError
31 from spacewalk.common.checksum import getFileChecksum
32 from spacewalk.common.rhnConfig import CFG, initCFG
33 from spacewalk.satellite_tools.syncLib import log, log2
38 self.msg = msg
39 self.total = total
40 self.status = 0
41 self.lock = Lock()
42
48
49
50
51 @staticmethod
53 """
54 Call in a loop to create terminal progress bar
55 @params:
56 iteration - Required : current iteration (Int)
57 total - Required : total iterations (Int)
58 prefix - Optional : prefix string (Str)
59 suffix - Optional : suffix string (Str)
60 decimals - Optional : number of decimals in percent complete (Int)
61 bar_length - Optional : character length of bar (Int)
62 """
63 filled_length = int(round(bar_length * iteration / float(total)))
64 percents = round(100.00 * (iteration / float(total)), decimals)
65 bar_char = '#' * filled_length + '-' * (bar_length - filled_length)
66 sys.stdout.write('\r%s |%s| %s%s %s' % (prefix, bar_char, percents, '%', suffix))
67 sys.stdout.flush()
68 if iteration == total:
69 sys.stdout.write('\n')
70 sys.stdout.flush()
71
74 - def __init__(self, _, total):
75 self.total = total
76 self.status = 0
77 self.lock = Lock()
78
79 - def log(self, success, param):
80 self.lock.acquire()
81 self.status += 1
82 if success:
83 log(0, " %d/%d : %s" % (self.status, self.total, str(param)))
84 else:
85 log2(0, 0, " %d/%d : %s (failed)" % (self.status, self.total, str(param)), stream=sys.stderr)
86 self.lock.release()
87
92 if not proxy:
93 return {}
94 proxy_string = proxy
95 if user:
96 auth = quote(user)
97 if password:
98 auth += ':' + quote(password)
99 proto, rest = re.match(r'(\w+://)(.+)', proxy_string).groups()
100 proxy_string = "%s%s@%s" % (proto, auth, rest)
101 proxies = {'http': proxy_string, 'https': proxy_string, 'ftp': proxy_string}
102 return proxies
103
106 - def __init__(self, url, filename, opts, curl_cache, parent):
107 self.curl_cache = curl_cache
108 self.parent = parent
109 PyCurlFileObject.__init__(self, url, filename, opts)
110
112 self.curl_obj = self.curl_cache
113 self.curl_obj.reset()
114 self._set_opts()
115 self._do_grab()
116 return self.fo
117
131
133 if not opts:
134 opts = {}
135 PyCurlFileObject._set_opts(self, opts=opts)
136 self.curl_obj.setopt(pycurl.FORBID_REUSE, 0)
137
141
145 Thread.__init__(self)
146 self.parent = parent
147 self.queue = queue
148
149 self.curl = pycurl.Curl()
150 self.mirror = 0
151
152 @staticmethod
153 - def __is_file_done(local_path=None, file_obj=None, checksum_type=None, checksum=None):
164
166 retrycode = getattr(e, 'errno', None)
167 code = getattr(e, 'code', None)
168 if retry < (self.parent.retries - 1):
169
170
171 if (retrycode is None and code is None) or (retrycode in opts.retrycodes or code in [58, 77]):
172 log2(0, 2, "ERROR: Download failed: %s - %s. Retrying..." % (url, sys.exc_info()[1]),
173 stream=sys.stderr)
174 return True
175
176
177 if retry < (mirrors - 1) and retrycode == 14:
178 log2(0, 2, "ERROR: Download failed: %s - %s. Trying next mirror..." % (url, sys.exc_info()[1]),
179 stream=sys.stderr)
180 return True
181
182 log2(0, 1, "ERROR: Download failed: %s - %s." % (url, sys.exc_info()[1]),
183 stream=sys.stderr)
184 return False
185
187 if self.mirror < (total - 1):
188 self.mirror += 1
189 else:
190 self.mirror = 0
191
193
194 if not self.parent.force:
195 if self.__is_file_done(local_path=params['target_file'], checksum_type=params['checksum_type'],
196 checksum=params['checksum']):
197 return True
198
199 opts = URLGrabberOptions(ssl_ca_cert=params['ssl_ca_cert'], ssl_cert=params['ssl_client_cert'],
200 ssl_key=params['ssl_client_key'], range=params['bytes_range'],
201 proxy=params['proxy'], username=params['proxy_username'],
202 password=params['proxy_password'], proxies=params['proxies'],
203 http_headers=tuple(params['http_headers'].items()))
204 mirrors = len(params['urls'])
205 for retry in range(max(self.parent.retries, mirrors)):
206 fo = None
207 url = urlparse.urljoin(params['urls'][self.mirror], params['relative_path'])
208
209
210 if 'authtoken' in params and params['authtoken']:
211 (scheme, netloc, path, query, _) = urlparse.urlsplit(params['urls'][self.mirror])
212 url = "%s://%s%s/%s?%s" % (scheme,netloc,path,params['relative_path'],query.rstrip('/'))
213 try:
214 try:
215 fo = PyCurlFileObjectThread(url, params['target_file'], opts, self.curl, self.parent)
216
217 if not self.__is_file_done(file_obj=fo, checksum_type=params['checksum_type'],
218 checksum=params['checksum']):
219 raise FailedDownloadError("Target file isn't valid. Checksum should be %s (%s)."
220 % (params['checksum'], params['checksum_type']))
221 break
222 except (FailedDownloadError, URLGrabError):
223 e = sys.exc_info()[1]
224
225
226
227 if 'No space left on device' in str(e):
228 self.parent.fail_download(e)
229 return False
230
231 if not self.__can_retry(retry, mirrors, opts, url, e):
232 return False
233 self.__next_mirror(mirrors)
234
235
236 except (KeyboardInterrupt, Exception):
237 e = sys.exc_info()[1]
238 self.parent.fail_download(e)
239 return False
240 finally:
241 if fo:
242 fo.close()
243
244 elif os.path.isfile(params['target_file']):
245 os.unlink(params['target_file'])
246
247 return True
248
250 while not self.queue.empty() and self.parent.can_continue():
251 try:
252 params = self.queue.get(block=False)
253 except Empty:
254 break
255 self.mirror = 0
256 success = self.__fetch_url(params)
257 if self.parent.log_obj:
258
259 self.parent.log_obj.log(success, os.path.basename(params['relative_path']))
260 self.queue.task_done()
261 self.curl.close()
262
265 - def __init__(self, retries=3, log_obj=None, force=False):
266 self.queues = {}
267 initCFG('server.satellite')
268 try:
269 self.threads = int(CFG.REPOSYNC_DOWNLOAD_THREADS)
270 except ValueError:
271 raise ValueError("Number of threads expected, found: '%s'" % CFG.REPOSYNC_DOWNLOAD_THREADS)
272 if self.threads < 1:
273 raise ValueError("Invalid number of threads: %d" % self.threads)
274 self.retries = retries
275 self.log_obj = log_obj
276 self.force = force
277 self.lock = Lock()
278 self.exception = None
279
280 self.first_in_queue_done = False
281 self.first_in_queue_lock = Lock()
282
284 self.log_obj = log_obj
285
288
289 @staticmethod
291 ssl_ca_cert, ssl_cert, ssl_key = ssl_set
292 for certificate_file in (ssl_ca_cert, ssl_cert, ssl_key):
293 if certificate_file and not os.path.isfile(certificate_file):
294 log2(0, 0, "ERROR: Certificate file not found: %s" % certificate_file, stream=sys.stderr)
295 return False
296 return True
297
298 - def add(self, params):
299 ssl_set = (params['ssl_ca_cert'], params['ssl_client_cert'], params['ssl_client_key'])
300 if self._validate(ssl_set):
301 if ssl_set not in self.queues:
302 self.queues[ssl_set] = Queue()
303 queue = self.queues[ssl_set]
304 queue.put(params)
305
307 size = 0
308 for queue in self.queues.values():
309 size += queue.qsize()
310 if size <= 0:
311 return
312 log(1, "Downloading total %d files from %d queues." % (size, len(self.queues)))
313
314 for index, queue in enumerate(self.queues.values()):
315 log(2, "Downloading %d files from queue #%d." % (queue.qsize(), index))
316 self.first_in_queue_done = False
317 started_threads = []
318 for _ in range(self.threads):
319 thread = DownloadThread(self, queue)
320 thread.setDaemon(True)
321 thread.start()
322 started_threads.append(thread)
323
324
325 try:
326 while any(t.isAlive() for t in started_threads):
327 time.sleep(1)
328 except KeyboardInterrupt:
329 e = sys.exc_info()[1]
330 self.fail_download(e)
331 while any(t.isAlive() for t in started_threads):
332 time.sleep(1)
333 break
334
335
336 if self.exception:
337 raise self.exception
338
344
350