-rw-r--r--  lib/thandy/ClientCLI.py                 |   9
-rw-r--r--  lib/thandy/__init__.py                  |   2
-rw-r--r--  lib/thandy/download.py                  | 331
-rw-r--r--  lib/thandy/formats.py                   |  24
-rw-r--r--  lib/thandy/packagesys/ExePackages.py    |  33
-rw-r--r--  lib/thandy/packagesys/PackageDB.py      |  81
-rw-r--r--  lib/thandy/packagesys/PackageSystem.py  |  58
-rw-r--r--  lib/thandy/packagesys/RPMPackages.py    | 156
-rw-r--r--  lib/thandy/packagesys/__init__.py       |   4
-rw-r--r--  lib/thandy/repository.py                | 153
-rw-r--r--  lib/thandy/socksurls.py                 |  93
-rw-r--r--  lib/thandy/util.py                      |  20
12 files changed, 890 insertions, 74 deletions
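Most of the new code is the threaded download machinery in download.py. A minimal sketch of how the new classes are meant to be driven, mirroring the test CLI at the bottom of that file (the URL and output filename here are placeholders):

import logging
import thandy.download

logging.basicConfig(level=logging.DEBUG)

# Two worker threads, as in the default DownloadManager constructor.
manager = thandy.download.DownloadManager(n_threads=2)

# SimpleDownloadJob just fetches one URL to one file; ThandyDownloadJob
# is the mirror-aware variant used by the client proper.
job = thandy.download.SimpleDownloadJob("index.html",
                                        "http://example.com/index.html")
manager.addDownloadJob(job)

manager.start()
manager.wait()   # blocks until the queue drains and all workers are idle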
diff --git a/lib/thandy/ClientCLI.py b/lib/thandy/ClientCLI.py index ba7fb2e..ad39a2d 100644 --- a/lib/thandy/ClientCLI.py +++ b/lib/thandy/ClientCLI.py @@ -33,11 +33,11 @@ def update(args):      for f in files:          # XXXX Use hash. -        dj = thandy.download.DownloadJob(f, repo.getFilename(f), -                                         mirrorlist) +        dj = thandy.download.ThandyDownloadJob(f, repo.getFilename(f), +                                               mirrorlist)          downloader.addDownloadJob(dj)          # XXXX replace file in repository if ok; reload; see what changed. -     +      # Wait for in-progress jobs  # Check my repository @@ -48,6 +48,7 @@ def update(args):  # Tell me what to install. +  def usage():      print "Known commands:"      print "  update [--repo=repository] [--no-download]" @@ -58,7 +59,7 @@ def main():          usage()      cmd = sys.argv[1]      args = sys.argv[2:] -    if cmd in [ "update" ]: +    if cmd in [ "update", "geturls" ]:          globals()[cmd](args)      else:          usage() diff --git a/lib/thandy/__init__.py b/lib/thandy/__init__.py index 03b262e..5b53bfe 100644 --- a/lib/thandy/__init__.py +++ b/lib/thandy/__init__.py @@ -34,3 +34,5 @@ class PubkeyFormatException(FormatException):  class UnknownMethod(CryptoError):      pass +class DownloadError(Exception): +    pass diff --git a/lib/thandy/download.py b/lib/thandy/download.py index bf7dc43..73226c7 100644 --- a/lib/thandy/download.py +++ b/lib/thandy/download.py @@ -1,28 +1,50 @@  # Copyright 2008 The Tor Project, Inc.  See LICENSE for licensing information. -import urllib2  import httplib +import logging +import os +import Queue  import random - -import threading, Queue +import sys +import threading +import traceback +import urllib2  import thandy.util +import thandy.socksurls -class Downloads: + +class DownloadManager: +    """Class to track a set of downloads and pass them out to worker threads. +    """      def __init__(self, n_threads=2): +        # Prevents concurrent modification to downloads and haveDownloaded          self._lock = threading.RLock() +        # Map from resource relPath to job.          self.downloads = {} +        # Map from resource relPath from True to objects that we have +        # managed to dowload.          self.haveDownloaded = {} +        # Work queue of DownloadJobs that we intend to process once a thread +        # is free.          self.downloadQueue = Queue.Queue() -        self.threads = [ threading.Thread(target=self._thread) ] +        # List of worker threads. +        self.threads = [ threading.Thread(target=self._thread, args=[idx]) +                         for idx in xrange(n_threads) ] +        # Condition that gets triggered whenever a thread is finished doing +        # something. 
+        self.done = threading.Condition()          for t in self.threads:              t.setDaemon(True)      def start(self): +        """Start all of this download manager's worker threads."""          for t in self.threads:              t.start()      def isCurrentlyDownloading(self, relPath): +        """Return true iff this download manager is currently downloading +           some copy of the resource at relPath."""          self._lock.acquire()          try:              return self.downloads.has_key(relPath) @@ -30,6 +52,9 @@ class Downloads:              self._lock.release()      def isRedundant(self, relPath): +        """Return true iff we are currently downloading, or have +           downloaded, the resource at relPath.""" +          self._lock.acquire()          try:              return (self.downloads.has_key(relPath) or @@ -37,91 +62,285 @@ class Downloads:          finally:              self._lock.release() +    def finished(self): +        """Return true iff we have no active or pending jobs.""" +        self._lock.acquire() +        try: +            return downloadQueue.empty() and len(self.downloads) == 0 +        finally: +            self._lock.release() + +    def wait(self): +        """Pause until we have no active or pending jobs.""" +        while True: +            self.done.acquire() +            self.done.wait() +            self.done.release() + +            if self.finished(): +                break +      def addDownloadJob(self, job): +        """Add another DownloadJob to the end of the work queue."""          rp = job.getRelativePath()          self._lock.acquire()          self.downloads[rp] = job          self._lock.release()          self.downloadQueue.put(job) -    def _thread(self): +    def downloadFailed(self, mirror, relpath): +        """DOCDOC""" +        pass # Track failure; don't try the same mirror right away. + +    def _thread(self, idx): +        # Run in the background per thread.  idx is the number of the thread.          while True: -            job = self.downloadQueue.get() -            job.download() +            job = self.downloadQueue.get() # Grab job from queue.              rp = job.getRelativePath() -            self._lock.acquire()              try: -                del self.downloads[rp] -                self.haveDownloaded[rp] = True +                logging.info("start %s in Thread %s", rp, idx) +                success = job.download() # Execute the download. +                logging.info("end %s in Thread %s", rp, idx)              finally: -                self._lock.release() +                self._lock.acquire() +                try: +                    del self.downloads[rp] +                    if success: # If we downloaded correctly, say so. +                        self.haveDownloaded[rp] = True +                finally: +                    self._lock.release() + +                self.done.acquire() +                self.done.notify() +                self.done.release()  class DownloadJob: -    def __init__(self, relPath, destPath, mirrorlist=None, -                 wantHash=None, canStall=False): -        self._relPath = relPath +    """Abstract base class.  Represents a thing to be downloaded, and the +       knowledge of how to download it.""" +    def __init__(self, targetPath, tmpPath, wantHash=None, useTor=False): +        """Create a new DownloadJob.  When it is finally downloaded, +           store it in targetPath.  
Store partial results in tmpPath; +           if there is already a file in tmpPath, assume that it is an +           incomplete download. If wantHash, reject the file unless +           the hash is as given.  If useTor, use a socks connection.""" + +        self._destPath = targetPath +        self._tmpPath = tmpPath          self._wantHash = wantHash -        self._mirrorList = mirrorlist -        self._destPath = destPath +        self._useTor = useTor + +    def getURL(self): +        """Abstract implementation helper.  Returns the URL that the +           _download function downloads from.""" +        raise NotImplemented() + +    def getRelativePath(self): +        """Abstract. Returns a string representing this download, to +           keep two downloads of the same object from running at once. +           In Thandy, this is usually a relative path of a downloaded +           object within the repository. +        """ +        raise NotImplemented() + +    def haveStalledFile(self): +        """Return true iff we have an existing incomplete download stored in +           the temporary file. +        """ +        return os.path.exists(self._tmpPath) + +    def download(self): +        """Main interface function: Start the download, and return +           when complete. +        """ +        try: +            self._download() +            return True +        except (OSError, thandy.DownloadError), err: +            # XXXXX retry on failure +            logging.warn("Download failed: %s", err) +            return False +        except: +            tp, val, tb = sys.exc_info() +            logging.warn("Internal during download: %s, %s", val, +                         traceback.format_exc()) +            sys.exit(1) + + +    def _download(self): +        # Implementation function.  Unlike download(), can throw exceptions. 
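DownloadManager is essentially a small thread pool fed from a Queue, with a Condition to let wait() block until the pool drains. A stand-alone sketch of the same arrangement, with the class and method names invented and jobs reduced to plain callables:

import Queue
import threading

class MiniPool:
    """Stripped-down version of the queue/worker/condition arrangement
       used by DownloadManager; jobs here are plain callables."""
    def __init__(self, n_threads=2):
        self._queue = Queue.Queue()         # callables waiting to run
        self._outstanding = 0               # jobs queued or running
        self._lock = threading.RLock()      # guards _outstanding
        self._done = threading.Condition()  # notified whenever a job ends
        for _ in xrange(n_threads):
            t = threading.Thread(target=self._worker)
            t.setDaemon(True)
            t.start()

    def add(self, fn):
        self._lock.acquire()
        self._outstanding += 1
        self._lock.release()
        self._queue.put(fn)

    def _worker(self):
        while True:
            fn = self._queue.get()          # blocks until work arrives
            try:
                fn()
            except Exception:
                pass                        # a real pool would log this
            finally:
                self._lock.acquire()
                self._outstanding -= 1
                self._lock.release()
                self._done.acquire()
                self._done.notify()         # wake anyone blocked in wait()
                self._done.release()

    def wait(self):
        """Return once every job passed to add() has finished."""
        while True:
            self._lock.acquire()
            finished = (self._outstanding == 0)
            self._lock.release()
            if finished:
                return
            self._done.acquire()
            self._done.wait(1.0)            # periodic recheck avoids a missed notify
            self._done.release()

Daemon worker threads mean the process can exit even if a download hangs, which is the same choice the diff makes with setDaemon(True).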
+        f_in = f_out = None + +        try: +            url = self.getURL() + +            logging.info("Downloading %s", url) + +            if self.haveStalledFile(): +                have_length = os.stat(self._tmpPath).st_size +                print "Have stalled file with %s bytes"%have_length +            else: +                have_length = None + +            f_in = getConnection(url, self._useTor, have_length) + +            logging.info("Connected to %s", url) + +            gotRange = f_in.info().get("Content-Range") +            expectLength = f_in.info().get("Content-Length", "???") +            if gotRange: +                if gotRange.startswith("bytes %s-"%(have_length+1)): +                    logging.info("Resuming download from %s"%url) +                    f_out = open(self._tmpPath, 'a') +                else: +                    raise thandy.DownloadError("Got an unexpected range %s" +                                               %gotRange) +            else: +                f_out = open(self._tmpPath, 'w') + +            total = 0 +            while True: +                c = f_in.read(1024) +                if not c: +                    break +                f_out.write(c) +                total += len(c) +                logging.debug("Got %s/%s bytes from %s", +                              total, expectLength, url) + +        finally: +            if f_in is not None: +                f_in.close() +            if f_out is not None: +                f_out.close() + +        if self._wantHash: +            gotHash = thandy.formats.getFileDigest(self._tmpPath) +            if gotHash != self._wantHash: +                raise thandy.DownloadError("File hash was not as expected.") + +        thandy.util.moveFile(self._tmpPath, self._destPath) + + +class SimpleDownloadJob(DownloadJob): +    """Testing subtype of DownloadJob: just downloads a URL and writes it to +       disk.""" +    def __init__(self, targetPath, url, +                 wantHash=None, supportedURLTypes=None, useTor=False): +        DownloadJob.__init__(self, targetPath, targetPath+".tmp", +                                 wantHash=wantHash, +                                 useTor=useTor) +        self._url = url + +    def getURL(self): +        return self._url + +    def getRelativePath(self): +        return self._url + +class ThandyDownloadJob(DownloadJob): +    """Thandy's subtype of DownloadJob: knows about mirrors, weighting, +       and Thandy's directory structure.""" +    def __init__(self, relPath, destPath, mirrorList, wantHash=None, +                 supportedURLTypes=None, useTor=None): + +        DownloadJob.__init__(self, destPath, None, wantHash=wantHash, +                             useTor=useTor) +        self._mirrorList = mirrorList[:] +        self._relPath = relPath          tmppath = thandy.util.userFilename("tmp")          if relPath.startswith("/"):              relPath = relPath[1:] -        self._tmppath = os.path.join(tmppath, relPath) +        self._tmpPath = os.path.join(tmppath, relPath) -        d = os.path.dirname(self._tmppath) +        d = os.path.dirname(self._tmpPath)          if not os.path.exists(d):              os.makedirs(d, 0700) -    def getRelativePath(self): -        return self._relPath +        self._supportedURLTypes = None +        if self._supportedURLTypes is None and useTor: +            self._supportedURLTypes = [ "http", "https" ] -    def haveStalledFile(self): -        return os.path.exists(self._tmppath) -    def getURL(self, 
mirrorlist=None): -        if mirrorlist is None: -            mirrorlist = self._mirrorList -        weightSoFar = 0 +    def getURL(self):          usable = [] -        for m in mirrorlist['mirrors']: +        for m in self._mirrorList['mirrors']:              for c in m['contents']: -                # CHECK FOR URL SUITABILITY XXXXX + +                if self._supportedURLTypes is not None: +                    urltype = urllib2.splittype(m['urlbase'][0]) +                    if urltype.lower() not in self._supportedURLTypes: +                        continue                  if thandy.formats.rolePathMatches(c, self._relPath): -                    weightSoFar += m['weight'] -                    usable.append( (weightSoFar, m) ) +                    usable.append( (m['weight'], m) )                      break -        wTarget = random.randint(0, weightSoFar) -        mirror = None -        # Could use bisect here instead -        for w, m in mirrorlist: -            if w >= wTarget: -                mirror = m -                break +        mirror = thandy.util.randChooseWeighted(usable)          return m['urlbase'] + self._relPath -    def download(self): -        # XXXX RESUME +    def getRelativePath(self): +        return self._relPath -        f_in = urllib2.urlopen(self.getURL()) -        f_out = open(self._tmpPath, 'w') -        while True: -            c = f_in.read(1024) -            if not c: -                break -            f_out.write(c) -        f_in.close() -        f_out.close() -        # XXXXX retry on failure -        if self._wantHash: -            gotHash = thandy.formats.getFileDigest(self._tmpPath) -            if gotHash != self._wantHash: -                # XXXX Corrupt file. -                pass +_socks_opener = thandy.socksurls.build_socks_opener() + +def getConnection(url, useTor, have_length=None): +    """Open a connection to 'url'.  We already have received +       have_length bytes of the file we're trying to fetch, so resume +       if possible. + +    """ +    headers = {} +    urltype = urllib2.splittype(url)[0] +    is_http = urltype in ["http", "https"] + +    if have_length is not None and is_http: +        headers['Range'] = "bytes=%s-"%(have_length+1) + +    req = urllib2.Request(url, headers=headers) + +    if useTor: +        conn = _socks_opener.open(req) +    else: +        conn = urllib2.urlopen(req) + +    return conn + + +if __name__ == '__main__': +    # Trivial CLI to test out downloading. 
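getConnection() above leans on the HTTP Range header to resume stalled downloads. A self-contained sketch of that idea with urllib2 (the function name, chunk size, and error handling are illustrative, not Thandy's exact behaviour); note that byte ranges are zero-based, so a partial file of N bytes resumes at offset N:

import os
import urllib2

def resume_fetch(url, dest):
    # How much do we already have on disk?
    have = 0
    if os.path.exists(dest):
        have = os.path.getsize(dest)

    headers = {}
    if have:
        headers["Range"] = "bytes=%d-" % have

    conn = urllib2.urlopen(urllib2.Request(url, headers=headers))

    # A server that honors the range answers 206 with a Content-Range
    # header; anything else means we must start over from byte 0.
    if have and conn.info().get("Content-Range"):
        out = open(dest, "ab")
    else:
        out = open(dest, "wb")

    try:
        while True:
            chunk = conn.read(4096)
            if not chunk:
                break
            out.write(chunk)
    finally:
        out.close()
        conn.close()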
+ +    import getopt +    options, args = getopt.getopt(sys.argv[1:], "", +                                  ["use-tor", "socksport=", "threads="]) + +    useTor = False +    socksPort = 9050 +    nThreads = 2 +    for o,v in options: +        if o == "--use-tor": +            useTor = True +        elif o == "--socksport": +            socksPort = int(v) +        elif o == "--threads": +            nThreads = int(v) + +    logging.basicConfig(level=logging.DEBUG) + +    if useTor: +        thandy.socksurls.setSocksProxy("127.0.0.1", socksPort) + +    manager = DownloadManager(nThreads) + +    for url in args: +        fn = urllib2.splithost(urllib2.splittype(url)[1])[1] +        fn = os.path.split(fn)[1] + +        job = SimpleDownloadJob(fn, url, useTor=useTor) +        manager.addDownloadJob(job) -        thandy.utils.moveFile(self._tmpPath, self._destPath) +    manager.start() +    manager.wait() diff --git a/lib/thandy/formats.py b/lib/thandy/formats.py index 42e3d79..7ae6080 100644 --- a/lib/thandy/formats.py +++ b/lib/thandy/formats.py @@ -258,9 +258,9 @@ def getDigest(obj, digestObj=None):          return digestObj.digest()  def getFileDigest(f, digestObj=None): -    """Update 'digestObj' (typically a SHA256 object) with the digest of -       the file object in f.  If digestObj is none, compute the SHA256 -       hash and return it. +    """Update 'digestObj' (typically a SHA256 object) with the digest +       of the file object (or filename) in f.  If digestObj is none, +       compute the SHA256 hash and return it.         >>> s = "here is a long string"*1000         >>> import cStringIO, Crypto.Hash.SHA256 @@ -271,15 +271,23 @@ def getFileDigest(f, digestObj=None):         >>> h1.digest() == h2.digest()         True      """ +    f_to_close = None +    if isinstance(f, basestring): +        t_to_close = f = open(f, 'rb') +      useTempDigestObj = (digestObj == None)      if useTempDigestObj:          digestObj = Crypto.Hash.SHA256.new() -    while 1: -        s = f.read(4096) -        if not s: -            break -        digestObj.update(s) +    try: +        while 1: +            s = f.read(4096) +            if not s: +                break +            digestObj.update(s) +    finally: +        if f_to_close != None: +            f_to_close.close()      if useTempDigestObj:          return digestObj.digest() diff --git a/lib/thandy/packagesys/ExePackages.py b/lib/thandy/packagesys/ExePackages.py new file mode 100644 index 0000000..1688da3 --- /dev/null +++ b/lib/thandy/packagesys/ExePackages.py @@ -0,0 +1,33 @@ +# Copyright 2008 The Tor Project, Inc.  See LICENSE for licensing information. + +import thandy.packagesys.PackageSystem as ps +import thandy.packagesys.PackageDB as pdb + +class ExePackageSystem(pdb.DBBackedPackageSystem): + +    def getName(self): +        return "executable" + +    def packageHandleFromJSON(self, json): +        raise NotImplemented()  #XXXX???? + +    def canBeAutomatic(self): +        return True + +    def canHaveUI(self): +        return True + +class ExePackageHandle(pdb.DBBackedPackageHandle): +    def __init__(self, packageDB, name, version, filelist, filename, +                 arguments): +        pdb.DBBackedPackageHandle.__init__(packageDB, name, version, filelist) +        self._filename = filename +        self._arguments = arguments + +    def _doInstall(self): +        commandline = [ self._filename ] + self._arguments +        logging.info("Installing %s.  
Command line: %s", self._filename, +                     commandLine) +        subprocess.call(commandline) + + diff --git a/lib/thandy/packagesys/PackageDB.py b/lib/thandy/packagesys/PackageDB.py new file mode 100644 index 0000000..bb218be --- /dev/null +++ b/lib/thandy/packagesys/PackageDB.py @@ -0,0 +1,81 @@ +# Copyright 2008 The Tor Project, Inc.  See LICENSE for licensing information. + +import anydbm +import shelve + +import thandy.util +import thandy.formats + +class SimplePackageDB: + +    def __init__(self, filename): +        self._db = anydbm.open(filename, 'c') + +    def setVersion(self, package, version, filelist): +        pass + +    def setInstallParameters(self, package, params): +        pass + +    def getCurVersion(self, package): +        pass + +    def getInstallParameters(self, package): +        pass + + +class DBBackedPackageSystem(thandy.packagesys.PackageSystem): +    def __init__(self, packageDB): +        self._packageDB = packageDB + +class DBBackedPackageHandle(thandy.packagesys.PackageHandle): +    def __init__(self, packageDB, name, version, filelist): +        thandy.packagesys.PackageSystem.PackageHandle.__init__(self) +        self._packageDB = packageDB +        self._name = name +        self._version = version +        self._filelist = filelist + +        self._metaData = None + +    def _getInstallBase(self): +        raise NotImplemented() + +    def anyVersionInstalled(self, transaction=None): +        return self._packageDB.getCurVersion(self._name) != None + +    def getInstalledVersion(self, transaction=None): +        return self._packageDB.getCurVersion(self._name) + +    def install(self): +        params = self._doInstall() +        self._packageDB.setCurVersion( +            self._name, self._version, self._filelist) +        self._packageDB.setInstallParameters(self._name, params) + +    def _doInstall(self): +        raise NotImplemented() + +    def isInstalled(self): +        return self.getInstalledVersion(self, transaction) == self._version + +    def checkInstall(self): +        base = self._getInstallBase() + +        all_ok = True +        for fn, hash in self._filelist: +            fn = os.path.join(base, fn) +            if not os.path.exists(fn): +                all_ok = False +            else: +                f = open(fn, 'rb') +                try: +                    try: +                        d = thandy.formats.getFileDigest(f) +                    except OSError: +                        all_ok = False +                        break +                finally: +                    f.close() + +        return all_ok diff --git a/lib/thandy/packagesys/PackageSystem.py b/lib/thandy/packagesys/PackageSystem.py new file mode 100644 index 0000000..ad1e221 --- /dev/null +++ b/lib/thandy/packagesys/PackageSystem.py @@ -0,0 +1,58 @@ +# Copyright 2008 The Tor Project, Inc.  See LICENSE for licensing information. 
+ +class PackageSystem: +    def getName(self): +        raise NotImplemented() + +    def packageHandleFromJSON(self, json): +        raise NotImplemented() + +    def canBeAutomatic(self): +        return True + +    def canHaveUI(self): +        return False + +    def getTransaction(self): +        return PackageTransaction() + +class PackageTransaction: +    def __init__(self): +        self._transactions = [] + +    def _start(self): +        pass + +    def _commit(self): +        pass + +    def run(self): +        self._start() +        for cb in self._transactions: +            cb(self) +        self._commit() + +    def addInstall(self, packageHandle): +        self._transactions.append(packageHandle.install) + +    def addRemove(self, packageHandle): +        self._transactions.append(packageHandle.remove) + +class PackageHandle: +    def isInstalled(self, transaction=None): +        raise NotImplemented() + +    def anyVersionInstalled(self, transaction=None): +        raise NotImplemented() + +    def getInstalledVersion(self, transaction=None): +        raise NotImplemented() + +    def install(self, transaction): +        raise NotImplemented() + +    def remove(self, transaction): +        raise NotImplemented() + +    def checkInstall(self, transaction=None): +        raise NotImplemented() diff --git a/lib/thandy/packagesys/RPMPackages.py b/lib/thandy/packagesys/RPMPackages.py new file mode 100644 index 0000000..5e4c960 --- /dev/null +++ b/lib/thandy/packagesys/RPMPackages.py @@ -0,0 +1,156 @@ +# Copyright 2008 The Tor Project, Inc.  See LICENSE for licensing information. + +import thandy.packagesys.PackageSystem + +import os +import rpm +import md5 + +__all__ = [ 'RPMPackageSystem' ] + +class RPMPackageSystem(thandy.packagesys.PackageSystem.PackageSystem): +    def getName(self): +        return "RPM" + +    def packageHandleFromJSON(self, json): +        raise NotImplemented() # XXXX + +    def getTransaction(self): +        return RPMPackageTransaction() + +_CALLBACK_CODES = {} + +for name in dir(rpm): +    if name.startswith("RPMCALLBACK_"): +        _CALLBACK_CODES[getattr(rpm, name)] = name[12:] +del name + +class RPMPackageTransaction(thandy.packagesys.PackageSystem.PackageTransaction): + +    def _start(self): +        thandy.packagesys.PackageSystem.PackageTransaction.__init__(self) +        self._tset = rpm.TransactionSet() + +    def _commit(self): +        self._tset.run(self._callback, "") + +    def _callback(self, what, amount, total, mydata, _): +        if what == rpm.RPMCALLBACK_INST_OPEN_FILE: +            hdr, path = mydata +            logging.info("Installing RPM for %s [%s]", hdr['name'], path) + +        elif what == rpm.RPMCALLBACK_INST_CLOSE_FILE: +            hdr, path = mydata +            logging.info("Done installing RPM for %s", path) + +        elif what == rpm.RPMCALLBACK_INST_PROGRESS: +            hdr, path = mydata +            logging.info("%s: %.5s%% done", name, float(amount)/total*100) + +        else: +            hdr, path = mydata +            logging.info("RPM event %s on %s [%s/%s]", +                         _CALLBACK_CODES.get(what,str(what)), +                         hdr['name'], amount, total) + +def addRPMInstall(ts, path): +    fd = os.open(path, os.O_RDONLY) +    try: +        hdr = ts.hdrFromFdno(fd) +    finally: +        os.close(fd) +    ts.addInstall(hdr, (hdr, path), "u") + +def addRPMErase(ts, name): +    ts.addErase(name) + +def getInstalledRPMVersions(name, ts=None): +    if ts is None: +        ts = 
rpm.TransactionSet() +        #XXXX need to close? + +    versions = set() +    for match in ts.dbMatch(rpm.RPMTAG_NAME, name): +        versions.add(match['version']) + +    return versions + +def fileMD5(fname): +    d = md5.new() +    try: +        f = open(fname, 'r') +        try: +            while 1: +                s = f.read(4096) +                if not s: +                    break +                d.update(s) + +        finally: +            f.close() +    except OSError, e: +        logging.warn("Couldn't get digest of %s: %s", fname, e) +        return None + +    return d.hexdigest() + +def checkRPMInstall(name, version, ts=None): +    if ts is None: +        ts = rpm.TransactionSet() +        #XXXX need to close? + +    found = False +    all_ok = True + +    for h in ts.dbMatch(rpm.RPMTAG_NAME, name): +        if h['version'] != version: +            continue + +        found = True + +        for fname, flags, md5sum in zip(h['filenames'], h['fileflags'], h['filemd5s']): +            haveMD5 = fileMD5(fname) +            if not haveMD5: +                if flags & RPMFILE_MISSINGOK: +                    logging.info("%s is missing or unreadable from %s %s; " +                                 "that's ok.", fname, name, h['version']) +                else: +                    logging.warn("%s is missing or unreadable from %s %s." +                                 fname, name, h['version']) +                    all_ok = False +            elif haveMD5 == md5sum: +                logging.info("%s is unchanged from %s %s", +                             fname, name, h['version']) +            else: +                # file changed.  If it's not configuration, that's a problem. +                if not flags & RPMFILE_CONFIG: +                    logging.warn("%s changed from installed version of %s %s", +                                 fname, name, h['version']) +                    all_ok = False + +    return found and all_ok + +class RPMPackageHandle(thandy.packagesys.PackageSystem.PackageHandle): +    def __init__(self, name, version, filename): +        self._name = name +        self._version = version +        self._filename = filename + +    def anyVersionInstalled(self, transaction=None): +        return len(getInstalledRPMVersions(self.name, transaction)) > 1 + +    def getInstalledVersion(self, transaction=None): +        s = max(getInstalledRPMVersions(self._name, transaction)) + +    def install(self, transaction): +        addRPMInstall(transaction._trans, self._filename) + +    def remove(self, transaction): +        addRPMErase(transaction._trans, self._name) + +    def isInstalled(self, transaction=None): +        return self._version in getInstalledRPMVersions(self._name,transaction) + +    def checkInstall(self, transaction=None): +        return checkRPMInstall(self._name, self._version) + diff --git a/lib/thandy/packagesys/__init__.py b/lib/thandy/packagesys/__init__.py new file mode 100644 index 0000000..4dd7019 --- /dev/null +++ b/lib/thandy/packagesys/__init__.py @@ -0,0 +1,4 @@ +# Copyright 2008 The Tor Project, Inc.  See LICENSE for licensing information. + +__all__ = [ ] + diff --git a/lib/thandy/repository.py b/lib/thandy/repository.py index 1385dd6..af2fa4f 100644 --- a/lib/thandy/repository.py +++ b/lib/thandy/repository.py @@ -16,8 +16,18 @@ import time  MAX_TIMESTAMP_AGE = 24*60*60  class RepositoryFile: +    """Represents information about a file stored in our local repository +       cache.  Used to validate and load files. 
+    """      def __init__(self, repository, relativePath, schema,                   needRole=None, signedFormat=True, needSigs=1): +        """Allocate a new RepositoryFile for a file to be stored under +           the LocalRepository 'repository' in relativePath.  Make +           sure the file validates with 'schema' (or its signed form, +           if 'signedFormat').  When checking signatures, this file needs +           at least 'needSigs' signatures with role 'needRole'. +        """ +        # These fields are as in the arguments.          self._repository = repository          self._relativePath = relativePath          self._schema = schema @@ -25,17 +35,37 @@ class RepositoryFile:          self._signedFormat = signedFormat          self._needSigs = needSigs -        self._signed_obj = self._main_obj = None +        # The contents of the file, parsed.  None if we haven't loaded +        # the file. +        self._main_obj = None + +        # The contents of the file along with their signatures.  May +        # be aliased by _main_obj.  None if we haven't loaded the +        # file. +        self._signed_obj = None + +        # A SignatureStatus object, if we have checked signatures. +        self._sigStatus = None +        # The mtime of the file on disk, if we know it. +        self._mtime = None + +    def clear(self): +        """DOCDOC""" +        self._main_obj = self._signed_obj = None          self._sigStatus = None          self._mtime = None      def getRelativePath(self): +        """Return the filename for this item relative to the top of the +           repository."""          return self._relativePath      def getPath(self): +        """Return the actual filename for this item."""          return self._repository.getFilename(self._relativePath)      def _load(self): +        """Helper: load and parse this item's contents."""          fname = self.getPath()          # Propagate OSError @@ -59,6 +89,7 @@ class RepositoryFile:          self._mtime = mtime      def _save(self, content=None): +        """Helper: Flush this object's contents to disk."""          if content == None:              content = sexpr.encode @@ -69,9 +100,13 @@ class RepositoryFile:          self._signed_obj = signed_obj          self._main_obj = main_obj -        self._mtime = mtime +        self._mtime = time.time()      def _checkContent(self, content): +        """Helper.  Check whether 'content' matches SIGNED_SCHEMA, and +           self._schema (as appropraite).  
Return a tuple of the +           signed_schema match, and the schema match, or raise +           FormatException."""          try:              obj = json.loads(content) @@ -94,20 +129,26 @@ class RepositoryFile:          return signed_obj, main_obj      def load(self): +        """Load this object from disk if it hasn't already been loaded."""          if self._main_obj == None:              self._load()      def get(self): +        """Return the object, or None if it isn't loaded."""          return self._main_obj      def isLoaded(self): +        """Return true iff this object is loaded."""          return self._main_obj != None      def getContent(self): +        """Load this object as needed and return its content."""          self.load()          return self._main_obj      def _checkSignatures(self): +        """Helper: Try to verify all the signatures on this object, and +           cache the SignatureStatus object."""          self.load()          sigStatus = thandy.formats.checkSignatures(self._signed_obj,                                       self._repository._keyDB, @@ -115,15 +156,47 @@ class RepositoryFile:          self._sigStatus = sigStatus      def checkSignatures(self): +        """Try to verify all the signatures on this object if we +           haven't already done so, and return a SignatureStatus +           object."""          if self._sigStatus is None:              self._checkSignatures()          return self._sigStatus +class PkgFile: +    def __init__(self, repository, relativePath, needHash): +        self._repository = repository +        self._relativePath = relativePath +        self._needHash = needHash + +        self._mtime = None + +    def clear(self): +        self._mtime = None + +    def getRelativePath(self): +        return self._relativePath + +    def getPath(self): +        return self._repository.getFilename(self._relativePath) + +    def getExpectedHash(self): +        return self._needHash + +    def checkFile(self): +        return self._needHash == self._repository.getFileDigest() +  class LocalRepository: +    """Represents a client's partial copy of a remote mirrored repository."""      def __init__(self, root): +        """Create a new local repository that stores its files under 'root'""" +        # Top of our mirror.          self._root = root + +        # A base keylist of master keys; we'll add others later.          self._keyDB = thandy.util.getKeylist(None) +        # Entries for the three invariant metafiles.          self._keylistFile = RepositoryFile(              self, "/meta/keys.txt", thandy.formats.KEYLIST_SCHEMA,              needRole="master") @@ -133,28 +206,38 @@ class LocalRepository:          self._mirrorlistFile = RepositoryFile(              self, "/meta/mirrors.txt", thandy.formats.MIRRORLIST_SCHEMA,              needRole="mirrors") +          self._metaFiles = [ self._keylistFile,                              self._timestampFile,                              self._mirrorlistFile ] +        # Map from relative path to a RepositoryFile for packages.          self._packageFiles = {} + +        # Map from relative path to a RepositoryFile for bundles.          
self._bundleFiles = {}      def getFilename(self, relativePath): +        """Return the file on disk that caches 'relativePath'."""          if relativePath.startswith("/"):              relativePath = relativePath[1:]          return os.path.join(self._root, relativePath)      def getKeylistFile(self): +        """Return a RepositoryFile for our keylist."""          return self._keylistFile      def getTimestampFile(self): +        """Return a RepositoryFile for our timestamp file."""          return self._timestampFile      def getMirrorlistFile(self): +        """Return a RepositoryFile for our mirrorlist."""          return self._mirrorlistFile      def getPackageFile(self, relPath): +        """Return a RepositoryFile for a package stored at relative path +           'relPath'."""          try:              return self._packageFiles[relPath]          except KeyError: @@ -164,6 +247,8 @@ class LocalRepository:              return pkg      def getBundleFile(self, relPath): +        """Return a RepositoryFile for a bundle stored at relative path +           'relPath'."""          try:              return self._bundleFiles[relPath]          except KeyError: @@ -172,10 +257,38 @@ class LocalRepository:                  needRole='bundle')              return pkg -    def getFilesToUpdate(self, now=None, trackingBundles=()): +    def getRequestedFile(self, relPath): +        """ """ +        for f in self._metafiles: +            if f.getRelativePath() == relPath: +                return f +        for f in self._bundleFiles.itervalues(): +            if f.getRelativePath() == relPath: +                return f +        for f in self._packageFiles.itervalues(): +            if f.getRelativePath() == relPath: +                return f +            f.load() +            for item in f.get()['files']: +                rp, h = item[:2] +                if rp == relPath: +                    return PkgFile(self, rp, thandy.formats.parseHash(h)) + +    def getFilesToUpdate(self, now=None, trackingBundles=(), hashDict=None): +        """Return a set of relative paths for all files that we need +           to fetch.  Assumes that we care about the bundles +           'trackingBundles'.  If hashDict is provided, add mappings to it +           from the relative paths we want to fecth to the hashes that we +           want those items to have, when we know those hashes. +        """ +          if now == None:              now = time.time() +        if hashDict == None: +            # Use a dummy hashdict. +            hashDict = {} +          need = set()          # Fetch missing metafiles. @@ -196,6 +309,8 @@ class LocalRepository:              age = now - thandy.formats.parseTime(ts['at'])              ts = thandy.formats.TimestampFile.fromJSon(ts)              if age > MAX_TIMESTAMP_AGE: +                logging.info("Timestamp file from %s is out of " +                             "date; must fetch it.", ts['at'])                  need.add(self._timestampFile.getRelativePath())          # If the keylist isn't signed right, we can't check the @@ -203,6 +318,8 @@ class LocalRepository:          if self._keylistFile.get():              s = self._keylistFile.checkSignatures()              if not s.isValid(): # For now only require one master key. 
+                logging.info("Key list is not properly signed; must get a " +                             "new one.")                  need.add(self._keylistFile.getRelativePath())          if need: @@ -215,6 +332,8 @@ class LocalRepository:          # new keylist.          s = self._timestampFile.checkSignatures()          if not s.isValid(): +            logging.info("Timestamp file is not properly signed; fetching new " +                         "timestamp file and keylist.")              need.add(self._keylistFile.getRelativePath())              need.add(self._timestampFile.getRelativePath())              return need @@ -222,9 +341,15 @@ class LocalRepository:          # FINALLY, we know we have an up-to-date, signed timestamp          # file.  Check whether the keys and mirrors file are as          # authenticated. +        hashDict[self._keylistFile.getRelativePath()] = \ +            ts.getKeylistInfo().getHash() +        hashDict[self._mirrorlistFile.getRelativePath()] = \ +            ts.getMirrorlistInfo().getHash() +          h_kf = thandy.formats.getDigest(self._keylistFile.get())          h_expected = ts.getKeylistInfo().getHash()          if h_kf != h_expected: +            logging.info("Keylist file hash did not match.  Must fetch it.")              need.add(self._keylistFile.getRelativePath())          if need: @@ -232,11 +357,13 @@ class LocalRepository:          s = self._mirrorlistFile.checkSignatures()          if not s.isValid(): +            logging.info("Mirrorlist file signatures not valid. Must fetch.")              need.add(self._mirrorlistFile.getRelativePath())          h_mf = thandy.formats.getDigest(self._mirrorlistFile.get())          h_expected = ts.getMirrorlistInfo().getHash()          if h_mf != h_expected: +            logging.info("Mirrorlist file hash did not match. Must fetch.")              need.add(self._mirrorlistFile.getRelativePath())          if need: @@ -249,26 +376,30 @@ class LocalRepository:              try:                  binfo = ts.getBundleInfo(b)              except KeyError: -                logging.warn("Unrecognized bundle %s"%b) +                logging.warn("Bundle %s not listed in timestamp file."%b)                  continue              rp = binfo.getRelativePath() +            hashDict[rp] = h_expected = binfo.getHash()              bfile = self.getBundleFile(rp)              try:                  bfile.load()              except OSError: +                logging.info("Can't find bundle %s on disk; must fetch.", rp)                  need.add(rp)                  continue              h_b = thandy.formats.getDigest(bfile.get()) -            h_expected = binfo.getHash()              if h_b != h_expected: +                logging.info("Bundle hash not as expected; must fetch.", rp)                  need.add(rp)                  continue              s = bfile.checkSignatures()              if not s.isValid():                  # Can't actually use it. 
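The new hashDict out-parameter is how the expected hash for each file can reach the downloader, closing the `XXXX Use hash` gap left in ClientCLI.py. A sketch of the glue a caller might write; `repo`, `mirrorlist`, `downloader`, and `bundles` are assumed to be set up as in ClientCLI.update():

import thandy.download

# Hypothetical glue: collect expected hashes while computing what to fetch,
# then hand each hash to the corresponding download job.
hashes = {}
files = repo.getFilesToUpdate(trackingBundles=bundles, hashDict=hashes)

for rp in files:
    job = thandy.download.ThandyDownloadJob(
        rp, repo.getFilename(rp), mirrorlist,
        wantHash=hashes.get(rp))    # None if the hash is not yet known
    downloader.addDownloadJob(job)

downloader.start()
downloader.wait()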
+                logging.warn("Bundle hash was as expected, but signatures did " +                             "not match.")                  continue              bundles[rp] = bfile @@ -280,20 +411,26 @@ class LocalRepository:              for pkginfo in bundle['packages']:                  rp = pkginfo['path']                  pfile = self.getPackageFile(rp) +                h_expected = thandy.formats.parseHash(pkginfo['hash']) +                hashDict[rp] = h_expected                  try:                      pfile.load()                  except OSError: +                    logging.info("Can't find package %s on disk; must fetch.", +                                 rp)                      need.add(rp)                      continue                  h_p = thandy.formats.getDigest(pfile.get()) -                h_expected = thandy.formats.parseHash(pkginfo['hash'])                  if h_p != h_expected: +                    logging.info("Wrong hash for package %s; must fetch.", rp)                      need.add(rp)                      continue                  s = pfile.checkSignatures()                  if not s.isValid(): +                    logging.warn("Package hash was as expected, but signature " +                                 "did nto match")                      # Can't use it.                      continue                  packages[rp] = pfile @@ -305,13 +442,17 @@ class LocalRepository:              for f in package['files']:                  rp, h = f[:2]                  h_expected = thandy.formats.parseHash(h) +                hashDict[rp] = h_expected                  fn = self.getFilename(rp)                  try:                      h_got = thandy.formats.getFileDigest(fn)                  except OSError: +                    logging.info("Installable file %s not found on disk; " +                                 "must load", rp)                      need.add(rp)                      continue                  if h_got != h_expected: +                    logging.info("Hash for %s not as expected; must load.", rp)                      need.add(rp)          # Okay; these are the files we need. diff --git a/lib/thandy/socksurls.py b/lib/thandy/socksurls.py new file mode 100644 index 0000000..d035a6c --- /dev/null +++ b/lib/thandy/socksurls.py @@ -0,0 +1,93 @@ +# Copyright 2008 The Tor Project, Inc.  See LICENSE for licensing information. + +"""Implements URL types for socks-mediated connections.""" + +import socket +import httplib +import logging +import struct +import urllib2 + +# XXXX This isn't really threadsafe, but for now we don't change this after +# startup. +SOCKS_HOST = None +SOCKS_PORT = None + +def setSocksProxy(host, port): +    """Set the global SOCKS proxy to host:port.""" +    global SOCKS_HOST, SOCKS_PORT +    SOCKS_HOST = host +    SOCKS_PORT = port + +def _recvall(sock, n): +    """Helper: fetch N bytes from the socket sock.""" +    result = "" +    while 1: +        s = sock.recv(n) +        if not s: +            return result +        result += s +        n -= len(s) +        if n <= 0: +            return result + +def socks_connect(host, port): +    """Helper: use the SOCKS proxy to open a connection to host:port. +       Uses the simple and Tor-friendly SOCKS4a protocol.""" +    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +    try: +        logging.debug("Connecting to SOCKS proxy") +        sock.connect((SOCKS_HOST, SOCKS_PORT)) + +        # Now, the handshake!  We just do socks4a, since that's the simplest. 
+        version = 4 # socks 4 +        command = 1 # connect +        addr    = 1   # 0.0.0.1, signals socks4a. +        userid  = "" + +        messageheader = struct.pack("!BBHL", version, command, port, addr) +        message = "%s%s\x00%s\x00" % (messageheader, userid, host) + +        sock.sendall(message) + +        logging.debug("Waiting for reply from SOCKS proxy") +        reply = _recvall(sock, 8) +        code = ord(reply[1]) +        if code == 0x5a: +            logging.debug("SOCKS proxy is connected.") +            return sock +        else: +            raise socket.error("Bad SOCKS response code from proxy: %d", code) +    except: +        sock.close() +        raise + +# Copies of HTTPConnection and HTTPSConnection that use socks instead of +# direct connections. +class SocksHTTPConnection(httplib.HTTPConnection): +    def connect(self): +        self.sock = socks_connect(self.host, self.port) +class SocksHTTPSConnection(httplib.HTTPSConnection): +    def connect(self): +        socket = socks_connect(self.host, self.port) +        ssl = socket.ssl(sock, None, None) +        self.sock = socket.FakeSocket(socket, ssl) + +# URL handlers for HTTP and HTTPS urls that use socks instead of direct +# connections. +class SocksHTTPHandler(urllib2.AbstractHTTPHandler): +    def http_open(self, req): +        return self.do_open(SocksHTTPConnection, req) +    http_request = urllib2.AbstractHTTPHandler.do_request_ +class SocksHTTPSHandler(urllib2.AbstractHTTPHandler): +    def https_open(self, req): +        return self.do_open(SocksHTTPSConnection, req) +    https_request = urllib2.AbstractHTTPHandler.do_request_ + +def build_socks_opener(): +    """Return an urllib2.OpenerDirector object to open HTTP and HTTPS +       urls using SOCKS connections.""" +    opener = urllib2.OpenerDirector() +    opener.add_handler(SocksHTTPSHandler()) +    opener.add_handler(SocksHTTPHandler()) +    return opener diff --git a/lib/thandy/util.py b/lib/thandy/util.py index b7281ae..2db7df4 100644 --- a/lib/thandy/util.py +++ b/lib/thandy/util.py @@ -3,6 +3,7 @@  import os  import sys  import tempfile +import random  try:      import json @@ -20,6 +21,7 @@ def moveFile(fromLocation, toLocation):              os.unlink(toLocation)          except OSError:              pass +      os.rename(fromLocation, toLocation) @@ -75,3 +77,21 @@ def getKeylist(keys_fname, checkKeys=True):          keydb.addFromKeylist(obj['signed'], allowMasterKeys=False)      return keydb + +def randChooseWeighted(lst): +    """Given a list of (weight,item) tuples, pick an item with +       probability proportional to its weight. +    """ + +    totalweight = sum(w for w,i in lst) +    position = random.uniform(0, totalweight) +    soFar = 0 + +    # We could use bisect here, but this is not going to be in the +    # critical path.  If it is, oops. +    for w,i in lst: +        soFar += w +        if position < soFar: +            return i + +    return lst[-1][1]  | 
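randChooseWeighted() is what ThandyDownloadJob.getURL() now uses to pick a mirror. A quick check of the distribution it produces, with mirror names and weights made up for illustration:

import random

def randChooseWeighted(lst):
    """Same algorithm as above: pick an item with probability
       proportional to its weight."""
    totalweight = sum(w for w, i in lst)
    position = random.uniform(0, totalweight)
    soFar = 0
    for w, i in lst:
        soFar += w
        if position < soFar:
            return i
    return lst[-1][1]

# Three mirrors weighted 5:3:2 -- over many draws the counts should
# come out roughly 50%/30%/20%.
mirrors = [(5, "mirror-a"), (3, "mirror-b"), (2, "mirror-c")]
counts = {}
for _ in xrange(10000):
    m = randChooseWeighted(mirrors)
    counts[m] = counts.get(m, 0) + 1
print counts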
