diff options
Diffstat (limited to 'lib/thandy/download.py')
-rw-r--r-- | lib/thandy/download.py | 278 |
1 files changed, 251 insertions, 27 deletions
diff --git a/lib/thandy/download.py b/lib/thandy/download.py index a14cb48..3e4c3dc 100644 --- a/lib/thandy/download.py +++ b/lib/thandy/download.py @@ -7,12 +7,17 @@ import Queue import random import sys import threading +import time import traceback import urllib2 import thandy.util import thandy.socksurls +import thandy.checkJson +class BadCompoundData(thandy.DownloadError): + """DOCDOC""" + pass class DownloadManager: """Class to track a set of downloads and pass them out to worker threads. @@ -40,6 +45,9 @@ class DownloadManager: for t in self.threads: t.setDaemon(True) + # DOCDOC + self.statusLog = DownloadStatusLog() + #DOCDOC self._raiseMe = None @@ -95,12 +103,25 @@ class DownloadManager: def addDownloadJob(self, job): """Add another DownloadJob to the end of the work queue.""" + job.setDownloadStatusLog(self.statusLog) rp = job.getRelativePath() self._lock.acquire() self.downloads[rp] = job self._lock.release() self.downloadQueue.put(job) + def getRetryTime(self, mirrorList, relPath): + """DOCDOC""" + readyAt = None + for m in mirrorsThatSupport(mirrorList, relPath): + r = self.statusLog.getDelayTime(m['urlbase']) + if readyAt == None or r < readyAt: + readyAt = r + if readyAt != None: + return readyAt + else: + return 0 + def downloadFailed(self, mirror, relpath): """DOCDOC""" pass # Track failure; don't try the same mirror right away. @@ -113,7 +134,7 @@ class DownloadManager: success = False try: logging.info("start %s in Thread %s", rp, idx) - success = job.download() # Execute the download. + failure = job.download() # Execute the download. logging.info("end %s in Thread %s", rp, idx) finally: self._lock.acquire() @@ -124,15 +145,159 @@ class DownloadManager: finally: self._lock.release() - if success: + if failure == None: + self.statusLog.succeeded(job.getMirror(), + job.getRelativePath()) self.resultQueue.put(job._success) else: + self.statusLog.failed(failure) self.resultQueue.put(job._failure) self.done.acquire() self.done.notify() self.done.release() +class DownloadFailure: + # Broadly speaking, these errors are possible: + # + # A - The whole internet is down for us, either because our network + # connection sucks, our proxy is down, or whatever. + # - A particular mirror is down or nonfunctional. + # + # DownloadFailure.connectionFailed() + # + # B - The mirror is giving us errors we don't understand. + # - A particular mirror is missing a file we need. + # - A particular mirror served us something that was allegedly a + # file we need, but that file was no good. + # + # DownloadFailure.mirrorFailed() + # + # C - We finished a partial download and it was no good, but we + # can't tell who was at fault, because we don't know which + # part was corrupt. + # + # DownloadFailure.badCompoundFile() + # + def __init__(self, urlbase, relPath, networkError=False): + self._urlbase = urlbase + self._relPath = relPath + self._network = networkError + + self._when = time.time() + + @staticmethod + def badCompoundFile(relpath): + return DownloadFailure(None, relpath) + + @staticmethod + def mirrorFailed(urlbase, relpath): + return DownloadFailure(urlbase, relpath) + + @staticmethod + def connectionFailed(urlbase): + return DownloadFailure(urlbase, None, True) + +S = thandy.checkJson +_FAIL_SCHEMA = S.Struct([S.Int(), S.Int()], allowMore=True) +_STATUS_LOG_SCHEMA = S.Obj( + v=S.Int(), + mirrorFailures=S.DictOf(S.AnyStr(), _FAIL_SCHEMA), + networkFailures=_FAIL_SCHEMA) +del S + +class DownloadStatusLog: + """DOCDOC""" + # XXXX get smarter. + # XXXX make this persistent. + def __init__(self, mirrorFailures={}, networkFailures=[0,0]): + self._lock = threading.RLock() + # urlbase -> [ nFailures, dontTryUntil ] + self._mirrorFailures = dict(mirrorFailures) + # None, or [ nFailures, dontTryUntil ] + self._netFailure = list(networkFailures) + + def _getDelay(self, isMirror, failureCount): + if isMirror: + DELAYS = [ 0, 300, 300, 600, 900, 8400, 8400, 9000 ] + else: + DELAYS = [ 0, 10, 30, 30, 60, 300, 600, 1800, 3600, 7200 ] + + if failureCount < len(DELAYS): + return DELAYS[failureCount] + else: + return DELAYS[-1] + + def toJSON(self): + return { 'v': 1, + 'networkFailures' : self._netFailure, + 'mirrorFailures' : self._mirrorFailures } + + @staticmethod + def fromJSON(obj): + _STATUS_LOG_SCHEMA.checkMatch(obj) + return DownloadStatusLog( obj['mirrorFailures'], + obj['networkFailures'] ) + + def failed(self, failure): + self._lock.acquire() + try: + when = long(failure._when) + + # If there's a mirror to blame, blame it. + if failure._urlbase != None: + s = self._mirrorFailures.setdefault(failure._urlbase, [0, 0]) + if s[1] + 5 < when: # Two failures within 5s count as one. + s[0] += 1 + s[1] = when + + # If there is no mirror to blame, or we suspect a network error, + # blame the network too. + if failure._urlbase == None or failure._network: + s = self._netFailure + if s[1] + 5 < when: # Two failures within 5s count as one. + s[0] += 1 + s[1] = when + finally: + self._lock.release() + + def succeeded(self, urlbase, url): + self._lock.acquire() + try: + try: + del self._mirrorFailures[urlbase] + except KeyError: + pass + self._netFailure = [0, 0] + finally: + self._lock.release() + + def canRetry(self, urlbase=None, now=None): + if now == None: + now = time.time() + + d = self.getDelayTime(urlbase) + return d <= now + + def getDelayTime(self, urlbase=None): + self._lock.acquire() + try: + readyAt = 0 + + if urlbase: + status = self._mirrorFailures.get(urlbase, (0,0)) + if status[1] > readyAt: + readyAt = status[1] + self._getDelay(True, status[0]) + + if self._netFailure[1] > readyAt: + readyAt = (self._netFailure[1] + + self._getDelay(False, self._netFailure[0])) + + return readyAt + finally: + self._lock.release() + + class DownloadJob: """Abstract base class. Represents a thing to be downloaded, and the knowledge of how to download it.""" @@ -164,6 +329,10 @@ class DownloadJob: _download function downloads from.""" raise NotImplemented() + def getMirror(self): + """DOCDOC""" + return None + def getRelativePath(self): """Abstract. Returns a string representing this download, to keep two downloads of the same object from running at once. @@ -180,21 +349,34 @@ class DownloadJob: def download(self): """Main interface function: Start the download, and return - when complete. + when complete. DOCDOC return value. """ try: self._download() - return True - except (OSError, httplib.error, urllib2.HTTPError, - thandy.DownloadError), err: - # XXXXX retry on failure + return None + except BadCompoundData, err: logging.warn("Download failed: %s", err) - return False + # No way to apportion the blame. + return DownloadFailure.badCompoundFile(self.getRelativePath()) + except (urllib2.HTTPError, thandy.DownloadError), err: + # looks like we may have irreconcilable differences with a + # particular mirror. + logging.warn("Download failed: %s", err) + return DownloadFailure.mirrorFailed(self.getMirror(), + self.getRelativePath()) + except (OSError, httplib.error, IOError, urllib2.URLError), err: + logging.warn("Download failed: %s", err) + # Could be the mirror; could be the network. Hard to say. + return DownloadFailure.connectionFailed(self.getMirror()) except: tp, val, tb = sys.exc_info() - logging.warn("Internal during download: %s, %s", val, - traceback.format_exc()) - return False + logging.exception("Internal error during download: %s", val) + # We have an exception! Treat it like a network error, I guess. + return DownloadFailure.connectionFailed(None) + + def setDownloadStatusLog(self, log): + """DOCDOC""" + pass def _checkTmpFile(self): """Helper: check whether the downloaded temporary file matches @@ -202,10 +384,14 @@ class DownloadJob: if self._wantHash and not self._repoFile: gotHash = thandy.formats.getFileDigest(self._tmpPath) if gotHash != self._wantHash: - raise thandy.DownloadError("File hash was not as expected.") + raise thandy.FormatException("File hash was not as expected.") elif self._repoFile: self._repoFile.checkFile(self._tmpPath, self._wantHash) + def _removeTmpFile(self): + """Helper: remove the temporary file so that we do not get stuck in + a downloading-it-forever loop.""" + os.unlink(self._tmpPath) def _download(self): # Implementation function. Unlike download(), can throw exceptions. @@ -219,7 +405,7 @@ class DownloadJob: pass else: # What luck! This stalled file was what we wanted. - # (This happens mostly with ) + # (This happens mostly when we have an internal error.) thandy.util.ensureParentDir(self._destPath) thandy.util.moveFile(self._tmpPath, self._destPath) return @@ -236,7 +422,15 @@ class DownloadJob: else: have_length = None - f_in = getConnection(url, self._useTor, have_length) + try: + f_in = getConnection(url, self._useTor, have_length) + except urllib2.HTTPError, err: + if err.code == 416: + # We asked for a range that couldn't be satisfied. + # Usually, this means that the server thinks the file + # is shorter than we think it is. We need to start over. + self._removeTmpFile() + raise logging.info("Connected to %s", url) @@ -268,7 +462,14 @@ class DownloadJob: if f_out is not None: f_out.close() - self._checkTmpFile() + try: + self._checkTmpFile() + except (thandy.FormatException, thandy.DownloadError), err: + self._removeTmpFile() + if haveStalled: + raise BadCompoundData(err) + else: + raise thandy.util.ensureParentDir(self._destPath) thandy.util.moveFile(self._tmpPath, self._destPath) @@ -290,11 +491,28 @@ class SimpleDownloadJob(DownloadJob): def getRelativePath(self): return self._url +def mirrorsThatSupport(mirrorList, relPath, urlTypes=None, statusLog=None): + now = time.time() + for m in mirrorList['mirrors']: + if urlTypes != None: + urltype = urllib2.splittype(m['urlbase'])[0] + if urltype.lower() not in urlTypes: + continue + + if statusLog != None and not statusLog.canRetry(m['urlbase'], now): + continue + + for c in m['contents']: + if thandy.formats.rolePathMatches(c, relPath): + yield m + break + class ThandyDownloadJob(DownloadJob): """Thandy's subtype of DownloadJob: knows about mirrors, weighting, and Thandy's directory structure.""" def __init__(self, relPath, destPath, mirrorList, wantHash=None, - supportedURLTypes=None, useTor=None, repoFile=None): + supportedURLTypes=None, useTor=None, repoFile=None, + downloadStatusLog=None): DownloadJob.__init__(self, destPath, None, wantHash=wantHash, useTor=useTor, repoFile=repoFile) @@ -314,23 +532,26 @@ class ThandyDownloadJob(DownloadJob): if self._supportedURLTypes is None and useTor: self._supportedURLTypes = [ "http", "https" ] + self._usingMirror = None #DOCDOC + self._downloadStatusLog = downloadStatusLog + + def setDownloadStatusLog(self, log): + self._downloadStatusLog = log def getURL(self): usable = [] - for m in self._mirrorList['mirrors']: - for c in m['contents']: - - if self._supportedURLTypes is not None: - urltype = urllib2.splittype(m['urlbase'])[0] - if urltype.lower() not in self._supportedURLTypes: - continue + for m in mirrorsThatSupport(self._mirrorList, self._relPath, + self._supportedURLTypes, + self._downloadStatusLog): + usable.append( (m['weight'], m) ) - if thandy.formats.rolePathMatches(c, self._relPath): - usable.append( (m['weight'], m) ) - break + try: + mirror = thandy.util.randChooseWeighted(usable) + except IndexError: + raise thandy.DownloadError("No mirror supports download.") - mirror = thandy.util.randChooseWeighted(usable) + self._usingMirror = mirror['urlbase'] if m['urlbase'][-1] == '/' and self._relPath[0] == '/': return m['urlbase'] + self._relPath[1:] @@ -340,6 +561,9 @@ class ThandyDownloadJob(DownloadJob): def getRelativePath(self): return self._relPath + def getMirror(self): + return self._usingMirror + _socks_opener = thandy.socksurls.build_socks_opener() |