path: root/lib/thandy/download.py
blob: 2f98b30f636f9815f7a068b758666812fb6c5151
# Copyright 2008 The Tor Project, Inc.  See LICENSE for licensing information.

import httplib
import logging
import os
import Queue
import random
import sys
import threading
import traceback
import urllib2

import thandy.util
import thandy.formats
import thandy.socksurls


class DownloadManager:
    """Class to track a set of downloads and pass them out to worker threads.
    """
    def __init__(self, n_threads=2):
        # Prevents concurrent modification to downloads and haveDownloaded
        self._lock = threading.RLock()
        # Map from resource relPath to job.
        self.downloads = {}
        # Map from resource relPath to True for each object that we have
        # managed to download.
        self.haveDownloaded = {}
        # Work queue of DownloadJobs that we intend to process once a thread
        # is free.
        self.downloadQueue = Queue.Queue()
        # Queue of success/failure callbacks from finished jobs, to be run
        # in the main thread (see wait()).
        self.resultQueue = Queue.Queue()

        # List of worker threads.
        self.threads = [ threading.Thread(target=self._thread, args=[idx])
                         for idx in xrange(n_threads) ]
        # Condition that gets triggered whenever a thread is finished doing
        # something.
        self.done = threading.Condition()
        for t in self.threads:
            t.setDaemon(True)

        # If set, an exception to be raised in the main thread by wait().
        self._raiseMe = None

    def start(self):
        """Start all of this download manager's worker threads."""
        for t in self.threads:
            t.start()

    def isCurrentlyDownloading(self, relPath):
        """Return true iff this download manager is currently downloading
           some copy of the resource at relPath."""
        self._lock.acquire()
        try:
            return relPath in self.downloads
        finally:
            self._lock.release()

    def isRedundant(self, relPath):
        """Return true iff we are currently downloading, or have
           downloaded, the resource at relPath."""

        self._lock.acquire()
        try:
            return (relPath in self.downloads or
                    relPath in self.haveDownloaded)
        finally:
            self._lock.release()

    def finished(self):
        """Return true iff we have no active or pending jobs."""
        self._lock.acquire()
        try:
            return self.downloadQueue.empty() and len(self.downloads) == 0
        finally:
            self._lock.release()

    def wait(self):
        """Pause until we have no active or pending jobs."""
        while not self.finished():
            self.done.acquire()
            self.done.wait()
            self.done.release()

            if self._raiseMe:
                raise self._raiseMe

            try:
                while True:
                    item = self.resultQueue.get(block=False)
                    item()
            except Queue.Empty:
                pass

    def addDownloadJob(self, job):
        """Add another DownloadJob to the end of the work queue."""
        rp = job.getRelativePath()
        self._lock.acquire()
        try:
            self.downloads[rp] = job
        finally:
            self._lock.release()
        self.downloadQueue.put(job)

    def downloadFailed(self, mirror, relpath):
        """DOCDOC"""
        pass # Track failure; don't try the same mirror right away.

    def _thread(self, idx):
        # Run in the background per thread.  idx is the number of the thread.
        while True:
            job = self.downloadQueue.get() # Grab job from queue.
            rp = job.getRelativePath()
            success = False
            try:
                logging.info("start %s in Thread %s", rp, idx)
                success = job.download() # Execute the download.
                logging.info("end %s in Thread %s", rp, idx)
            finally:
                self._lock.acquire()
                try:
                    del self.downloads[rp]
                    if success: # If we downloaded correctly, say so.
                        self.haveDownloaded[rp] = True
                finally:
                    self._lock.release()

                if success:
                    self.resultQueue.put(job._success)
                else:
                    self.resultQueue.put(job._failure)

                self.done.acquire()
                self.done.notify()
                self.done.release()

class DownloadJob:
    """Abstract base class.  Represents a thing to be downloaded, and the
       knowledge of how to download it."""
    def __init__(self, targetPath, tmpPath, wantHash=None, repoFile=None,
                 useTor=False):
        """Create a new DownloadJob.  When it is finally downloaded,
           store it in targetPath.  Store partial results in tmpPath;
           if there is already a file in tmpPath, assume that it is an
           incomplete download. If wantHash, reject the file unless
           the hash is as given.  If useTor, use a socks connection."""
        # repoFile, if provided, is repository metadata that
        # _checkTmpFile() uses to validate the downloaded file.
        self._destPath = targetPath
        self._tmpPath = tmpPath
        self._wantHash = wantHash
        self._repoFile = repoFile
        self._useTor = useTor

        self._success = lambda : None
        self._failure = lambda : None

    def setCallbacks(self, success, failure):
        """DOCDOC"""
        self._success = success
        self._failure = failure
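        # Sketch of how a caller might wire these up (handleSuccess and
        # handleFailure are hypothetical functions; the callbacks themselves
        # take no arguments, since DownloadManager invokes them bare from
        # wait() in the main thread):
        #   job.setCallbacks(lambda: handleSuccess(job),
        #                    lambda: handleFailure(job))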

    def getURL(self):
        """Abstract implementation helper.  Returns the URL that the
           _download function downloads from."""
        raise NotImplementedError()

    def getRelativePath(self):
        """Abstract. Returns a string representing this download, to
           keep two downloads of the same object from running at once.
           In Thandy, this is usually a relative path of a downloaded
           object within the repository.
        """
        raise NotImplementedError()

    def haveStalledFile(self):
        """Return true iff we have an existing incomplete download stored in
           the temporary file.
        """
        return os.path.exists(self._tmpPath)

    def download(self):
        """Main interface function: Start the download, and return
           when complete.
        """
        try:
            self._download()
            return True
        except (OSError, httplib.error, urllib2.HTTPError,
                thandy.DownloadError), err:
            # XXXXX retry on failure
            logging.warn("Download failed: %s", err)
            return False
        except:
            tp, val, tb = sys.exc_info()
            logging.warn("Internal during download: %s, %s", val,
                         traceback.format_exc())
            return False

    def _checkTmpFile(self):
        """DOCDOC"""
        if self._wantHash and not self._repoFile:
            gotHash = thandy.formats.getFileDigest(self._tmpPath)
            if gotHash != self._wantHash:
                raise thandy.DownloadError("File hash was not as expected.")
        elif self._repoFile:
            self._repoFile.checkFile(self._tmpPath, self._wantHash)

    def _download(self):
        # Implementation function.  Unlike download(), can throw exceptions.
        f_in = f_out = None

        haveStalled = self.haveStalledFile()
        if haveStalled and self._wantHash:
            try:
                self._checkTmpFile()
            except thandy.Exception:
                pass
            else:
                # What luck!  This file was what we wanted.
                thandy.util.ensureParentDir(self._destPath)
                thandy.util.moveFile(self._tmpPath, self._destPath)
                return

        try:
            url = self.getURL()

            logging.info("Downloading %s", url)

            if haveStalled:
                have_length = os.stat(self._tmpPath).st_size
                logging.info("Have stalled file for %s with %s bytes", url,
                             have_length)
            else:
                have_length = None

            f_in = getConnection(url, self._useTor, have_length)

            logging.info("Connected to %s", url)

            gotRange = f_in.info().get("Content-Range")
            expectLength = f_in.info().get("Content-Length", "???")
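            # When the server honors our Range request, the exchange looks
            # roughly like this (byte counts are hypothetical):
            #   request:   Range: bytes=1024-
            #   response:  Content-Range: bytes 1024-4095/4096
            # so a Content-Range starting with "bytes <have_length>-" means
            # we can append to the stalled file instead of starting over.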
            if gotRange:
                if gotRange.startswith("bytes %s-"%have_length):
                    logging.info("Resuming download from %s"%url)
                    f_out = open(self._tmpPath, 'ab')
                else:
                    raise thandy.DownloadError("Got an unexpected range %s"
                                               %gotRange)
            else:
                f_out = open(self._tmpPath, 'wb')

            total = 0
            while True:
                c = f_in.read(1024)
                if not c:
                    break
                f_out.write(c)
                total += len(c)
                logging.debug("Got %s/%s bytes from %s",
                              total, expectLength, url)

        finally:
            if f_in is not None:
                f_in.close()
            if f_out is not None:
                f_out.close()

        self._checkTmpFile()

        thandy.util.ensureParentDir(self._destPath)
        thandy.util.moveFile(self._tmpPath, self._destPath)


class SimpleDownloadJob(DownloadJob):
    """Testing subtype of DownloadJob: just downloads a URL and writes it to
       disk."""
    def __init__(self, targetPath, url,
                 wantHash=None, supportedURLTypes=None, useTor=False):
        DownloadJob.__init__(self, targetPath, targetPath+".tmp",
                                 wantHash=wantHash,
                                 useTor=useTor)
        self._url = url

    def getURL(self):
        return self._url

    def getRelativePath(self):
        return self._url

class ThandyDownloadJob(DownloadJob):
    """Thandy's subtype of DownloadJob: knows about mirrors, weighting,
       and Thandy's directory structure."""
    def __init__(self, relPath, destPath, mirrorList, wantHash=None,
                 supportedURLTypes=None, useTor=None, repoFile=None):

        DownloadJob.__init__(self, destPath, None, wantHash=wantHash,
                             useTor=useTor, repoFile=repoFile)
        self._mirrorList = mirrorList
        self._relPath = relPath

        tmppath = thandy.util.userFilename("tmp")
        if relPath.startswith("/"):
            relPath = relPath[1:]
        self._tmpPath = os.path.join(tmppath, relPath)

        d = os.path.dirname(self._tmpPath)
        if not os.path.exists(d):
            os.makedirs(d, 0700)

        self._supportedURLTypes = supportedURLTypes
        if self._supportedURLTypes is None and useTor:
            self._supportedURLTypes = [ "http", "https" ]


    def getURL(self):
        usable = []
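        # A sketch of the mirror list structure this loop expects
        # (field values here are hypothetical):
        #   { 'mirrors': [ { 'urlbase': 'http://mirror1.example/thandy/',
        #                    'contents': [ '/**' ],
        #                    'weight': 5 },
        #                  ... ] }
        # Every mirror whose 'contents' patterns cover self._relPath is
        # added to 'usable', weighted by its 'weight' field.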

        for m in self._mirrorList['mirrors']:
            for c in m['contents']:

                if self._supportedURLTypes is not None:
                    urltype = urllib2.splittype(m['urlbase'])[0]
                    if urltype.lower() not in self._supportedURLTypes:
                        continue

                if thandy.formats.rolePathMatches(c, self._relPath):
                    usable.append( (m['weight'], m) )
                    break

        mirror = thandy.util.randChooseWeighted(usable)

        if mirror['urlbase'][-1] == '/' and self._relPath[0] == '/':
            return mirror['urlbase'] + self._relPath[1:]
        else:
            return mirror['urlbase'] + self._relPath

    def getRelativePath(self):
        return self._relPath


_socks_opener = thandy.socksurls.build_socks_opener()

def getConnection(url, useTor, have_length=None):
    """Open a connection to 'url'.  We already have received
       have_length bytes of the file we're trying to fetch, so resume
       if possible.

    """
    headers = {}
    urltype = urllib2.splittype(url)[0]
    is_http = urltype in ["http", "https"]

    if have_length is not None and is_http:
        headers['Range'] = "bytes=%s-"%have_length

    req = urllib2.Request(url, headers=headers)

    if useTor:
        conn = _socks_opener.open(req)
    else:
        conn = urllib2.urlopen(req)

    return conn


if __name__ == '__main__':
    # Trivial CLI to test out downloading.
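    # Example invocation (URL hypothetical; assumes the thandy package is
    # on PYTHONPATH):
    #   python -m thandy.download --threads=4 http://example.com/file.tar.gz
    # Add --use-tor (and optionally --socksport=9050) to download via a
    # local Tor SOCKS proxy.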

    import getopt
    options, args = getopt.getopt(sys.argv[1:], "",
                                  ["use-tor", "socksport=", "threads="])

    useTor = False
    socksPort = 9050
    nThreads = 2
    for o,v in options:
        if o == "--use-tor":
            useTor = True
        elif o == "--socksport":
            socksPort = int(v)
        elif o == "--threads":
            nThreads = int(v)

    logging.basicConfig(level=logging.DEBUG)

    if useTor:
        thandy.socksurls.setSocksProxy("127.0.0.1", socksPort)

    manager = DownloadManager(nThreads)

    for url in args:
        fn = urllib2.splithost(urllib2.splittype(url)[1])[1]
        fn = os.path.split(fn)[1]

        job = SimpleDownloadJob(fn, url, useTor=useTor)
        manager.addDownloadJob(job)

    manager.start()
    manager.wait()