From c5749895ab4f6893bdf1d398691d1dd33e81574c Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Sun, 16 Nov 2008 20:15:34 +0000 Subject: have some more thandy. This update includes a working downloader with tor support, a package system framework, and more. Most of what's left is glue code. git-svn-id: file:///home/or/svnrepo/updater/trunk@17288 55e972cd-5a19-0410-ae62-a4d7a52db4cd --- lib/thandy/ClientCLI.py | 9 +- lib/thandy/__init__.py | 2 + lib/thandy/download.py | 331 +++++++++++++++++++++++++++------ lib/thandy/formats.py | 24 ++- lib/thandy/packagesys/ExePackages.py | 33 ++++ lib/thandy/packagesys/PackageDB.py | 81 ++++++++ lib/thandy/packagesys/PackageSystem.py | 58 ++++++ lib/thandy/packagesys/RPMPackages.py | 156 ++++++++++++++++ lib/thandy/packagesys/__init__.py | 4 + lib/thandy/repository.py | 153 ++++++++++++++- lib/thandy/socksurls.py | 93 +++++++++ lib/thandy/util.py | 20 ++ 12 files changed, 890 insertions(+), 74 deletions(-) create mode 100644 lib/thandy/packagesys/ExePackages.py create mode 100644 lib/thandy/packagesys/PackageDB.py create mode 100644 lib/thandy/packagesys/PackageSystem.py create mode 100644 lib/thandy/packagesys/RPMPackages.py create mode 100644 lib/thandy/packagesys/__init__.py create mode 100644 lib/thandy/socksurls.py (limited to 'lib/thandy') diff --git a/lib/thandy/ClientCLI.py b/lib/thandy/ClientCLI.py index ba7fb2e..ad39a2d 100644 --- a/lib/thandy/ClientCLI.py +++ b/lib/thandy/ClientCLI.py @@ -33,11 +33,11 @@ def update(args): for f in files: # XXXX Use hash. - dj = thandy.download.DownloadJob(f, repo.getFilename(f), - mirrorlist) + dj = thandy.download.ThandyDownloadJob(f, repo.getFilename(f), + mirrorlist) downloader.addDownloadJob(dj) # XXXX replace file in repository if ok; reload; see what changed. - + # Wait for in-progress jobs # Check my repository @@ -48,6 +48,7 @@ def update(args): # Tell me what to install. + def usage(): print "Known commands:" print " update [--repo=repository] [--no-download]" @@ -58,7 +59,7 @@ def main(): usage() cmd = sys.argv[1] args = sys.argv[2:] - if cmd in [ "update" ]: + if cmd in [ "update", "geturls" ]: globals()[cmd](args) else: usage() diff --git a/lib/thandy/__init__.py b/lib/thandy/__init__.py index 03b262e..5b53bfe 100644 --- a/lib/thandy/__init__.py +++ b/lib/thandy/__init__.py @@ -34,3 +34,5 @@ class PubkeyFormatException(FormatException): class UnknownMethod(CryptoError): pass +class DownloadError(Exception): + pass diff --git a/lib/thandy/download.py b/lib/thandy/download.py index bf7dc43..73226c7 100644 --- a/lib/thandy/download.py +++ b/lib/thandy/download.py @@ -1,28 +1,50 @@ # Copyright 2008 The Tor Project, Inc. See LICENSE for licensing information. -import urllib2 import httplib +import logging +import os +import Queue import random - -import threading, Queue +import sys +import threading +import traceback +import urllib2 import thandy.util +import thandy.socksurls -class Downloads: + +class DownloadManager: + """Class to track a set of downloads and pass them out to worker threads. + """ def __init__(self, n_threads=2): + # Prevents concurrent modification to downloads and haveDownloaded self._lock = threading.RLock() + # Map from resource relPath to job. self.downloads = {} + # Map from resource relPath from True to objects that we have + # managed to dowload. self.haveDownloaded = {} + # Work queue of DownloadJobs that we intend to process once a thread + # is free. self.downloadQueue = Queue.Queue() - self.threads = [ threading.Thread(target=self._thread) ] + # List of worker threads. + self.threads = [ threading.Thread(target=self._thread, args=[idx]) + for idx in xrange(n_threads) ] + # Condition that gets triggered whenever a thread is finished doing + # something. + self.done = threading.Condition() for t in self.threads: t.setDaemon(True) def start(self): + """Start all of this download manager's worker threads.""" for t in self.threads: t.start() def isCurrentlyDownloading(self, relPath): + """Return true iff this download manager is currently downloading + some copy of the resource at relPath.""" self._lock.acquire() try: return self.downloads.has_key(relPath) @@ -30,6 +52,9 @@ class Downloads: self._lock.release() def isRedundant(self, relPath): + """Return true iff we are currently downloading, or have + downloaded, the resource at relPath.""" + self._lock.acquire() try: return (self.downloads.has_key(relPath) or @@ -37,91 +62,285 @@ class Downloads: finally: self._lock.release() + def finished(self): + """Return true iff we have no active or pending jobs.""" + self._lock.acquire() + try: + return downloadQueue.empty() and len(self.downloads) == 0 + finally: + self._lock.release() + + def wait(self): + """Pause until we have no active or pending jobs.""" + while True: + self.done.acquire() + self.done.wait() + self.done.release() + + if self.finished(): + break + def addDownloadJob(self, job): + """Add another DownloadJob to the end of the work queue.""" rp = job.getRelativePath() self._lock.acquire() self.downloads[rp] = job self._lock.release() self.downloadQueue.put(job) - def _thread(self): + def downloadFailed(self, mirror, relpath): + """DOCDOC""" + pass # Track failure; don't try the same mirror right away. + + def _thread(self, idx): + # Run in the background per thread. idx is the number of the thread. while True: - job = self.downloadQueue.get() - job.download() + job = self.downloadQueue.get() # Grab job from queue. rp = job.getRelativePath() - self._lock.acquire() try: - del self.downloads[rp] - self.haveDownloaded[rp] = True + logging.info("start %s in Thread %s", rp, idx) + success = job.download() # Execute the download. + logging.info("end %s in Thread %s", rp, idx) finally: - self._lock.release() + self._lock.acquire() + try: + del self.downloads[rp] + if success: # If we downloaded correctly, say so. + self.haveDownloaded[rp] = True + finally: + self._lock.release() + + self.done.acquire() + self.done.notify() + self.done.release() class DownloadJob: - def __init__(self, relPath, destPath, mirrorlist=None, - wantHash=None, canStall=False): - self._relPath = relPath + """Abstract base class. Represents a thing to be downloaded, and the + knowledge of how to download it.""" + def __init__(self, targetPath, tmpPath, wantHash=None, useTor=False): + """Create a new DownloadJob. When it is finally downloaded, + store it in targetPath. Store partial results in tmpPath; + if there is already a file in tmpPath, assume that it is an + incomplete download. If wantHash, reject the file unless + the hash is as given. If useTor, use a socks connection.""" + + self._destPath = targetPath + self._tmpPath = tmpPath self._wantHash = wantHash - self._mirrorList = mirrorlist - self._destPath = destPath + self._useTor = useTor + + def getURL(self): + """Abstract implementation helper. Returns the URL that the + _download function downloads from.""" + raise NotImplemented() + + def getRelativePath(self): + """Abstract. Returns a string representing this download, to + keep two downloads of the same object from running at once. + In Thandy, this is usually a relative path of a downloaded + object within the repository. + """ + raise NotImplemented() + + def haveStalledFile(self): + """Return true iff we have an existing incomplete download stored in + the temporary file. + """ + return os.path.exists(self._tmpPath) + + def download(self): + """Main interface function: Start the download, and return + when complete. + """ + try: + self._download() + return True + except (OSError, thandy.DownloadError), err: + # XXXXX retry on failure + logging.warn("Download failed: %s", err) + return False + except: + tp, val, tb = sys.exc_info() + logging.warn("Internal during download: %s, %s", val, + traceback.format_exc()) + sys.exit(1) + + + def _download(self): + # Implementation function. Unlike download(), can throw exceptions. + f_in = f_out = None + + try: + url = self.getURL() + + logging.info("Downloading %s", url) + + if self.haveStalledFile(): + have_length = os.stat(self._tmpPath).st_size + print "Have stalled file with %s bytes"%have_length + else: + have_length = None + + f_in = getConnection(url, self._useTor, have_length) + + logging.info("Connected to %s", url) + + gotRange = f_in.info().get("Content-Range") + expectLength = f_in.info().get("Content-Length", "???") + if gotRange: + if gotRange.startswith("bytes %s-"%(have_length+1)): + logging.info("Resuming download from %s"%url) + f_out = open(self._tmpPath, 'a') + else: + raise thandy.DownloadError("Got an unexpected range %s" + %gotRange) + else: + f_out = open(self._tmpPath, 'w') + + total = 0 + while True: + c = f_in.read(1024) + if not c: + break + f_out.write(c) + total += len(c) + logging.debug("Got %s/%s bytes from %s", + total, expectLength, url) + + finally: + if f_in is not None: + f_in.close() + if f_out is not None: + f_out.close() + + if self._wantHash: + gotHash = thandy.formats.getFileDigest(self._tmpPath) + if gotHash != self._wantHash: + raise thandy.DownloadError("File hash was not as expected.") + + thandy.util.moveFile(self._tmpPath, self._destPath) + + +class SimpleDownloadJob(DownloadJob): + """Testing subtype of DownloadJob: just downloads a URL and writes it to + disk.""" + def __init__(self, targetPath, url, + wantHash=None, supportedURLTypes=None, useTor=False): + DownloadJob.__init__(self, targetPath, targetPath+".tmp", + wantHash=wantHash, + useTor=useTor) + self._url = url + + def getURL(self): + return self._url + + def getRelativePath(self): + return self._url + +class ThandyDownloadJob(DownloadJob): + """Thandy's subtype of DownloadJob: knows about mirrors, weighting, + and Thandy's directory structure.""" + def __init__(self, relPath, destPath, mirrorList, wantHash=None, + supportedURLTypes=None, useTor=None): + + DownloadJob.__init__(self, destPath, None, wantHash=wantHash, + useTor=useTor) + self._mirrorList = mirrorList[:] + self._relPath = relPath tmppath = thandy.util.userFilename("tmp") if relPath.startswith("/"): relPath = relPath[1:] - self._tmppath = os.path.join(tmppath, relPath) + self._tmpPath = os.path.join(tmppath, relPath) - d = os.path.dirname(self._tmppath) + d = os.path.dirname(self._tmpPath) if not os.path.exists(d): os.makedirs(d, 0700) - def getRelativePath(self): - return self._relPath + self._supportedURLTypes = None + if self._supportedURLTypes is None and useTor: + self._supportedURLTypes = [ "http", "https" ] - def haveStalledFile(self): - return os.path.exists(self._tmppath) - def getURL(self, mirrorlist=None): - if mirrorlist is None: - mirrorlist = self._mirrorList - weightSoFar = 0 + def getURL(self): usable = [] - for m in mirrorlist['mirrors']: + for m in self._mirrorList['mirrors']: for c in m['contents']: - # CHECK FOR URL SUITABILITY XXXXX + + if self._supportedURLTypes is not None: + urltype = urllib2.splittype(m['urlbase'][0]) + if urltype.lower() not in self._supportedURLTypes: + continue if thandy.formats.rolePathMatches(c, self._relPath): - weightSoFar += m['weight'] - usable.append( (weightSoFar, m) ) + usable.append( (m['weight'], m) ) break - wTarget = random.randint(0, weightSoFar) - mirror = None - # Could use bisect here instead - for w, m in mirrorlist: - if w >= wTarget: - mirror = m - break + mirror = thandy.util.randChooseWeighted(usable) return m['urlbase'] + self._relPath - def download(self): - # XXXX RESUME + def getRelativePath(self): + return self._relPath - f_in = urllib2.urlopen(self.getURL()) - f_out = open(self._tmpPath, 'w') - while True: - c = f_in.read(1024) - if not c: - break - f_out.write(c) - f_in.close() - f_out.close() - # XXXXX retry on failure - if self._wantHash: - gotHash = thandy.formats.getFileDigest(self._tmpPath) - if gotHash != self._wantHash: - # XXXX Corrupt file. - pass +_socks_opener = thandy.socksurls.build_socks_opener() + +def getConnection(url, useTor, have_length=None): + """Open a connection to 'url'. We already have received + have_length bytes of the file we're trying to fetch, so resume + if possible. + + """ + headers = {} + urltype = urllib2.splittype(url)[0] + is_http = urltype in ["http", "https"] + + if have_length is not None and is_http: + headers['Range'] = "bytes=%s-"%(have_length+1) + + req = urllib2.Request(url, headers=headers) + + if useTor: + conn = _socks_opener.open(req) + else: + conn = urllib2.urlopen(req) + + return conn + + +if __name__ == '__main__': + # Trivial CLI to test out downloading. + + import getopt + options, args = getopt.getopt(sys.argv[1:], "", + ["use-tor", "socksport=", "threads="]) + + useTor = False + socksPort = 9050 + nThreads = 2 + for o,v in options: + if o == "--use-tor": + useTor = True + elif o == "--socksport": + socksPort = int(v) + elif o == "--threads": + nThreads = int(v) + + logging.basicConfig(level=logging.DEBUG) + + if useTor: + thandy.socksurls.setSocksProxy("127.0.0.1", socksPort) + + manager = DownloadManager(nThreads) + + for url in args: + fn = urllib2.splithost(urllib2.splittype(url)[1])[1] + fn = os.path.split(fn)[1] + + job = SimpleDownloadJob(fn, url, useTor=useTor) + manager.addDownloadJob(job) - thandy.utils.moveFile(self._tmpPath, self._destPath) + manager.start() + manager.wait() diff --git a/lib/thandy/formats.py b/lib/thandy/formats.py index 42e3d79..7ae6080 100644 --- a/lib/thandy/formats.py +++ b/lib/thandy/formats.py @@ -258,9 +258,9 @@ def getDigest(obj, digestObj=None): return digestObj.digest() def getFileDigest(f, digestObj=None): - """Update 'digestObj' (typically a SHA256 object) with the digest of - the file object in f. If digestObj is none, compute the SHA256 - hash and return it. + """Update 'digestObj' (typically a SHA256 object) with the digest + of the file object (or filename) in f. If digestObj is none, + compute the SHA256 hash and return it. >>> s = "here is a long string"*1000 >>> import cStringIO, Crypto.Hash.SHA256 @@ -271,15 +271,23 @@ def getFileDigest(f, digestObj=None): >>> h1.digest() == h2.digest() True """ + f_to_close = None + if isinstance(f, basestring): + t_to_close = f = open(f, 'rb') + useTempDigestObj = (digestObj == None) if useTempDigestObj: digestObj = Crypto.Hash.SHA256.new() - while 1: - s = f.read(4096) - if not s: - break - digestObj.update(s) + try: + while 1: + s = f.read(4096) + if not s: + break + digestObj.update(s) + finally: + if f_to_close != None: + f_to_close.close() if useTempDigestObj: return digestObj.digest() diff --git a/lib/thandy/packagesys/ExePackages.py b/lib/thandy/packagesys/ExePackages.py new file mode 100644 index 0000000..1688da3 --- /dev/null +++ b/lib/thandy/packagesys/ExePackages.py @@ -0,0 +1,33 @@ +# Copyright 2008 The Tor Project, Inc. See LICENSE for licensing information. + +import thandy.packagesys.PackageSystem as ps +import thandy.packagesys.PackageDB as pdb + +class ExePackageSystem(pdb.DBBackedPackageSystem): + + def getName(self): + return "executable" + + def packageHandleFromJSON(self, json): + raise NotImplemented() #XXXX???? + + def canBeAutomatic(self): + return True + + def canHaveUI(self): + return True + +class ExePackageHandle(pdb.DBBackedPackageHandle): + def __init__(self, packageDB, name, version, filelist, filename, + arguments): + pdb.DBBackedPackageHandle.__init__(packageDB, name, version, filelist) + self._filename = filename + self._arguments = arguments + + def _doInstall(self): + commandline = [ self._filename ] + self._arguments + logging.info("Installing %s. Command line: %s", self._filename, + commandLine) + subprocess.call(commandline) + + diff --git a/lib/thandy/packagesys/PackageDB.py b/lib/thandy/packagesys/PackageDB.py new file mode 100644 index 0000000..bb218be --- /dev/null +++ b/lib/thandy/packagesys/PackageDB.py @@ -0,0 +1,81 @@ +# Copyright 2008 The Tor Project, Inc. See LICENSE for licensing information. + +import anydbm +import shelve + +import thandy.util +import thandy.formats + +class SimplePackageDB: + + def __init__(self, filename): + self._db = anydbm.open(filename, 'c') + + def setVersion(self, package, version, filelist): + pass + + def setInstallParameters(self, package, params): + pass + + def getCurVersion(self, package): + pass + + def getInstallParameters(self, package): + pass + + +class DBBackedPackageSystem(thandy.packagesys.PackageSystem): + def __init__(self, packageDB): + self._packageDB = packageDB + +class DBBackedPackageHandle(thandy.packagesys.PackageHandle): + def __init__(self, packageDB, name, version, filelist): + thandy.packagesys.PackageSystem.PackageHandle.__init__(self) + self._packageDB = packageDB + self._name = name + self._version = version + self._filelist = filelist + + self._metaData = None + + def _getInstallBase(self): + raise NotImplemented() + + def anyVersionInstalled(self, transaction=None): + return self._packageDB.getCurVersion(self._name) != None + + def getInstalledVersion(self, transaction=None): + return self._packageDB.getCurVersion(self._name) + + def install(self): + params = self._doInstall() + self._packageDB.setCurVersion( + self._name, self._version, self._filelist) + self._packageDB.setInstallParameters(self._name, params) + + def _doInstall(self): + raise NotImplemented() + + def isInstalled(self): + return self.getInstalledVersion(self, transaction) == self._version + + def checkInstall(self): + base = self._getInstallBase() + + all_ok = True + for fn, hash in self._filelist: + fn = os.path.join(base, fn) + if not os.path.exists(fn): + all_ok = False + else: + f = open(fn, 'rb') + try: + try: + d = thandy.formats.getFileDigest(f) + except OSError: + all_ok = False + break + finally: + f.close() + + return all_ok diff --git a/lib/thandy/packagesys/PackageSystem.py b/lib/thandy/packagesys/PackageSystem.py new file mode 100644 index 0000000..ad1e221 --- /dev/null +++ b/lib/thandy/packagesys/PackageSystem.py @@ -0,0 +1,58 @@ +# Copyright 2008 The Tor Project, Inc. See LICENSE for licensing information. + +class PackageSystem: + def getName(self): + raise NotImplemented() + + def packageHandleFromJSON(self, json): + raise NotImplemented() + + def canBeAutomatic(self): + return True + + def canHaveUI(self): + return False + + def getTransaction(self): + return PackageTransaction() + +class PackageTransaction: + def __init__(self): + self._transactions = [] + + def _start(self): + pass + + def _commit(self): + pass + + def run(self): + self._start() + for cb in self._transactions: + cb(self) + self._commit() + + def addInstall(self, packageHandle): + self._transactions.append(packageHandle.install) + + def addRemove(self, packageHandle): + self._transactions.append(packageHandle.remove) + +class PackageHandle: + def isInstalled(self, transaction=None): + raise NotImplemented() + + def anyVersionInstalled(self, transaction=None): + raise NotImplemented() + + def getInstalledVersion(self, transaction=None): + raise NotImplemented() + + def install(self, transaction): + raise NotImplemented() + + def remove(self, transaction): + raise NotImplemented() + + def checkInstall(self, transaction=None): + raise NotImplemented() diff --git a/lib/thandy/packagesys/RPMPackages.py b/lib/thandy/packagesys/RPMPackages.py new file mode 100644 index 0000000..5e4c960 --- /dev/null +++ b/lib/thandy/packagesys/RPMPackages.py @@ -0,0 +1,156 @@ +# Copyright 2008 The Tor Project, Inc. See LICENSE for licensing information. + +import thandy.packagesys.PackageSystem + +import os +import rpm +import md5 + +__all__ = [ 'RPMPackageSystem' ] + +class RPMPackageSystem(thandy.packagesys.PackageSystem.PackageSystem): + def getName(self): + return "RPM" + + def packageHandleFromJSON(self, json): + raise NotImplemented() # XXXX + + def getTransaction(self): + return RPMPackageTransaction() + +_CALLBACK_CODES = {} + +for name in dir(rpm): + if name.startswith("RPMCALLBACK_"): + _CALLBACK_CODES[getattr(rpm, name)] = name[12:] +del name + +class RPMPackageTransaction(thandy.packagesys.PackageSystem.PackageTransaction): + + def _start(self): + thandy.packagesys.PackageSystem.PackageTransaction.__init__(self) + self._tset = rpm.TransactionSet() + + def _commit(self): + self._tset.run(self._callback, "") + + def _callback(self, what, amount, total, mydata, _): + if what == rpm.RPMCALLBACK_INST_OPEN_FILE: + hdr, path = mydata + logging.info("Installing RPM for %s [%s]", hdr['name'], path) + + elif what == rpm.RPMCALLBACK_INST_CLOSE_FILE: + hdr, path = mydata + logging.info("Done installing RPM for %s", path) + + elif what == rpm.RPMCALLBACK_INST_PROGRESS: + hdr, path = mydata + logging.info("%s: %.5s%% done", name, float(amount)/total*100) + + else: + hdr, path = mydata + logging.info("RPM event %s on %s [%s/%s]", + _CALLBACK_CODES.get(what,str(what)), + hdr['name'], amount, total) + +def addRPMInstall(ts, path): + fd = os.open(path, os.O_RDONLY) + try: + hdr = ts.hdrFromFdno(fd) + finally: + os.close(fd) + ts.addInstall(hdr, (hdr, path), "u") + +def addRPMErase(ts, name): + ts.addErase(name) + +def getInstalledRPMVersions(name, ts=None): + if ts is None: + ts = rpm.TransactionSet() + #XXXX need to close? + + versions = set() + for match in ts.dbMatch(rpm.RPMTAG_NAME, name): + versions.add(match['version']) + + return versions + +def fileMD5(fname): + d = md5.new() + try: + f = open(fname, 'r') + try: + while 1: + s = f.read(4096) + if not s: + break + d.update(s) + + finally: + f.close() + except OSError, e: + logging.warn("Couldn't get digest of %s: %s", fname, e) + return None + + return d.hexdigest() + +def checkRPMInstall(name, version, ts=None): + if ts is None: + ts = rpm.TransactionSet() + #XXXX need to close? + + found = False + all_ok = True + + for h in ts.dbMatch(rpm.RPMTAG_NAME, name): + if h['version'] != version: + continue + + found = True + + for fname, flags, md5sum in zip(h['filenames'], h['fileflags'], h['filemd5s']): + haveMD5 = fileMD5(fname) + if not haveMD5: + if flags & RPMFILE_MISSINGOK: + logging.info("%s is missing or unreadable from %s %s; " + "that's ok.", fname, name, h['version']) + else: + logging.warn("%s is missing or unreadable from %s %s." + fname, name, h['version']) + all_ok = False + elif haveMD5 == md5sum: + logging.info("%s is unchanged from %s %s", + fname, name, h['version']) + else: + # file changed. If it's not configuration, that's a problem. + if not flags & RPMFILE_CONFIG: + logging.warn("%s changed from installed version of %s %s", + fname, name, h['version']) + all_ok = False + + return found and all_ok + +class RPMPackageHandle(thandy.packagesys.PackageSystem.PackageHandle): + def __init__(self, name, version, filename): + self._name = name + self._version = version + self._filename = filename + + def anyVersionInstalled(self, transaction=None): + return len(getInstalledRPMVersions(self.name, transaction)) > 1 + + def getInstalledVersion(self, transaction=None): + s = max(getInstalledRPMVersions(self._name, transaction)) + + def install(self, transaction): + addRPMInstall(transaction._trans, self._filename) + + def remove(self, transaction): + addRPMErase(transaction._trans, self._name) + + def isInstalled(self, transaction=None): + return self._version in getInstalledRPMVersions(self._name,transaction) + + def checkInstall(self, transaction=None): + return checkRPMInstall(self._name, self._version) + diff --git a/lib/thandy/packagesys/__init__.py b/lib/thandy/packagesys/__init__.py new file mode 100644 index 0000000..4dd7019 --- /dev/null +++ b/lib/thandy/packagesys/__init__.py @@ -0,0 +1,4 @@ +# Copyright 2008 The Tor Project, Inc. See LICENSE for licensing information. + +__all__ = [ ] + diff --git a/lib/thandy/repository.py b/lib/thandy/repository.py index 1385dd6..af2fa4f 100644 --- a/lib/thandy/repository.py +++ b/lib/thandy/repository.py @@ -16,8 +16,18 @@ import time MAX_TIMESTAMP_AGE = 24*60*60 class RepositoryFile: + """Represents information about a file stored in our local repository + cache. Used to validate and load files. + """ def __init__(self, repository, relativePath, schema, needRole=None, signedFormat=True, needSigs=1): + """Allocate a new RepositoryFile for a file to be stored under + the LocalRepository 'repository' in relativePath. Make + sure the file validates with 'schema' (or its signed form, + if 'signedFormat'). When checking signatures, this file needs + at least 'needSigs' signatures with role 'needRole'. + """ + # These fields are as in the arguments. self._repository = repository self._relativePath = relativePath self._schema = schema @@ -25,17 +35,37 @@ class RepositoryFile: self._signedFormat = signedFormat self._needSigs = needSigs - self._signed_obj = self._main_obj = None + # The contents of the file, parsed. None if we haven't loaded + # the file. + self._main_obj = None + + # The contents of the file along with their signatures. May + # be aliased by _main_obj. None if we haven't loaded the + # file. + self._signed_obj = None + + # A SignatureStatus object, if we have checked signatures. + self._sigStatus = None + # The mtime of the file on disk, if we know it. + self._mtime = None + + def clear(self): + """DOCDOC""" + self._main_obj = self._signed_obj = None self._sigStatus = None self._mtime = None def getRelativePath(self): + """Return the filename for this item relative to the top of the + repository.""" return self._relativePath def getPath(self): + """Return the actual filename for this item.""" return self._repository.getFilename(self._relativePath) def _load(self): + """Helper: load and parse this item's contents.""" fname = self.getPath() # Propagate OSError @@ -59,6 +89,7 @@ class RepositoryFile: self._mtime = mtime def _save(self, content=None): + """Helper: Flush this object's contents to disk.""" if content == None: content = sexpr.encode @@ -69,9 +100,13 @@ class RepositoryFile: self._signed_obj = signed_obj self._main_obj = main_obj - self._mtime = mtime + self._mtime = time.time() def _checkContent(self, content): + """Helper. Check whether 'content' matches SIGNED_SCHEMA, and + self._schema (as appropraite). Return a tuple of the + signed_schema match, and the schema match, or raise + FormatException.""" try: obj = json.loads(content) @@ -94,20 +129,26 @@ class RepositoryFile: return signed_obj, main_obj def load(self): + """Load this object from disk if it hasn't already been loaded.""" if self._main_obj == None: self._load() def get(self): + """Return the object, or None if it isn't loaded.""" return self._main_obj def isLoaded(self): + """Return true iff this object is loaded.""" return self._main_obj != None def getContent(self): + """Load this object as needed and return its content.""" self.load() return self._main_obj def _checkSignatures(self): + """Helper: Try to verify all the signatures on this object, and + cache the SignatureStatus object.""" self.load() sigStatus = thandy.formats.checkSignatures(self._signed_obj, self._repository._keyDB, @@ -115,15 +156,47 @@ class RepositoryFile: self._sigStatus = sigStatus def checkSignatures(self): + """Try to verify all the signatures on this object if we + haven't already done so, and return a SignatureStatus + object.""" if self._sigStatus is None: self._checkSignatures() return self._sigStatus +class PkgFile: + def __init__(self, repository, relativePath, needHash): + self._repository = repository + self._relativePath = relativePath + self._needHash = needHash + + self._mtime = None + + def clear(self): + self._mtime = None + + def getRelativePath(self): + return self._relativePath + + def getPath(self): + return self._repository.getFilename(self._relativePath) + + def getExpectedHash(self): + return self._needHash + + def checkFile(self): + return self._needHash == self._repository.getFileDigest() + class LocalRepository: + """Represents a client's partial copy of a remote mirrored repository.""" def __init__(self, root): + """Create a new local repository that stores its files under 'root'""" + # Top of our mirror. self._root = root + + # A base keylist of master keys; we'll add others later. self._keyDB = thandy.util.getKeylist(None) + # Entries for the three invariant metafiles. self._keylistFile = RepositoryFile( self, "/meta/keys.txt", thandy.formats.KEYLIST_SCHEMA, needRole="master") @@ -133,28 +206,38 @@ class LocalRepository: self._mirrorlistFile = RepositoryFile( self, "/meta/mirrors.txt", thandy.formats.MIRRORLIST_SCHEMA, needRole="mirrors") + self._metaFiles = [ self._keylistFile, self._timestampFile, self._mirrorlistFile ] + # Map from relative path to a RepositoryFile for packages. self._packageFiles = {} + + # Map from relative path to a RepositoryFile for bundles. self._bundleFiles = {} def getFilename(self, relativePath): + """Return the file on disk that caches 'relativePath'.""" if relativePath.startswith("/"): relativePath = relativePath[1:] return os.path.join(self._root, relativePath) def getKeylistFile(self): + """Return a RepositoryFile for our keylist.""" return self._keylistFile def getTimestampFile(self): + """Return a RepositoryFile for our timestamp file.""" return self._timestampFile def getMirrorlistFile(self): + """Return a RepositoryFile for our mirrorlist.""" return self._mirrorlistFile def getPackageFile(self, relPath): + """Return a RepositoryFile for a package stored at relative path + 'relPath'.""" try: return self._packageFiles[relPath] except KeyError: @@ -164,6 +247,8 @@ class LocalRepository: return pkg def getBundleFile(self, relPath): + """Return a RepositoryFile for a bundle stored at relative path + 'relPath'.""" try: return self._bundleFiles[relPath] except KeyError: @@ -172,10 +257,38 @@ class LocalRepository: needRole='bundle') return pkg - def getFilesToUpdate(self, now=None, trackingBundles=()): + def getRequestedFile(self, relPath): + """ """ + for f in self._metafiles: + if f.getRelativePath() == relPath: + return f + for f in self._bundleFiles.itervalues(): + if f.getRelativePath() == relPath: + return f + for f in self._packageFiles.itervalues(): + if f.getRelativePath() == relPath: + return f + f.load() + for item in f.get()['files']: + rp, h = item[:2] + if rp == relPath: + return PkgFile(self, rp, thandy.formats.parseHash(h)) + + def getFilesToUpdate(self, now=None, trackingBundles=(), hashDict=None): + """Return a set of relative paths for all files that we need + to fetch. Assumes that we care about the bundles + 'trackingBundles'. If hashDict is provided, add mappings to it + from the relative paths we want to fecth to the hashes that we + want those items to have, when we know those hashes. + """ + if now == None: now = time.time() + if hashDict == None: + # Use a dummy hashdict. + hashDict = {} + need = set() # Fetch missing metafiles. @@ -196,6 +309,8 @@ class LocalRepository: age = now - thandy.formats.parseTime(ts['at']) ts = thandy.formats.TimestampFile.fromJSon(ts) if age > MAX_TIMESTAMP_AGE: + logging.info("Timestamp file from %s is out of " + "date; must fetch it.", ts['at']) need.add(self._timestampFile.getRelativePath()) # If the keylist isn't signed right, we can't check the @@ -203,6 +318,8 @@ class LocalRepository: if self._keylistFile.get(): s = self._keylistFile.checkSignatures() if not s.isValid(): # For now only require one master key. + logging.info("Key list is not properly signed; must get a " + "new one.") need.add(self._keylistFile.getRelativePath()) if need: @@ -215,6 +332,8 @@ class LocalRepository: # new keylist. s = self._timestampFile.checkSignatures() if not s.isValid(): + logging.info("Timestamp file is not properly signed; fetching new " + "timestamp file and keylist.") need.add(self._keylistFile.getRelativePath()) need.add(self._timestampFile.getRelativePath()) return need @@ -222,9 +341,15 @@ class LocalRepository: # FINALLY, we know we have an up-to-date, signed timestamp # file. Check whether the keys and mirrors file are as # authenticated. + hashDict[self._keylistFile.getRelativePath()] = \ + ts.getKeylistInfo().getHash() + hashDict[self._mirrorlistFile.getRelativePath()] = \ + ts.getMirrorlistInfo().getHash() + h_kf = thandy.formats.getDigest(self._keylistFile.get()) h_expected = ts.getKeylistInfo().getHash() if h_kf != h_expected: + logging.info("Keylist file hash did not match. Must fetch it.") need.add(self._keylistFile.getRelativePath()) if need: @@ -232,11 +357,13 @@ class LocalRepository: s = self._mirrorlistFile.checkSignatures() if not s.isValid(): + logging.info("Mirrorlist file signatures not valid. Must fetch.") need.add(self._mirrorlistFile.getRelativePath()) h_mf = thandy.formats.getDigest(self._mirrorlistFile.get()) h_expected = ts.getMirrorlistInfo().getHash() if h_mf != h_expected: + logging.info("Mirrorlist file hash did not match. Must fetch.") need.add(self._mirrorlistFile.getRelativePath()) if need: @@ -249,26 +376,30 @@ class LocalRepository: try: binfo = ts.getBundleInfo(b) except KeyError: - logging.warn("Unrecognized bundle %s"%b) + logging.warn("Bundle %s not listed in timestamp file."%b) continue rp = binfo.getRelativePath() + hashDict[rp] = h_expected = binfo.getHash() bfile = self.getBundleFile(rp) try: bfile.load() except OSError: + logging.info("Can't find bundle %s on disk; must fetch.", rp) need.add(rp) continue h_b = thandy.formats.getDigest(bfile.get()) - h_expected = binfo.getHash() if h_b != h_expected: + logging.info("Bundle hash not as expected; must fetch.", rp) need.add(rp) continue s = bfile.checkSignatures() if not s.isValid(): # Can't actually use it. + logging.warn("Bundle hash was as expected, but signatures did " + "not match.") continue bundles[rp] = bfile @@ -280,20 +411,26 @@ class LocalRepository: for pkginfo in bundle['packages']: rp = pkginfo['path'] pfile = self.getPackageFile(rp) + h_expected = thandy.formats.parseHash(pkginfo['hash']) + hashDict[rp] = h_expected try: pfile.load() except OSError: + logging.info("Can't find package %s on disk; must fetch.", + rp) need.add(rp) continue h_p = thandy.formats.getDigest(pfile.get()) - h_expected = thandy.formats.parseHash(pkginfo['hash']) if h_p != h_expected: + logging.info("Wrong hash for package %s; must fetch.", rp) need.add(rp) continue s = pfile.checkSignatures() if not s.isValid(): + logging.warn("Package hash was as expected, but signature " + "did nto match") # Can't use it. continue packages[rp] = pfile @@ -305,13 +442,17 @@ class LocalRepository: for f in package['files']: rp, h = f[:2] h_expected = thandy.formats.parseHash(h) + hashDict[rp] = h_expected fn = self.getFilename(rp) try: h_got = thandy.formats.getFileDigest(fn) except OSError: + logging.info("Installable file %s not found on disk; " + "must load", rp) need.add(rp) continue if h_got != h_expected: + logging.info("Hash for %s not as expected; must load.", rp) need.add(rp) # Okay; these are the files we need. diff --git a/lib/thandy/socksurls.py b/lib/thandy/socksurls.py new file mode 100644 index 0000000..d035a6c --- /dev/null +++ b/lib/thandy/socksurls.py @@ -0,0 +1,93 @@ +# Copyright 2008 The Tor Project, Inc. See LICENSE for licensing information. + +"""Implements URL types for socks-mediated connections.""" + +import socket +import httplib +import logging +import struct +import urllib2 + +# XXXX This isn't really threadsafe, but for now we don't change this after +# startup. +SOCKS_HOST = None +SOCKS_PORT = None + +def setSocksProxy(host, port): + """Set the global SOCKS proxy to host:port.""" + global SOCKS_HOST, SOCKS_PORT + SOCKS_HOST = host + SOCKS_PORT = port + +def _recvall(sock, n): + """Helper: fetch N bytes from the socket sock.""" + result = "" + while 1: + s = sock.recv(n) + if not s: + return result + result += s + n -= len(s) + if n <= 0: + return result + +def socks_connect(host, port): + """Helper: use the SOCKS proxy to open a connection to host:port. + Uses the simple and Tor-friendly SOCKS4a protocol.""" + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + logging.debug("Connecting to SOCKS proxy") + sock.connect((SOCKS_HOST, SOCKS_PORT)) + + # Now, the handshake! We just do socks4a, since that's the simplest. + version = 4 # socks 4 + command = 1 # connect + addr = 1 # 0.0.0.1, signals socks4a. + userid = "" + + messageheader = struct.pack("!BBHL", version, command, port, addr) + message = "%s%s\x00%s\x00" % (messageheader, userid, host) + + sock.sendall(message) + + logging.debug("Waiting for reply from SOCKS proxy") + reply = _recvall(sock, 8) + code = ord(reply[1]) + if code == 0x5a: + logging.debug("SOCKS proxy is connected.") + return sock + else: + raise socket.error("Bad SOCKS response code from proxy: %d", code) + except: + sock.close() + raise + +# Copies of HTTPConnection and HTTPSConnection that use socks instead of +# direct connections. +class SocksHTTPConnection(httplib.HTTPConnection): + def connect(self): + self.sock = socks_connect(self.host, self.port) +class SocksHTTPSConnection(httplib.HTTPSConnection): + def connect(self): + socket = socks_connect(self.host, self.port) + ssl = socket.ssl(sock, None, None) + self.sock = socket.FakeSocket(socket, ssl) + +# URL handlers for HTTP and HTTPS urls that use socks instead of direct +# connections. +class SocksHTTPHandler(urllib2.AbstractHTTPHandler): + def http_open(self, req): + return self.do_open(SocksHTTPConnection, req) + http_request = urllib2.AbstractHTTPHandler.do_request_ +class SocksHTTPSHandler(urllib2.AbstractHTTPHandler): + def https_open(self, req): + return self.do_open(SocksHTTPSConnection, req) + https_request = urllib2.AbstractHTTPHandler.do_request_ + +def build_socks_opener(): + """Return an urllib2.OpenerDirector object to open HTTP and HTTPS + urls using SOCKS connections.""" + opener = urllib2.OpenerDirector() + opener.add_handler(SocksHTTPSHandler()) + opener.add_handler(SocksHTTPHandler()) + return opener diff --git a/lib/thandy/util.py b/lib/thandy/util.py index b7281ae..2db7df4 100644 --- a/lib/thandy/util.py +++ b/lib/thandy/util.py @@ -3,6 +3,7 @@ import os import sys import tempfile +import random try: import json @@ -20,6 +21,7 @@ def moveFile(fromLocation, toLocation): os.unlink(toLocation) except OSError: pass + os.rename(fromLocation, toLocation) @@ -75,3 +77,21 @@ def getKeylist(keys_fname, checkKeys=True): keydb.addFromKeylist(obj['signed'], allowMasterKeys=False) return keydb + +def randChooseWeighted(lst): + """Given a list of (weight,item) tuples, pick an item with + probability proportional to its weight. + """ + + totalweight = sum(w for w,i in lst) + position = random.uniform(0, totalweight) + soFar = 0 + + # We could use bisect here, but this is not going to be in the + # critical path. If it is, oops. + for w,i in lst: + soFar += w + if position < soFar: + return i + + return lst[-1][1] -- cgit v1.2.3