summaryrefslogtreecommitdiff
path: root/lib/thandy/download.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/thandy/download.py')
-rw-r--r--lib/thandy/download.py278
1 files changed, 251 insertions, 27 deletions
diff --git a/lib/thandy/download.py b/lib/thandy/download.py
index a14cb48..3e4c3dc 100644
--- a/lib/thandy/download.py
+++ b/lib/thandy/download.py
@@ -7,12 +7,17 @@ import Queue
import random
import sys
import threading
+import time
import traceback
import urllib2
import thandy.util
import thandy.socksurls
+import thandy.checkJson
+class BadCompoundData(thandy.DownloadError):
+ """DOCDOC"""
+ pass
class DownloadManager:
"""Class to track a set of downloads and pass them out to worker threads.
@@ -40,6 +45,9 @@ class DownloadManager:
for t in self.threads:
t.setDaemon(True)
+ # DOCDOC
+ self.statusLog = DownloadStatusLog()
+
#DOCDOC
self._raiseMe = None
@@ -95,12 +103,25 @@ class DownloadManager:
def addDownloadJob(self, job):
"""Add another DownloadJob to the end of the work queue."""
+ job.setDownloadStatusLog(self.statusLog)
rp = job.getRelativePath()
self._lock.acquire()
self.downloads[rp] = job
self._lock.release()
self.downloadQueue.put(job)
+ def getRetryTime(self, mirrorList, relPath):
+ """DOCDOC"""
+ readyAt = None
+ for m in mirrorsThatSupport(mirrorList, relPath):
+ r = self.statusLog.getDelayTime(m['urlbase'])
+ if readyAt == None or r < readyAt:
+ readyAt = r
+ if readyAt != None:
+ return readyAt
+ else:
+ return 0
+
def downloadFailed(self, mirror, relpath):
"""DOCDOC"""
pass # Track failure; don't try the same mirror right away.
@@ -113,7 +134,7 @@ class DownloadManager:
success = False
try:
logging.info("start %s in Thread %s", rp, idx)
- success = job.download() # Execute the download.
+ failure = job.download() # Execute the download.
logging.info("end %s in Thread %s", rp, idx)
finally:
self._lock.acquire()
@@ -124,15 +145,159 @@ class DownloadManager:
finally:
self._lock.release()
- if success:
+ if failure == None:
+ self.statusLog.succeeded(job.getMirror(),
+ job.getRelativePath())
self.resultQueue.put(job._success)
else:
+ self.statusLog.failed(failure)
self.resultQueue.put(job._failure)
self.done.acquire()
self.done.notify()
self.done.release()
+class DownloadFailure:
+ # Broadly speaking, these errors are possible:
+ #
+ # A - The whole internet is down for us, either because our network
+ # connection sucks, our proxy is down, or whatever.
+ # - A particular mirror is down or nonfunctional.
+ #
+ # DownloadFailure.connectionFailed()
+ #
+ # B - The mirror is giving us errors we don't understand.
+ # - A particular mirror is missing a file we need.
+ # - A particular mirror served us something that was allegedly a
+ # file we need, but that file was no good.
+ #
+ # DownloadFailure.mirrorFailed()
+ #
+ # C - We finished a partial download and it was no good, but we
+ # can't tell who was at fault, because we don't know which
+ # part was corrupt.
+ #
+ # DownloadFailure.badCompoundFile()
+ #
+ def __init__(self, urlbase, relPath, networkError=False):
+ self._urlbase = urlbase
+ self._relPath = relPath
+ self._network = networkError
+
+ self._when = time.time()
+
+ @staticmethod
+ def badCompoundFile(relpath):
+ return DownloadFailure(None, relpath)
+
+ @staticmethod
+ def mirrorFailed(urlbase, relpath):
+ return DownloadFailure(urlbase, relpath)
+
+ @staticmethod
+ def connectionFailed(urlbase):
+ return DownloadFailure(urlbase, None, True)
+
+S = thandy.checkJson
+_FAIL_SCHEMA = S.Struct([S.Int(), S.Int()], allowMore=True)
+_STATUS_LOG_SCHEMA = S.Obj(
+ v=S.Int(),
+ mirrorFailures=S.DictOf(S.AnyStr(), _FAIL_SCHEMA),
+ networkFailures=_FAIL_SCHEMA)
+del S
+
+class DownloadStatusLog:
+ """DOCDOC"""
+ # XXXX get smarter.
+ # XXXX make this persistent.
+ def __init__(self, mirrorFailures={}, networkFailures=[0,0]):
+ self._lock = threading.RLock()
+ # urlbase -> [ nFailures, dontTryUntil ]
+ self._mirrorFailures = dict(mirrorFailures)
+ # None, or [ nFailures, dontTryUntil ]
+ self._netFailure = list(networkFailures)
+
+ def _getDelay(self, isMirror, failureCount):
+ if isMirror:
+ DELAYS = [ 0, 300, 300, 600, 900, 8400, 8400, 9000 ]
+ else:
+ DELAYS = [ 0, 10, 30, 30, 60, 300, 600, 1800, 3600, 7200 ]
+
+ if failureCount < len(DELAYS):
+ return DELAYS[failureCount]
+ else:
+ return DELAYS[-1]
+
+ def toJSON(self):
+ return { 'v': 1,
+ 'networkFailures' : self._netFailure,
+ 'mirrorFailures' : self._mirrorFailures }
+
+ @staticmethod
+ def fromJSON(obj):
+ _STATUS_LOG_SCHEMA.checkMatch(obj)
+ return DownloadStatusLog( obj['mirrorFailures'],
+ obj['networkFailures'] )
+
+ def failed(self, failure):
+ self._lock.acquire()
+ try:
+ when = long(failure._when)
+
+ # If there's a mirror to blame, blame it.
+ if failure._urlbase != None:
+ s = self._mirrorFailures.setdefault(failure._urlbase, [0, 0])
+ if s[1] + 5 < when: # Two failures within 5s count as one.
+ s[0] += 1
+ s[1] = when
+
+ # If there is no mirror to blame, or we suspect a network error,
+ # blame the network too.
+ if failure._urlbase == None or failure._network:
+ s = self._netFailure
+ if s[1] + 5 < when: # Two failures within 5s count as one.
+ s[0] += 1
+ s[1] = when
+ finally:
+ self._lock.release()
+
+ def succeeded(self, urlbase, url):
+ self._lock.acquire()
+ try:
+ try:
+ del self._mirrorFailures[urlbase]
+ except KeyError:
+ pass
+ self._netFailure = [0, 0]
+ finally:
+ self._lock.release()
+
+ def canRetry(self, urlbase=None, now=None):
+ if now == None:
+ now = time.time()
+
+ d = self.getDelayTime(urlbase)
+ return d <= now
+
+ def getDelayTime(self, urlbase=None):
+ self._lock.acquire()
+ try:
+ readyAt = 0
+
+ if urlbase:
+ status = self._mirrorFailures.get(urlbase, (0,0))
+ if status[1] > readyAt:
+ readyAt = status[1] + self._getDelay(True, status[0])
+
+ if self._netFailure[1] > readyAt:
+ readyAt = (self._netFailure[1] +
+ self._getDelay(False, self._netFailure[0]))
+
+ return readyAt
+ finally:
+ self._lock.release()
+
+
class DownloadJob:
"""Abstract base class. Represents a thing to be downloaded, and the
knowledge of how to download it."""
@@ -164,6 +329,10 @@ class DownloadJob:
_download function downloads from."""
raise NotImplemented()
+ def getMirror(self):
+ """DOCDOC"""
+ return None
+
def getRelativePath(self):
"""Abstract. Returns a string representing this download, to
keep two downloads of the same object from running at once.
@@ -180,21 +349,34 @@ class DownloadJob:
def download(self):
"""Main interface function: Start the download, and return
- when complete.
+ when complete. DOCDOC return value.
"""
try:
self._download()
- return True
- except (OSError, httplib.error, urllib2.HTTPError,
- thandy.DownloadError), err:
- # XXXXX retry on failure
+ return None
+ except BadCompoundData, err:
logging.warn("Download failed: %s", err)
- return False
+ # No way to apportion the blame.
+ return DownloadFailure.badCompoundFile(self.getRelativePath())
+ except (urllib2.HTTPError, thandy.DownloadError), err:
+ # looks like we may have irreconcilable differences with a
+ # particular mirror.
+ logging.warn("Download failed: %s", err)
+ return DownloadFailure.mirrorFailed(self.getMirror(),
+ self.getRelativePath())
+ except (OSError, httplib.error, IOError, urllib2.URLError), err:
+ logging.warn("Download failed: %s", err)
+ # Could be the mirror; could be the network. Hard to say.
+ return DownloadFailure.connectionFailed(self.getMirror())
except:
tp, val, tb = sys.exc_info()
- logging.warn("Internal during download: %s, %s", val,
- traceback.format_exc())
- return False
+ logging.exception("Internal error during download: %s", val)
+ # We have an exception! Treat it like a network error, I guess.
+ return DownloadFailure.connectionFailed(None)
+
+ def setDownloadStatusLog(self, log):
+ """DOCDOC"""
+ pass
def _checkTmpFile(self):
"""Helper: check whether the downloaded temporary file matches
@@ -202,10 +384,14 @@ class DownloadJob:
if self._wantHash and not self._repoFile:
gotHash = thandy.formats.getFileDigest(self._tmpPath)
if gotHash != self._wantHash:
- raise thandy.DownloadError("File hash was not as expected.")
+ raise thandy.FormatException("File hash was not as expected.")
elif self._repoFile:
self._repoFile.checkFile(self._tmpPath, self._wantHash)
+ def _removeTmpFile(self):
+ """Helper: remove the temporary file so that we do not get stuck in
+ a downloading-it-forever loop."""
+ os.unlink(self._tmpPath)
def _download(self):
# Implementation function. Unlike download(), can throw exceptions.
@@ -219,7 +405,7 @@ class DownloadJob:
pass
else:
# What luck! This stalled file was what we wanted.
- # (This happens mostly with )
+ # (This happens mostly when we have an internal error.)
thandy.util.ensureParentDir(self._destPath)
thandy.util.moveFile(self._tmpPath, self._destPath)
return
@@ -236,7 +422,15 @@ class DownloadJob:
else:
have_length = None
- f_in = getConnection(url, self._useTor, have_length)
+ try:
+ f_in = getConnection(url, self._useTor, have_length)
+ except urllib2.HTTPError, err:
+ if err.code == 416:
+ # We asked for a range that couldn't be satisfied.
+ # Usually, this means that the server thinks the file
+ # is shorter than we think it is. We need to start over.
+ self._removeTmpFile()
+ raise
logging.info("Connected to %s", url)
@@ -268,7 +462,14 @@ class DownloadJob:
if f_out is not None:
f_out.close()
- self._checkTmpFile()
+ try:
+ self._checkTmpFile()
+ except (thandy.FormatException, thandy.DownloadError), err:
+ self._removeTmpFile()
+ if haveStalled:
+ raise BadCompoundData(err)
+ else:
+ raise
thandy.util.ensureParentDir(self._destPath)
thandy.util.moveFile(self._tmpPath, self._destPath)
@@ -290,11 +491,28 @@ class SimpleDownloadJob(DownloadJob):
def getRelativePath(self):
return self._url
+def mirrorsThatSupport(mirrorList, relPath, urlTypes=None, statusLog=None):
+ now = time.time()
+ for m in mirrorList['mirrors']:
+ if urlTypes != None:
+ urltype = urllib2.splittype(m['urlbase'])[0]
+ if urltype.lower() not in urlTypes:
+ continue
+
+ if statusLog != None and not statusLog.canRetry(m['urlbase'], now):
+ continue
+
+ for c in m['contents']:
+ if thandy.formats.rolePathMatches(c, relPath):
+ yield m
+ break
+
class ThandyDownloadJob(DownloadJob):
"""Thandy's subtype of DownloadJob: knows about mirrors, weighting,
and Thandy's directory structure."""
def __init__(self, relPath, destPath, mirrorList, wantHash=None,
- supportedURLTypes=None, useTor=None, repoFile=None):
+ supportedURLTypes=None, useTor=None, repoFile=None,
+ downloadStatusLog=None):
DownloadJob.__init__(self, destPath, None, wantHash=wantHash,
useTor=useTor, repoFile=repoFile)
@@ -314,23 +532,26 @@ class ThandyDownloadJob(DownloadJob):
if self._supportedURLTypes is None and useTor:
self._supportedURLTypes = [ "http", "https" ]
+ self._usingMirror = None #DOCDOC
+ self._downloadStatusLog = downloadStatusLog
+
+ def setDownloadStatusLog(self, log):
+ self._downloadStatusLog = log
def getURL(self):
usable = []
- for m in self._mirrorList['mirrors']:
- for c in m['contents']:
-
- if self._supportedURLTypes is not None:
- urltype = urllib2.splittype(m['urlbase'])[0]
- if urltype.lower() not in self._supportedURLTypes:
- continue
+ for m in mirrorsThatSupport(self._mirrorList, self._relPath,
+ self._supportedURLTypes,
+ self._downloadStatusLog):
+ usable.append( (m['weight'], m) )
- if thandy.formats.rolePathMatches(c, self._relPath):
- usable.append( (m['weight'], m) )
- break
+ try:
+ mirror = thandy.util.randChooseWeighted(usable)
+ except IndexError:
+ raise thandy.DownloadError("No mirror supports download.")
- mirror = thandy.util.randChooseWeighted(usable)
+ self._usingMirror = mirror['urlbase']
if m['urlbase'][-1] == '/' and self._relPath[0] == '/':
return m['urlbase'] + self._relPath[1:]
@@ -340,6 +561,9 @@ class ThandyDownloadJob(DownloadJob):
def getRelativePath(self):
return self._relPath
+ def getMirror(self):
+ return self._usingMirror
+
_socks_opener = thandy.socksurls.build_socks_opener()