From f92241888b4dbf4616fa41aab8a92c1c6b37bfbb Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Sun, 16 Nov 2008 21:09:48 +0000 Subject: Add main-thread callbacks for download success/failure. git-svn-id: file:///home/or/svnrepo/updater/trunk@17292 55e972cd-5a19-0410-ae62-a4d7a52db4cd --- lib/thandy/download.py | 44 +++++++++++++++++++++++++++++++++++--------- lib/thandy/util.py | 9 +++++++-- 2 files changed, 42 insertions(+), 11 deletions(-) (limited to 'lib') diff --git a/lib/thandy/download.py b/lib/thandy/download.py index 73226c7..ef7f475 100644 --- a/lib/thandy/download.py +++ b/lib/thandy/download.py @@ -28,6 +28,9 @@ class DownloadManager: # Work queue of DownloadJobs that we intend to process once a thread # is free. self.downloadQueue = Queue.Queue() + # DOCDOC + self.resultQueue = Queue.Queue() + # List of worker threads. self.threads = [ threading.Thread(target=self._thread, args=[idx]) for idx in xrange(n_threads) ] @@ -66,19 +69,23 @@ class DownloadManager: """Return true iff we have no active or pending jobs.""" self._lock.acquire() try: - return downloadQueue.empty() and len(self.downloads) == 0 + return self.downloadQueue.empty() and len(self.downloads) == 0 finally: self._lock.release() def wait(self): """Pause until we have no active or pending jobs.""" - while True: + while not self.finished(): self.done.acquire() self.done.wait() self.done.release() - if self.finished(): - break + try: + while True: + item = self.resultQueue.get(block=False) + item() + except Queue.Empty: + pass def addDownloadJob(self, job): """Add another DownloadJob to the end of the work queue.""" @@ -97,6 +104,7 @@ class DownloadManager: while True: job = self.downloadQueue.get() # Grab job from queue. rp = job.getRelativePath() + success = False try: logging.info("start %s in Thread %s", rp, idx) success = job.download() # Execute the download. @@ -110,6 +118,11 @@ class DownloadManager: finally: self._lock.release() + if success: + self.resultQueue.put(job._success) + else: + self.resultQueue.put(job._failure) + self.done.acquire() self.done.notify() self.done.release() @@ -129,6 +142,14 @@ class DownloadJob: self._wantHash = wantHash self._useTor = useTor + self._success = lambda : None + self._failure = lambda : None + + def setCallbacks(self, success, failure): + """DOCDOC""" + self._success = success + self._failure = failure + def getURL(self): """Abstract implementation helper. Returns the URL that the _download function downloads from.""" @@ -155,7 +176,8 @@ class DownloadJob: try: self._download() return True - except (OSError, thandy.DownloadError), err: + except (OSError, httplib.error, urllib2.HTTPError, + thandy.DownloadError), err: # XXXXX retry on failure logging.warn("Download failed: %s", err) return False @@ -165,7 +187,6 @@ class DownloadJob: traceback.format_exc()) sys.exit(1) - def _download(self): # Implementation function. Unlike download(), can throw exceptions. f_in = f_out = None @@ -177,7 +198,8 @@ class DownloadJob: if self.haveStalledFile(): have_length = os.stat(self._tmpPath).st_size - print "Have stalled file with %s bytes"%have_length + logging.info("Have stalled file for %s with %s bytes", url, + have_length) else: have_length = None @@ -218,6 +240,7 @@ class DownloadJob: if gotHash != self._wantHash: raise thandy.DownloadError("File hash was not as expected.") + thandy.util.ensureParentDir(self._destPath) thandy.util.moveFile(self._tmpPath, self._destPath) @@ -245,7 +268,7 @@ class ThandyDownloadJob(DownloadJob): DownloadJob.__init__(self, destPath, None, wantHash=wantHash, useTor=useTor) - self._mirrorList = mirrorList[:] + self._mirrorList = mirrorList self._relPath = relPath tmppath = thandy.util.userFilename("tmp") @@ -279,7 +302,10 @@ class ThandyDownloadJob(DownloadJob): mirror = thandy.util.randChooseWeighted(usable) - return m['urlbase'] + self._relPath + if m['urlbase'][-1] == '/' and self._relPath[0] == '/': + return m['urlbase'] + self._relPath[1:] + else: + return m['urlbase'] + self._relPath def getRelativePath(self): return self._relPath diff --git a/lib/thandy/util.py b/lib/thandy/util.py index 2db7df4..c1f1326 100644 --- a/lib/thandy/util.py +++ b/lib/thandy/util.py @@ -24,7 +24,6 @@ def moveFile(fromLocation, toLocation): os.rename(fromLocation, toLocation) - def replaceFile(fname, contents, textMode=False): """overwrite the file in 'fname' atomically with the content of 'contents' """ @@ -48,13 +47,19 @@ def userFilename(name): os.makedirs(base, 0700) return os.path.join(base, name) +def ensureParentDir(name): + """DOCDOC""" + directory = os.path.split(name)[0] + if not os.path.exists(directory): + os.makedirs(directory, 0700) + def getKeylist(keys_fname, checkKeys=True): import thandy.master_keys keydb = thandy.formats.Keylist() for key in thandy.master_keys.MASTER_KEYS: - keydb.addKey(key) + keydb.addKey(thandy.keys.RSAKey.fromJSon(key)) user_keys = userFilename("preload_keys") if os.path.exists(user_keys): -- cgit v1.2.3