summaryrefslogtreecommitdiff
path: root/lib/thandy/download.py
diff options
context:
space:
mode:
authorNick Mathewson <nickm@torproject.org>2008-11-16 21:09:48 +0000
committerNick Mathewson <nickm@torproject.org>2008-11-16 21:09:48 +0000
commitf92241888b4dbf4616fa41aab8a92c1c6b37bfbb (patch)
tree3f4b48dc4287c10cf0cd1b8910e9504036904c65 /lib/thandy/download.py
parent9ff48fb74625ea1d1e78f7c79c7463e27974a5c2 (diff)
Add main-thread callbacks for download success/failure.
git-svn-id: file:///home/or/svnrepo/updater/trunk@17292 55e972cd-5a19-0410-ae62-a4d7a52db4cd
Diffstat (limited to 'lib/thandy/download.py')
-rw-r--r--lib/thandy/download.py44
1 files changed, 35 insertions, 9 deletions
diff --git a/lib/thandy/download.py b/lib/thandy/download.py
index 73226c7..ef7f475 100644
--- a/lib/thandy/download.py
+++ b/lib/thandy/download.py
@@ -28,6 +28,9 @@ class DownloadManager:
# Work queue of DownloadJobs that we intend to process once a thread
# is free.
self.downloadQueue = Queue.Queue()
+ # DOCDOC
+ self.resultQueue = Queue.Queue()
+
# List of worker threads.
self.threads = [ threading.Thread(target=self._thread, args=[idx])
for idx in xrange(n_threads) ]
@@ -66,19 +69,23 @@ class DownloadManager:
"""Return true iff we have no active or pending jobs."""
self._lock.acquire()
try:
- return downloadQueue.empty() and len(self.downloads) == 0
+ return self.downloadQueue.empty() and len(self.downloads) == 0
finally:
self._lock.release()
def wait(self):
"""Pause until we have no active or pending jobs."""
- while True:
+ while not self.finished():
self.done.acquire()
self.done.wait()
self.done.release()
- if self.finished():
- break
+ try:
+ while True:
+ item = self.resultQueue.get(block=False)
+ item()
+ except Queue.Empty:
+ pass
def addDownloadJob(self, job):
"""Add another DownloadJob to the end of the work queue."""
@@ -97,6 +104,7 @@ class DownloadManager:
while True:
job = self.downloadQueue.get() # Grab job from queue.
rp = job.getRelativePath()
+ success = False
try:
logging.info("start %s in Thread %s", rp, idx)
success = job.download() # Execute the download.
@@ -110,6 +118,11 @@ class DownloadManager:
finally:
self._lock.release()
+ if success:
+ self.resultQueue.put(job._success)
+ else:
+ self.resultQueue.put(job._failure)
+
self.done.acquire()
self.done.notify()
self.done.release()
@@ -129,6 +142,14 @@ class DownloadJob:
self._wantHash = wantHash
self._useTor = useTor
+ self._success = lambda : None
+ self._failure = lambda : None
+
+ def setCallbacks(self, success, failure):
+ """DOCDOC"""
+ self._success = success
+ self._failure = failure
+
def getURL(self):
"""Abstract implementation helper. Returns the URL that the
_download function downloads from."""
@@ -155,7 +176,8 @@ class DownloadJob:
try:
self._download()
return True
- except (OSError, thandy.DownloadError), err:
+ except (OSError, httplib.error, urllib2.HTTPError,
+ thandy.DownloadError), err:
# XXXXX retry on failure
logging.warn("Download failed: %s", err)
return False
@@ -165,7 +187,6 @@ class DownloadJob:
traceback.format_exc())
sys.exit(1)
-
def _download(self):
# Implementation function. Unlike download(), can throw exceptions.
f_in = f_out = None
@@ -177,7 +198,8 @@ class DownloadJob:
if self.haveStalledFile():
have_length = os.stat(self._tmpPath).st_size
- print "Have stalled file with %s bytes"%have_length
+ logging.info("Have stalled file for %s with %s bytes", url,
+ have_length)
else:
have_length = None
@@ -218,6 +240,7 @@ class DownloadJob:
if gotHash != self._wantHash:
raise thandy.DownloadError("File hash was not as expected.")
+ thandy.util.ensureParentDir(self._destPath)
thandy.util.moveFile(self._tmpPath, self._destPath)
@@ -245,7 +268,7 @@ class ThandyDownloadJob(DownloadJob):
DownloadJob.__init__(self, destPath, None, wantHash=wantHash,
useTor=useTor)
- self._mirrorList = mirrorList[:]
+ self._mirrorList = mirrorList
self._relPath = relPath
tmppath = thandy.util.userFilename("tmp")
@@ -279,7 +302,10 @@ class ThandyDownloadJob(DownloadJob):
mirror = thandy.util.randChooseWeighted(usable)
- return m['urlbase'] + self._relPath
+ if m['urlbase'][-1] == '/' and self._relPath[0] == '/':
+ return m['urlbase'] + self._relPath[1:]
+ else:
+ return m['urlbase'] + self._relPath
def getRelativePath(self):
return self._relPath