summaryrefslogtreecommitdiff
path: root/lib/thandy/download.py
diff options
context:
space:
mode:
authorNick Mathewson <nickm@torproject.org>2008-11-16 20:15:34 +0000
committerNick Mathewson <nickm@torproject.org>2008-11-16 20:15:34 +0000
commitc5749895ab4f6893bdf1d398691d1dd33e81574c (patch)
tree4ee47c3c6c56e313c3074f04c77a3637cf0fe31d /lib/thandy/download.py
parent02a2e5807f23ad0cad9a49b5febe08ec25fcc74c (diff)
have some more thandy. This update includes a working downloader with tor support, a package system framework, and more. Most of what's left is glue code.
git-svn-id: file:///home/or/svnrepo/updater/trunk@17288 55e972cd-5a19-0410-ae62-a4d7a52db4cd
Diffstat (limited to 'lib/thandy/download.py')
-rw-r--r--lib/thandy/download.py331
1 files changed, 275 insertions, 56 deletions
diff --git a/lib/thandy/download.py b/lib/thandy/download.py
index bf7dc43..73226c7 100644
--- a/lib/thandy/download.py
+++ b/lib/thandy/download.py
@@ -1,28 +1,50 @@
# Copyright 2008 The Tor Project, Inc. See LICENSE for licensing information.
-import urllib2
import httplib
+import logging
+import os
+import Queue
import random
-
-import threading, Queue
+import sys
+import threading
+import traceback
+import urllib2
import thandy.util
+import thandy.socksurls
-class Downloads:
+
+class DownloadManager:
+ """Class to track a set of downloads and pass them out to worker threads.
+ """
def __init__(self, n_threads=2):
+ # Prevents concurrent modification to downloads and haveDownloaded
self._lock = threading.RLock()
+ # Map from resource relPath to job.
self.downloads = {}
+ # Map from resource relPath from True to objects that we have
+ # managed to dowload.
self.haveDownloaded = {}
+ # Work queue of DownloadJobs that we intend to process once a thread
+ # is free.
self.downloadQueue = Queue.Queue()
- self.threads = [ threading.Thread(target=self._thread) ]
+ # List of worker threads.
+ self.threads = [ threading.Thread(target=self._thread, args=[idx])
+ for idx in xrange(n_threads) ]
+ # Condition that gets triggered whenever a thread is finished doing
+ # something.
+ self.done = threading.Condition()
for t in self.threads:
t.setDaemon(True)
def start(self):
+ """Start all of this download manager's worker threads."""
for t in self.threads:
t.start()
def isCurrentlyDownloading(self, relPath):
+ """Return true iff this download manager is currently downloading
+ some copy of the resource at relPath."""
self._lock.acquire()
try:
return self.downloads.has_key(relPath)
@@ -30,6 +52,9 @@ class Downloads:
self._lock.release()
def isRedundant(self, relPath):
+ """Return true iff we are currently downloading, or have
+ downloaded, the resource at relPath."""
+
self._lock.acquire()
try:
return (self.downloads.has_key(relPath) or
@@ -37,91 +62,285 @@ class Downloads:
finally:
self._lock.release()
+ def finished(self):
+ """Return true iff we have no active or pending jobs."""
+ self._lock.acquire()
+ try:
+ return downloadQueue.empty() and len(self.downloads) == 0
+ finally:
+ self._lock.release()
+
+ def wait(self):
+ """Pause until we have no active or pending jobs."""
+ while True:
+ self.done.acquire()
+ self.done.wait()
+ self.done.release()
+
+ if self.finished():
+ break
+
def addDownloadJob(self, job):
+ """Add another DownloadJob to the end of the work queue."""
rp = job.getRelativePath()
self._lock.acquire()
self.downloads[rp] = job
self._lock.release()
self.downloadQueue.put(job)
- def _thread(self):
+ def downloadFailed(self, mirror, relpath):
+ """DOCDOC"""
+ pass # Track failure; don't try the same mirror right away.
+
+ def _thread(self, idx):
+ # Run in the background per thread. idx is the number of the thread.
while True:
- job = self.downloadQueue.get()
- job.download()
+ job = self.downloadQueue.get() # Grab job from queue.
rp = job.getRelativePath()
- self._lock.acquire()
try:
- del self.downloads[rp]
- self.haveDownloaded[rp] = True
+ logging.info("start %s in Thread %s", rp, idx)
+ success = job.download() # Execute the download.
+ logging.info("end %s in Thread %s", rp, idx)
finally:
- self._lock.release()
+ self._lock.acquire()
+ try:
+ del self.downloads[rp]
+ if success: # If we downloaded correctly, say so.
+ self.haveDownloaded[rp] = True
+ finally:
+ self._lock.release()
+
+ self.done.acquire()
+ self.done.notify()
+ self.done.release()
class DownloadJob:
- def __init__(self, relPath, destPath, mirrorlist=None,
- wantHash=None, canStall=False):
- self._relPath = relPath
+ """Abstract base class. Represents a thing to be downloaded, and the
+ knowledge of how to download it."""
+ def __init__(self, targetPath, tmpPath, wantHash=None, useTor=False):
+ """Create a new DownloadJob. When it is finally downloaded,
+ store it in targetPath. Store partial results in tmpPath;
+ if there is already a file in tmpPath, assume that it is an
+ incomplete download. If wantHash, reject the file unless
+ the hash is as given. If useTor, use a socks connection."""
+
+ self._destPath = targetPath
+ self._tmpPath = tmpPath
self._wantHash = wantHash
- self._mirrorList = mirrorlist
- self._destPath = destPath
+ self._useTor = useTor
+
+ def getURL(self):
+ """Abstract implementation helper. Returns the URL that the
+ _download function downloads from."""
+ raise NotImplemented()
+
+ def getRelativePath(self):
+ """Abstract. Returns a string representing this download, to
+ keep two downloads of the same object from running at once.
+ In Thandy, this is usually a relative path of a downloaded
+ object within the repository.
+ """
+ raise NotImplemented()
+
+ def haveStalledFile(self):
+ """Return true iff we have an existing incomplete download stored in
+ the temporary file.
+ """
+ return os.path.exists(self._tmpPath)
+
+ def download(self):
+ """Main interface function: Start the download, and return
+ when complete.
+ """
+ try:
+ self._download()
+ return True
+ except (OSError, thandy.DownloadError), err:
+ # XXXXX retry on failure
+ logging.warn("Download failed: %s", err)
+ return False
+ except:
+ tp, val, tb = sys.exc_info()
+ logging.warn("Internal during download: %s, %s", val,
+ traceback.format_exc())
+ sys.exit(1)
+
+
+ def _download(self):
+ # Implementation function. Unlike download(), can throw exceptions.
+ f_in = f_out = None
+
+ try:
+ url = self.getURL()
+
+ logging.info("Downloading %s", url)
+
+ if self.haveStalledFile():
+ have_length = os.stat(self._tmpPath).st_size
+ print "Have stalled file with %s bytes"%have_length
+ else:
+ have_length = None
+
+ f_in = getConnection(url, self._useTor, have_length)
+
+ logging.info("Connected to %s", url)
+
+ gotRange = f_in.info().get("Content-Range")
+ expectLength = f_in.info().get("Content-Length", "???")
+ if gotRange:
+ if gotRange.startswith("bytes %s-"%(have_length+1)):
+ logging.info("Resuming download from %s"%url)
+ f_out = open(self._tmpPath, 'a')
+ else:
+ raise thandy.DownloadError("Got an unexpected range %s"
+ %gotRange)
+ else:
+ f_out = open(self._tmpPath, 'w')
+
+ total = 0
+ while True:
+ c = f_in.read(1024)
+ if not c:
+ break
+ f_out.write(c)
+ total += len(c)
+ logging.debug("Got %s/%s bytes from %s",
+ total, expectLength, url)
+
+ finally:
+ if f_in is not None:
+ f_in.close()
+ if f_out is not None:
+ f_out.close()
+
+ if self._wantHash:
+ gotHash = thandy.formats.getFileDigest(self._tmpPath)
+ if gotHash != self._wantHash:
+ raise thandy.DownloadError("File hash was not as expected.")
+
+ thandy.util.moveFile(self._tmpPath, self._destPath)
+
+
+class SimpleDownloadJob(DownloadJob):
+ """Testing subtype of DownloadJob: just downloads a URL and writes it to
+ disk."""
+ def __init__(self, targetPath, url,
+ wantHash=None, supportedURLTypes=None, useTor=False):
+ DownloadJob.__init__(self, targetPath, targetPath+".tmp",
+ wantHash=wantHash,
+ useTor=useTor)
+ self._url = url
+
+ def getURL(self):
+ return self._url
+
+ def getRelativePath(self):
+ return self._url
+
+class ThandyDownloadJob(DownloadJob):
+ """Thandy's subtype of DownloadJob: knows about mirrors, weighting,
+ and Thandy's directory structure."""
+ def __init__(self, relPath, destPath, mirrorList, wantHash=None,
+ supportedURLTypes=None, useTor=None):
+
+ DownloadJob.__init__(self, destPath, None, wantHash=wantHash,
+ useTor=useTor)
+ self._mirrorList = mirrorList[:]
+ self._relPath = relPath
tmppath = thandy.util.userFilename("tmp")
if relPath.startswith("/"):
relPath = relPath[1:]
- self._tmppath = os.path.join(tmppath, relPath)
+ self._tmpPath = os.path.join(tmppath, relPath)
- d = os.path.dirname(self._tmppath)
+ d = os.path.dirname(self._tmpPath)
if not os.path.exists(d):
os.makedirs(d, 0700)
- def getRelativePath(self):
- return self._relPath
+ self._supportedURLTypes = None
+ if self._supportedURLTypes is None and useTor:
+ self._supportedURLTypes = [ "http", "https" ]
- def haveStalledFile(self):
- return os.path.exists(self._tmppath)
- def getURL(self, mirrorlist=None):
- if mirrorlist is None:
- mirrorlist = self._mirrorList
- weightSoFar = 0
+ def getURL(self):
usable = []
- for m in mirrorlist['mirrors']:
+ for m in self._mirrorList['mirrors']:
for c in m['contents']:
- # CHECK FOR URL SUITABILITY XXXXX
+
+ if self._supportedURLTypes is not None:
+ urltype = urllib2.splittype(m['urlbase'][0])
+ if urltype.lower() not in self._supportedURLTypes:
+ continue
if thandy.formats.rolePathMatches(c, self._relPath):
- weightSoFar += m['weight']
- usable.append( (weightSoFar, m) )
+ usable.append( (m['weight'], m) )
break
- wTarget = random.randint(0, weightSoFar)
- mirror = None
- # Could use bisect here instead
- for w, m in mirrorlist:
- if w >= wTarget:
- mirror = m
- break
+ mirror = thandy.util.randChooseWeighted(usable)
return m['urlbase'] + self._relPath
- def download(self):
- # XXXX RESUME
+ def getRelativePath(self):
+ return self._relPath
- f_in = urllib2.urlopen(self.getURL())
- f_out = open(self._tmpPath, 'w')
- while True:
- c = f_in.read(1024)
- if not c:
- break
- f_out.write(c)
- f_in.close()
- f_out.close()
- # XXXXX retry on failure
- if self._wantHash:
- gotHash = thandy.formats.getFileDigest(self._tmpPath)
- if gotHash != self._wantHash:
- # XXXX Corrupt file.
- pass
+_socks_opener = thandy.socksurls.build_socks_opener()
+
+def getConnection(url, useTor, have_length=None):
+ """Open a connection to 'url'. We already have received
+ have_length bytes of the file we're trying to fetch, so resume
+ if possible.
+
+ """
+ headers = {}
+ urltype = urllib2.splittype(url)[0]
+ is_http = urltype in ["http", "https"]
+
+ if have_length is not None and is_http:
+ headers['Range'] = "bytes=%s-"%(have_length+1)
+
+ req = urllib2.Request(url, headers=headers)
+
+ if useTor:
+ conn = _socks_opener.open(req)
+ else:
+ conn = urllib2.urlopen(req)
+
+ return conn
+
+
+if __name__ == '__main__':
+ # Trivial CLI to test out downloading.
+
+ import getopt
+ options, args = getopt.getopt(sys.argv[1:], "",
+ ["use-tor", "socksport=", "threads="])
+
+ useTor = False
+ socksPort = 9050
+ nThreads = 2
+ for o,v in options:
+ if o == "--use-tor":
+ useTor = True
+ elif o == "--socksport":
+ socksPort = int(v)
+ elif o == "--threads":
+ nThreads = int(v)
+
+ logging.basicConfig(level=logging.DEBUG)
+
+ if useTor:
+ thandy.socksurls.setSocksProxy("127.0.0.1", socksPort)
+
+ manager = DownloadManager(nThreads)
+
+ for url in args:
+ fn = urllib2.splithost(urllib2.splittype(url)[1])[1]
+ fn = os.path.split(fn)[1]
+
+ job = SimpleDownloadJob(fn, url, useTor=useTor)
+ manager.addDownloadJob(job)
- thandy.utils.moveFile(self._tmpPath, self._destPath)
+ manager.start()
+ manager.wait()