summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Mathewson <nickm@torproject.org>2008-11-16 20:15:34 +0000
committerNick Mathewson <nickm@torproject.org>2008-11-16 20:15:34 +0000
commitc5749895ab4f6893bdf1d398691d1dd33e81574c (patch)
tree4ee47c3c6c56e313c3074f04c77a3637cf0fe31d
parent02a2e5807f23ad0cad9a49b5febe08ec25fcc74c (diff)
have some more thandy. This update includes a working downloader with tor support, a package system framework, and more. Most of what's left is glue code.
git-svn-id: file:///home/or/svnrepo/updater/trunk@17288 55e972cd-5a19-0410-ae62-a4d7a52db4cd
-rw-r--r--lib/thandy/ClientCLI.py9
-rw-r--r--lib/thandy/__init__.py2
-rw-r--r--lib/thandy/download.py331
-rw-r--r--lib/thandy/formats.py24
-rw-r--r--lib/thandy/packagesys/ExePackages.py33
-rw-r--r--lib/thandy/packagesys/PackageDB.py81
-rw-r--r--lib/thandy/packagesys/PackageSystem.py58
-rw-r--r--lib/thandy/packagesys/RPMPackages.py156
-rw-r--r--lib/thandy/packagesys/__init__.py4
-rw-r--r--lib/thandy/repository.py153
-rw-r--r--lib/thandy/socksurls.py93
-rw-r--r--lib/thandy/util.py20
12 files changed, 890 insertions, 74 deletions
diff --git a/lib/thandy/ClientCLI.py b/lib/thandy/ClientCLI.py
index ba7fb2e..ad39a2d 100644
--- a/lib/thandy/ClientCLI.py
+++ b/lib/thandy/ClientCLI.py
@@ -33,11 +33,11 @@ def update(args):
for f in files:
# XXXX Use hash.
- dj = thandy.download.DownloadJob(f, repo.getFilename(f),
- mirrorlist)
+ dj = thandy.download.ThandyDownloadJob(f, repo.getFilename(f),
+ mirrorlist)
downloader.addDownloadJob(dj)
# XXXX replace file in repository if ok; reload; see what changed.
-
+
# Wait for in-progress jobs
# Check my repository
@@ -48,6 +48,7 @@ def update(args):
# Tell me what to install.
+
def usage():
print "Known commands:"
print " update [--repo=repository] [--no-download]"
@@ -58,7 +59,7 @@ def main():
usage()
cmd = sys.argv[1]
args = sys.argv[2:]
- if cmd in [ "update" ]:
+ if cmd in [ "update", "geturls" ]:
globals()[cmd](args)
else:
usage()
diff --git a/lib/thandy/__init__.py b/lib/thandy/__init__.py
index 03b262e..5b53bfe 100644
--- a/lib/thandy/__init__.py
+++ b/lib/thandy/__init__.py
@@ -34,3 +34,5 @@ class PubkeyFormatException(FormatException):
class UnknownMethod(CryptoError):
pass
+class DownloadError(Exception):
+ pass
diff --git a/lib/thandy/download.py b/lib/thandy/download.py
index bf7dc43..73226c7 100644
--- a/lib/thandy/download.py
+++ b/lib/thandy/download.py
@@ -1,28 +1,50 @@
# Copyright 2008 The Tor Project, Inc. See LICENSE for licensing information.
-import urllib2
import httplib
+import logging
+import os
+import Queue
import random
-
-import threading, Queue
+import sys
+import threading
+import traceback
+import urllib2
import thandy.util
+import thandy.socksurls
-class Downloads:
+
+class DownloadManager:
+ """Class to track a set of downloads and pass them out to worker threads.
+ """
def __init__(self, n_threads=2):
+ # Prevents concurrent modification to downloads and haveDownloaded
self._lock = threading.RLock()
+ # Map from resource relPath to job.
self.downloads = {}
+ # Map from resource relPath from True to objects that we have
+ # managed to dowload.
self.haveDownloaded = {}
+ # Work queue of DownloadJobs that we intend to process once a thread
+ # is free.
self.downloadQueue = Queue.Queue()
- self.threads = [ threading.Thread(target=self._thread) ]
+ # List of worker threads.
+ self.threads = [ threading.Thread(target=self._thread, args=[idx])
+ for idx in xrange(n_threads) ]
+ # Condition that gets triggered whenever a thread is finished doing
+ # something.
+ self.done = threading.Condition()
for t in self.threads:
t.setDaemon(True)
def start(self):
+ """Start all of this download manager's worker threads."""
for t in self.threads:
t.start()
def isCurrentlyDownloading(self, relPath):
+ """Return true iff this download manager is currently downloading
+ some copy of the resource at relPath."""
self._lock.acquire()
try:
return self.downloads.has_key(relPath)
@@ -30,6 +52,9 @@ class Downloads:
self._lock.release()
def isRedundant(self, relPath):
+ """Return true iff we are currently downloading, or have
+ downloaded, the resource at relPath."""
+
self._lock.acquire()
try:
return (self.downloads.has_key(relPath) or
@@ -37,91 +62,285 @@ class Downloads:
finally:
self._lock.release()
+ def finished(self):
+ """Return true iff we have no active or pending jobs."""
+ self._lock.acquire()
+ try:
+ return downloadQueue.empty() and len(self.downloads) == 0
+ finally:
+ self._lock.release()
+
+ def wait(self):
+ """Pause until we have no active or pending jobs."""
+ while True:
+ self.done.acquire()
+ self.done.wait()
+ self.done.release()
+
+ if self.finished():
+ break
+
def addDownloadJob(self, job):
+ """Add another DownloadJob to the end of the work queue."""
rp = job.getRelativePath()
self._lock.acquire()
self.downloads[rp] = job
self._lock.release()
self.downloadQueue.put(job)
- def _thread(self):
+ def downloadFailed(self, mirror, relpath):
+ """DOCDOC"""
+ pass # Track failure; don't try the same mirror right away.
+
+ def _thread(self, idx):
+ # Run in the background per thread. idx is the number of the thread.
while True:
- job = self.downloadQueue.get()
- job.download()
+ job = self.downloadQueue.get() # Grab job from queue.
rp = job.getRelativePath()
- self._lock.acquire()
try:
- del self.downloads[rp]
- self.haveDownloaded[rp] = True
+ logging.info("start %s in Thread %s", rp, idx)
+ success = job.download() # Execute the download.
+ logging.info("end %s in Thread %s", rp, idx)
finally:
- self._lock.release()
+ self._lock.acquire()
+ try:
+ del self.downloads[rp]
+ if success: # If we downloaded correctly, say so.
+ self.haveDownloaded[rp] = True
+ finally:
+ self._lock.release()
+
+ self.done.acquire()
+ self.done.notify()
+ self.done.release()
class DownloadJob:
- def __init__(self, relPath, destPath, mirrorlist=None,
- wantHash=None, canStall=False):
- self._relPath = relPath
+ """Abstract base class. Represents a thing to be downloaded, and the
+ knowledge of how to download it."""
+ def __init__(self, targetPath, tmpPath, wantHash=None, useTor=False):
+ """Create a new DownloadJob. When it is finally downloaded,
+ store it in targetPath. Store partial results in tmpPath;
+ if there is already a file in tmpPath, assume that it is an
+ incomplete download. If wantHash, reject the file unless
+ the hash is as given. If useTor, use a socks connection."""
+
+ self._destPath = targetPath
+ self._tmpPath = tmpPath
self._wantHash = wantHash
- self._mirrorList = mirrorlist
- self._destPath = destPath
+ self._useTor = useTor
+
+ def getURL(self):
+ """Abstract implementation helper. Returns the URL that the
+ _download function downloads from."""
+ raise NotImplemented()
+
+ def getRelativePath(self):
+ """Abstract. Returns a string representing this download, to
+ keep two downloads of the same object from running at once.
+ In Thandy, this is usually a relative path of a downloaded
+ object within the repository.
+ """
+ raise NotImplemented()
+
+ def haveStalledFile(self):
+ """Return true iff we have an existing incomplete download stored in
+ the temporary file.
+ """
+ return os.path.exists(self._tmpPath)
+
+ def download(self):
+ """Main interface function: Start the download, and return
+ when complete.
+ """
+ try:
+ self._download()
+ return True
+ except (OSError, thandy.DownloadError), err:
+ # XXXXX retry on failure
+ logging.warn("Download failed: %s", err)
+ return False
+ except:
+ tp, val, tb = sys.exc_info()
+ logging.warn("Internal during download: %s, %s", val,
+ traceback.format_exc())
+ sys.exit(1)
+
+
+ def _download(self):
+ # Implementation function. Unlike download(), can throw exceptions.
+ f_in = f_out = None
+
+ try:
+ url = self.getURL()
+
+ logging.info("Downloading %s", url)
+
+ if self.haveStalledFile():
+ have_length = os.stat(self._tmpPath).st_size
+ print "Have stalled file with %s bytes"%have_length
+ else:
+ have_length = None
+
+ f_in = getConnection(url, self._useTor, have_length)
+
+ logging.info("Connected to %s", url)
+
+ gotRange = f_in.info().get("Content-Range")
+ expectLength = f_in.info().get("Content-Length", "???")
+ if gotRange:
+ if gotRange.startswith("bytes %s-"%(have_length+1)):
+ logging.info("Resuming download from %s"%url)
+ f_out = open(self._tmpPath, 'a')
+ else:
+ raise thandy.DownloadError("Got an unexpected range %s"
+ %gotRange)
+ else:
+ f_out = open(self._tmpPath, 'w')
+
+ total = 0
+ while True:
+ c = f_in.read(1024)
+ if not c:
+ break
+ f_out.write(c)
+ total += len(c)
+ logging.debug("Got %s/%s bytes from %s",
+ total, expectLength, url)
+
+ finally:
+ if f_in is not None:
+ f_in.close()
+ if f_out is not None:
+ f_out.close()
+
+ if self._wantHash:
+ gotHash = thandy.formats.getFileDigest(self._tmpPath)
+ if gotHash != self._wantHash:
+ raise thandy.DownloadError("File hash was not as expected.")
+
+ thandy.util.moveFile(self._tmpPath, self._destPath)
+
+
+class SimpleDownloadJob(DownloadJob):
+ """Testing subtype of DownloadJob: just downloads a URL and writes it to
+ disk."""
+ def __init__(self, targetPath, url,
+ wantHash=None, supportedURLTypes=None, useTor=False):
+ DownloadJob.__init__(self, targetPath, targetPath+".tmp",
+ wantHash=wantHash,
+ useTor=useTor)
+ self._url = url
+
+ def getURL(self):
+ return self._url
+
+ def getRelativePath(self):
+ return self._url
+
+class ThandyDownloadJob(DownloadJob):
+ """Thandy's subtype of DownloadJob: knows about mirrors, weighting,
+ and Thandy's directory structure."""
+ def __init__(self, relPath, destPath, mirrorList, wantHash=None,
+ supportedURLTypes=None, useTor=None):
+
+ DownloadJob.__init__(self, destPath, None, wantHash=wantHash,
+ useTor=useTor)
+ self._mirrorList = mirrorList[:]
+ self._relPath = relPath
tmppath = thandy.util.userFilename("tmp")
if relPath.startswith("/"):
relPath = relPath[1:]
- self._tmppath = os.path.join(tmppath, relPath)
+ self._tmpPath = os.path.join(tmppath, relPath)
- d = os.path.dirname(self._tmppath)
+ d = os.path.dirname(self._tmpPath)
if not os.path.exists(d):
os.makedirs(d, 0700)
- def getRelativePath(self):
- return self._relPath
+ self._supportedURLTypes = None
+ if self._supportedURLTypes is None and useTor:
+ self._supportedURLTypes = [ "http", "https" ]
- def haveStalledFile(self):
- return os.path.exists(self._tmppath)
- def getURL(self, mirrorlist=None):
- if mirrorlist is None:
- mirrorlist = self._mirrorList
- weightSoFar = 0
+ def getURL(self):
usable = []
- for m in mirrorlist['mirrors']:
+ for m in self._mirrorList['mirrors']:
for c in m['contents']:
- # CHECK FOR URL SUITABILITY XXXXX
+
+ if self._supportedURLTypes is not None:
+ urltype = urllib2.splittype(m['urlbase'][0])
+ if urltype.lower() not in self._supportedURLTypes:
+ continue
if thandy.formats.rolePathMatches(c, self._relPath):
- weightSoFar += m['weight']
- usable.append( (weightSoFar, m) )
+ usable.append( (m['weight'], m) )
break
- wTarget = random.randint(0, weightSoFar)
- mirror = None
- # Could use bisect here instead
- for w, m in mirrorlist:
- if w >= wTarget:
- mirror = m
- break
+ mirror = thandy.util.randChooseWeighted(usable)
return m['urlbase'] + self._relPath
- def download(self):
- # XXXX RESUME
+ def getRelativePath(self):
+ return self._relPath
- f_in = urllib2.urlopen(self.getURL())
- f_out = open(self._tmpPath, 'w')
- while True:
- c = f_in.read(1024)
- if not c:
- break
- f_out.write(c)
- f_in.close()
- f_out.close()
- # XXXXX retry on failure
- if self._wantHash:
- gotHash = thandy.formats.getFileDigest(self._tmpPath)
- if gotHash != self._wantHash:
- # XXXX Corrupt file.
- pass
+_socks_opener = thandy.socksurls.build_socks_opener()
+
+def getConnection(url, useTor, have_length=None):
+ """Open a connection to 'url'. We already have received
+ have_length bytes of the file we're trying to fetch, so resume
+ if possible.
+
+ """
+ headers = {}
+ urltype = urllib2.splittype(url)[0]
+ is_http = urltype in ["http", "https"]
+
+ if have_length is not None and is_http:
+ headers['Range'] = "bytes=%s-"%(have_length+1)
+
+ req = urllib2.Request(url, headers=headers)
+
+ if useTor:
+ conn = _socks_opener.open(req)
+ else:
+ conn = urllib2.urlopen(req)
+
+ return conn
+
+
+if __name__ == '__main__':
+ # Trivial CLI to test out downloading.
+
+ import getopt
+ options, args = getopt.getopt(sys.argv[1:], "",
+ ["use-tor", "socksport=", "threads="])
+
+ useTor = False
+ socksPort = 9050
+ nThreads = 2
+ for o,v in options:
+ if o == "--use-tor":
+ useTor = True
+ elif o == "--socksport":
+ socksPort = int(v)
+ elif o == "--threads":
+ nThreads = int(v)
+
+ logging.basicConfig(level=logging.DEBUG)
+
+ if useTor:
+ thandy.socksurls.setSocksProxy("127.0.0.1", socksPort)
+
+ manager = DownloadManager(nThreads)
+
+ for url in args:
+ fn = urllib2.splithost(urllib2.splittype(url)[1])[1]
+ fn = os.path.split(fn)[1]
+
+ job = SimpleDownloadJob(fn, url, useTor=useTor)
+ manager.addDownloadJob(job)
- thandy.utils.moveFile(self._tmpPath, self._destPath)
+ manager.start()
+ manager.wait()
diff --git a/lib/thandy/formats.py b/lib/thandy/formats.py
index 42e3d79..7ae6080 100644
--- a/lib/thandy/formats.py
+++ b/lib/thandy/formats.py
@@ -258,9 +258,9 @@ def getDigest(obj, digestObj=None):
return digestObj.digest()
def getFileDigest(f, digestObj=None):
- """Update 'digestObj' (typically a SHA256 object) with the digest of
- the file object in f. If digestObj is none, compute the SHA256
- hash and return it.
+ """Update 'digestObj' (typically a SHA256 object) with the digest
+ of the file object (or filename) in f. If digestObj is none,
+ compute the SHA256 hash and return it.
>>> s = "here is a long string"*1000
>>> import cStringIO, Crypto.Hash.SHA256
@@ -271,15 +271,23 @@ def getFileDigest(f, digestObj=None):
>>> h1.digest() == h2.digest()
True
"""
+ f_to_close = None
+ if isinstance(f, basestring):
+ t_to_close = f = open(f, 'rb')
+
useTempDigestObj = (digestObj == None)
if useTempDigestObj:
digestObj = Crypto.Hash.SHA256.new()
- while 1:
- s = f.read(4096)
- if not s:
- break
- digestObj.update(s)
+ try:
+ while 1:
+ s = f.read(4096)
+ if not s:
+ break
+ digestObj.update(s)
+ finally:
+ if f_to_close != None:
+ f_to_close.close()
if useTempDigestObj:
return digestObj.digest()
diff --git a/lib/thandy/packagesys/ExePackages.py b/lib/thandy/packagesys/ExePackages.py
new file mode 100644
index 0000000..1688da3
--- /dev/null
+++ b/lib/thandy/packagesys/ExePackages.py
@@ -0,0 +1,33 @@
+# Copyright 2008 The Tor Project, Inc. See LICENSE for licensing information.
+
+import thandy.packagesys.PackageSystem as ps
+import thandy.packagesys.PackageDB as pdb
+
+class ExePackageSystem(pdb.DBBackedPackageSystem):
+
+ def getName(self):
+ return "executable"
+
+ def packageHandleFromJSON(self, json):
+ raise NotImplemented() #XXXX????
+
+ def canBeAutomatic(self):
+ return True
+
+ def canHaveUI(self):
+ return True
+
+class ExePackageHandle(pdb.DBBackedPackageHandle):
+ def __init__(self, packageDB, name, version, filelist, filename,
+ arguments):
+ pdb.DBBackedPackageHandle.__init__(packageDB, name, version, filelist)
+ self._filename = filename
+ self._arguments = arguments
+
+ def _doInstall(self):
+ commandline = [ self._filename ] + self._arguments
+ logging.info("Installing %s. Command line: %s", self._filename,
+ commandLine)
+ subprocess.call(commandline)
+
+
diff --git a/lib/thandy/packagesys/PackageDB.py b/lib/thandy/packagesys/PackageDB.py
new file mode 100644
index 0000000..bb218be
--- /dev/null
+++ b/lib/thandy/packagesys/PackageDB.py
@@ -0,0 +1,81 @@
+# Copyright 2008 The Tor Project, Inc. See LICENSE for licensing information.
+
+import anydbm
+import shelve
+
+import thandy.util
+import thandy.formats
+
+class SimplePackageDB:
+
+ def __init__(self, filename):
+ self._db = anydbm.open(filename, 'c')
+
+ def setVersion(self, package, version, filelist):
+ pass
+
+ def setInstallParameters(self, package, params):
+ pass
+
+ def getCurVersion(self, package):
+ pass
+
+ def getInstallParameters(self, package):
+ pass
+
+
+class DBBackedPackageSystem(thandy.packagesys.PackageSystem):
+ def __init__(self, packageDB):
+ self._packageDB = packageDB
+
+class DBBackedPackageHandle(thandy.packagesys.PackageHandle):
+ def __init__(self, packageDB, name, version, filelist):
+ thandy.packagesys.PackageSystem.PackageHandle.__init__(self)
+ self._packageDB = packageDB
+ self._name = name
+ self._version = version
+ self._filelist = filelist
+
+ self._metaData = None
+
+ def _getInstallBase(self):
+ raise NotImplemented()
+
+ def anyVersionInstalled(self, transaction=None):
+ return self._packageDB.getCurVersion(self._name) != None
+
+ def getInstalledVersion(self, transaction=None):
+ return self._packageDB.getCurVersion(self._name)
+
+ def install(self):
+ params = self._doInstall()
+ self._packageDB.setCurVersion(
+ self._name, self._version, self._filelist)
+ self._packageDB.setInstallParameters(self._name, params)
+
+ def _doInstall(self):
+ raise NotImplemented()
+
+ def isInstalled(self):
+ return self.getInstalledVersion(self, transaction) == self._version
+
+ def checkInstall(self):
+ base = self._getInstallBase()
+
+ all_ok = True
+ for fn, hash in self._filelist:
+ fn = os.path.join(base, fn)
+ if not os.path.exists(fn):
+ all_ok = False
+ else:
+ f = open(fn, 'rb')
+ try:
+ try:
+ d = thandy.formats.getFileDigest(f)
+ except OSError:
+ all_ok = False
+ break
+ finally:
+ f.close()
+
+ return all_ok
diff --git a/lib/thandy/packagesys/PackageSystem.py b/lib/thandy/packagesys/PackageSystem.py
new file mode 100644
index 0000000..ad1e221
--- /dev/null
+++ b/lib/thandy/packagesys/PackageSystem.py
@@ -0,0 +1,58 @@
+# Copyright 2008 The Tor Project, Inc. See LICENSE for licensing information.
+
+class PackageSystem:
+ def getName(self):
+ raise NotImplemented()
+
+ def packageHandleFromJSON(self, json):
+ raise NotImplemented()
+
+ def canBeAutomatic(self):
+ return True
+
+ def canHaveUI(self):
+ return False
+
+ def getTransaction(self):
+ return PackageTransaction()
+
+class PackageTransaction:
+ def __init__(self):
+ self._transactions = []
+
+ def _start(self):
+ pass
+
+ def _commit(self):
+ pass
+
+ def run(self):
+ self._start()
+ for cb in self._transactions:
+ cb(self)
+ self._commit()
+
+ def addInstall(self, packageHandle):
+ self._transactions.append(packageHandle.install)
+
+ def addRemove(self, packageHandle):
+ self._transactions.append(packageHandle.remove)
+
+class PackageHandle:
+ def isInstalled(self, transaction=None):
+ raise NotImplemented()
+
+ def anyVersionInstalled(self, transaction=None):
+ raise NotImplemented()
+
+ def getInstalledVersion(self, transaction=None):
+ raise NotImplemented()
+
+ def install(self, transaction):
+ raise NotImplemented()
+
+ def remove(self, transaction):
+ raise NotImplemented()
+
+ def checkInstall(self, transaction=None):
+ raise NotImplemented()
diff --git a/lib/thandy/packagesys/RPMPackages.py b/lib/thandy/packagesys/RPMPackages.py
new file mode 100644
index 0000000..5e4c960
--- /dev/null
+++ b/lib/thandy/packagesys/RPMPackages.py
@@ -0,0 +1,156 @@
+# Copyright 2008 The Tor Project, Inc. See LICENSE for licensing information.
+
+import thandy.packagesys.PackageSystem
+
+import os
+import rpm
+import md5
+
+__all__ = [ 'RPMPackageSystem' ]
+
+class RPMPackageSystem(thandy.packagesys.PackageSystem.PackageSystem):
+ def getName(self):
+ return "RPM"
+
+ def packageHandleFromJSON(self, json):
+ raise NotImplemented() # XXXX
+
+ def getTransaction(self):
+ return RPMPackageTransaction()
+
+_CALLBACK_CODES = {}
+
+for name in dir(rpm):
+ if name.startswith("RPMCALLBACK_"):
+ _CALLBACK_CODES[getattr(rpm, name)] = name[12:]
+del name
+
+class RPMPackageTransaction(thandy.packagesys.PackageSystem.PackageTransaction):
+
+ def _start(self):
+ thandy.packagesys.PackageSystem.PackageTransaction.__init__(self)
+ self._tset = rpm.TransactionSet()
+
+ def _commit(self):
+ self._tset.run(self._callback, "")
+
+ def _callback(self, what, amount, total, mydata, _):
+ if what == rpm.RPMCALLBACK_INST_OPEN_FILE:
+ hdr, path = mydata
+ logging.info("Installing RPM for %s [%s]", hdr['name'], path)
+
+ elif what == rpm.RPMCALLBACK_INST_CLOSE_FILE:
+ hdr, path = mydata
+ logging.info("Done installing RPM for %s", path)
+
+ elif what == rpm.RPMCALLBACK_INST_PROGRESS:
+ hdr, path = mydata
+ logging.info("%s: %.5s%% done", name, float(amount)/total*100)
+
+ else:
+ hdr, path = mydata
+ logging.info("RPM event %s on %s [%s/%s]",
+ _CALLBACK_CODES.get(what,str(what)),
+ hdr['name'], amount, total)
+
+def addRPMInstall(ts, path):
+ fd = os.open(path, os.O_RDONLY)
+ try:
+ hdr = ts.hdrFromFdno(fd)
+ finally:
+ os.close(fd)
+ ts.addInstall(hdr, (hdr, path), "u")
+
+def addRPMErase(ts, name):
+ ts.addErase(name)
+
+def getInstalledRPMVersions(name, ts=None):
+ if ts is None:
+ ts = rpm.TransactionSet()
+ #XXXX need to close?
+
+ versions = set()
+ for match in ts.dbMatch(rpm.RPMTAG_NAME, name):
+ versions.add(match['version'])
+
+ return versions
+
+def fileMD5(fname):
+ d = md5.new()
+ try:
+ f = open(fname, 'r')
+ try:
+ while 1:
+ s = f.read(4096)
+ if not s:
+ break
+ d.update(s)
+
+ finally:
+ f.close()
+ except OSError, e:
+ logging.warn("Couldn't get digest of %s: %s", fname, e)
+ return None
+
+ return d.hexdigest()
+
+def checkRPMInstall(name, version, ts=None):
+ if ts is None:
+ ts = rpm.TransactionSet()
+ #XXXX need to close?
+
+ found = False
+ all_ok = True
+
+ for h in ts.dbMatch(rpm.RPMTAG_NAME, name):
+ if h['version'] != version:
+ continue
+
+ found = True
+
+ for fname, flags, md5sum in zip(h['filenames'], h['fileflags'], h['filemd5s']):
+ haveMD5 = fileMD5(fname)
+ if not haveMD5:
+ if flags & RPMFILE_MISSINGOK:
+ logging.info("%s is missing or unreadable from %s %s; "
+ "that's ok.", fname, name, h['version'])
+ else:
+ logging.warn("%s is missing or unreadable from %s %s."
+ fname, name, h['version'])
+ all_ok = False
+ elif haveMD5 == md5sum:
+ logging.info("%s is unchanged from %s %s",
+ fname, name, h['version'])
+ else:
+ # file changed. If it's not configuration, that's a problem.
+ if not flags & RPMFILE_CONFIG:
+ logging.warn("%s changed from installed version of %s %s",
+ fname, name, h['version'])
+ all_ok = False
+
+ return found and all_ok
+
+class RPMPackageHandle(thandy.packagesys.PackageSystem.PackageHandle):
+ def __init__(self, name, version, filename):
+ self._name = name
+ self._version = version
+ self._filename = filename
+
+ def anyVersionInstalled(self, transaction=None):
+ return len(getInstalledRPMVersions(self.name, transaction)) > 1
+
+ def getInstalledVersion(self, transaction=None):
+ s = max(getInstalledRPMVersions(self._name, transaction))
+
+ def install(self, transaction):
+ addRPMInstall(transaction._trans, self._filename)
+
+ def remove(self, transaction):
+ addRPMErase(transaction._trans, self._name)
+
+ def isInstalled(self, transaction=None):
+ return self._version in getInstalledRPMVersions(self._name,transaction)
+
+ def checkInstall(self, transaction=None):
+ return checkRPMInstall(self._name, self._version)
+
diff --git a/lib/thandy/packagesys/__init__.py b/lib/thandy/packagesys/__init__.py
new file mode 100644
index 0000000..4dd7019
--- /dev/null
+++ b/lib/thandy/packagesys/__init__.py
@@ -0,0 +1,4 @@
+# Copyright 2008 The Tor Project, Inc. See LICENSE for licensing information.
+
+__all__ = [ ]
+
diff --git a/lib/thandy/repository.py b/lib/thandy/repository.py
index 1385dd6..af2fa4f 100644
--- a/lib/thandy/repository.py
+++ b/lib/thandy/repository.py
@@ -16,8 +16,18 @@ import time
MAX_TIMESTAMP_AGE = 24*60*60
class RepositoryFile:
+ """Represents information about a file stored in our local repository
+ cache. Used to validate and load files.
+ """
def __init__(self, repository, relativePath, schema,
needRole=None, signedFormat=True, needSigs=1):
+ """Allocate a new RepositoryFile for a file to be stored under
+ the LocalRepository 'repository' in relativePath. Make
+ sure the file validates with 'schema' (or its signed form,
+ if 'signedFormat'). When checking signatures, this file needs
+ at least 'needSigs' signatures with role 'needRole'.
+ """
+ # These fields are as in the arguments.
self._repository = repository
self._relativePath = relativePath
self._schema = schema
@@ -25,17 +35,37 @@ class RepositoryFile:
self._signedFormat = signedFormat
self._needSigs = needSigs
- self._signed_obj = self._main_obj = None
+ # The contents of the file, parsed. None if we haven't loaded
+ # the file.
+ self._main_obj = None
+
+ # The contents of the file along with their signatures. May
+ # be aliased by _main_obj. None if we haven't loaded the
+ # file.
+ self._signed_obj = None
+
+ # A SignatureStatus object, if we have checked signatures.
+ self._sigStatus = None
+ # The mtime of the file on disk, if we know it.
+ self._mtime = None
+
+ def clear(self):
+ """DOCDOC"""
+ self._main_obj = self._signed_obj = None
self._sigStatus = None
self._mtime = None
def getRelativePath(self):
+ """Return the filename for this item relative to the top of the
+ repository."""
return self._relativePath
def getPath(self):
+ """Return the actual filename for this item."""
return self._repository.getFilename(self._relativePath)
def _load(self):
+ """Helper: load and parse this item's contents."""
fname = self.getPath()
# Propagate OSError
@@ -59,6 +89,7 @@ class RepositoryFile:
self._mtime = mtime
def _save(self, content=None):
+ """Helper: Flush this object's contents to disk."""
if content == None:
content = sexpr.encode
@@ -69,9 +100,13 @@ class RepositoryFile:
self._signed_obj = signed_obj
self._main_obj = main_obj
- self._mtime = mtime
+ self._mtime = time.time()
def _checkContent(self, content):
+ """Helper. Check whether 'content' matches SIGNED_SCHEMA, and
+ self._schema (as appropraite). Return a tuple of the
+ signed_schema match, and the schema match, or raise
+ FormatException."""
try:
obj = json.loads(content)
@@ -94,20 +129,26 @@ class RepositoryFile:
return signed_obj, main_obj
def load(self):
+ """Load this object from disk if it hasn't already been loaded."""
if self._main_obj == None:
self._load()
def get(self):
+ """Return the object, or None if it isn't loaded."""
return self._main_obj
def isLoaded(self):
+ """Return true iff this object is loaded."""
return self._main_obj != None
def getContent(self):
+ """Load this object as needed and return its content."""
self.load()
return self._main_obj
def _checkSignatures(self):
+ """Helper: Try to verify all the signatures on this object, and
+ cache the SignatureStatus object."""
self.load()
sigStatus = thandy.formats.checkSignatures(self._signed_obj,
self._repository._keyDB,
@@ -115,15 +156,47 @@ class RepositoryFile:
self._sigStatus = sigStatus
def checkSignatures(self):
+ """Try to verify all the signatures on this object if we
+ haven't already done so, and return a SignatureStatus
+ object."""
if self._sigStatus is None:
self._checkSignatures()
return self._sigStatus
+class PkgFile:
+ def __init__(self, repository, relativePath, needHash):
+ self._repository = repository
+ self._relativePath = relativePath
+ self._needHash = needHash
+
+ self._mtime = None
+
+ def clear(self):
+ self._mtime = None
+
+ def getRelativePath(self):
+ return self._relativePath
+
+ def getPath(self):
+ return self._repository.getFilename(self._relativePath)
+
+ def getExpectedHash(self):
+ return self._needHash
+
+ def checkFile(self):
+ return self._needHash == self._repository.getFileDigest()
+
class LocalRepository:
+ """Represents a client's partial copy of a remote mirrored repository."""
def __init__(self, root):
+ """Create a new local repository that stores its files under 'root'"""
+ # Top of our mirror.
self._root = root
+
+ # A base keylist of master keys; we'll add others later.
self._keyDB = thandy.util.getKeylist(None)
+ # Entries for the three invariant metafiles.
self._keylistFile = RepositoryFile(
self, "/meta/keys.txt", thandy.formats.KEYLIST_SCHEMA,
needRole="master")
@@ -133,28 +206,38 @@ class LocalRepository:
self._mirrorlistFile = RepositoryFile(
self, "/meta/mirrors.txt", thandy.formats.MIRRORLIST_SCHEMA,
needRole="mirrors")
+
self._metaFiles = [ self._keylistFile,
self._timestampFile,
self._mirrorlistFile ]
+ # Map from relative path to a RepositoryFile for packages.
self._packageFiles = {}
+
+ # Map from relative path to a RepositoryFile for bundles.
self._bundleFiles = {}
def getFilename(self, relativePath):
+ """Return the file on disk that caches 'relativePath'."""
if relativePath.startswith("/"):
relativePath = relativePath[1:]
return os.path.join(self._root, relativePath)
def getKeylistFile(self):
+ """Return a RepositoryFile for our keylist."""
return self._keylistFile
def getTimestampFile(self):
+ """Return a RepositoryFile for our timestamp file."""
return self._timestampFile
def getMirrorlistFile(self):
+ """Return a RepositoryFile for our mirrorlist."""
return self._mirrorlistFile
def getPackageFile(self, relPath):
+ """Return a RepositoryFile for a package stored at relative path
+ 'relPath'."""
try:
return self._packageFiles[relPath]
except KeyError:
@@ -164,6 +247,8 @@ class LocalRepository:
return pkg
def getBundleFile(self, relPath):
+ """Return a RepositoryFile for a bundle stored at relative path
+ 'relPath'."""
try:
return self._bundleFiles[relPath]
except KeyError:
@@ -172,10 +257,38 @@ class LocalRepository:
needRole='bundle')
return pkg
- def getFilesToUpdate(self, now=None, trackingBundles=()):
+ def getRequestedFile(self, relPath):
+ """ """
+ for f in self._metafiles:
+ if f.getRelativePath() == relPath:
+ return f
+ for f in self._bundleFiles.itervalues():
+ if f.getRelativePath() == relPath:
+ return f
+ for f in self._packageFiles.itervalues():
+ if f.getRelativePath() == relPath:
+ return f
+ f.load()
+ for item in f.get()['files']:
+ rp, h = item[:2]
+ if rp == relPath:
+ return PkgFile(self, rp, thandy.formats.parseHash(h))
+
+ def getFilesToUpdate(self, now=None, trackingBundles=(), hashDict=None):
+ """Return a set of relative paths for all files that we need
+ to fetch. Assumes that we care about the bundles
+ 'trackingBundles'. If hashDict is provided, add mappings to it
+ from the relative paths we want to fecth to the hashes that we
+ want those items to have, when we know those hashes.
+ """
+
if now == None:
now = time.time()
+ if hashDict == None:
+ # Use a dummy hashdict.
+ hashDict = {}
+
need = set()
# Fetch missing metafiles.
@@ -196,6 +309,8 @@ class LocalRepository:
age = now - thandy.formats.parseTime(ts['at'])
ts = thandy.formats.TimestampFile.fromJSon(ts)
if age > MAX_TIMESTAMP_AGE:
+ logging.info("Timestamp file from %s is out of "
+ "date; must fetch it.", ts['at'])
need.add(self._timestampFile.getRelativePath())
# If the keylist isn't signed right, we can't check the
@@ -203,6 +318,8 @@ class LocalRepository:
if self._keylistFile.get():
s = self._keylistFile.checkSignatures()
if not s.isValid(): # For now only require one master key.
+ logging.info("Key list is not properly signed; must get a "
+ "new one.")
need.add(self._keylistFile.getRelativePath())
if need:
@@ -215,6 +332,8 @@ class LocalRepository:
# new keylist.
s = self._timestampFile.checkSignatures()
if not s.isValid():
+ logging.info("Timestamp file is not properly signed; fetching new "
+ "timestamp file and keylist.")
need.add(self._keylistFile.getRelativePath())
need.add(self._timestampFile.getRelativePath())
return need
@@ -222,9 +341,15 @@ class LocalRepository:
# FINALLY, we know we have an up-to-date, signed timestamp
# file. Check whether the keys and mirrors file are as
# authenticated.
+ hashDict[self._keylistFile.getRelativePath()] = \
+ ts.getKeylistInfo().getHash()
+ hashDict[self._mirrorlistFile.getRelativePath()] = \
+ ts.getMirrorlistInfo().getHash()
+
h_kf = thandy.formats.getDigest(self._keylistFile.get())
h_expected = ts.getKeylistInfo().getHash()
if h_kf != h_expected:
+ logging.info("Keylist file hash did not match. Must fetch it.")
need.add(self._keylistFile.getRelativePath())
if need:
@@ -232,11 +357,13 @@ class LocalRepository:
s = self._mirrorlistFile.checkSignatures()
if not s.isValid():
+ logging.info("Mirrorlist file signatures not valid. Must fetch.")
need.add(self._mirrorlistFile.getRelativePath())
h_mf = thandy.formats.getDigest(self._mirrorlistFile.get())
h_expected = ts.getMirrorlistInfo().getHash()
if h_mf != h_expected:
+ logging.info("Mirrorlist file hash did not match. Must fetch.")
need.add(self._mirrorlistFile.getRelativePath())
if need:
@@ -249,26 +376,30 @@ class LocalRepository:
try:
binfo = ts.getBundleInfo(b)
except KeyError:
- logging.warn("Unrecognized bundle %s"%b)
+ logging.warn("Bundle %s not listed in timestamp file."%b)
continue
rp = binfo.getRelativePath()
+ hashDict[rp] = h_expected = binfo.getHash()
bfile = self.getBundleFile(rp)
try:
bfile.load()
except OSError:
+ logging.info("Can't find bundle %s on disk; must fetch.", rp)
need.add(rp)
continue
h_b = thandy.formats.getDigest(bfile.get())
- h_expected = binfo.getHash()
if h_b != h_expected:
+ logging.info("Bundle hash not as expected; must fetch.", rp)
need.add(rp)
continue
s = bfile.checkSignatures()
if not s.isValid():
# Can't actually use it.
+ logging.warn("Bundle hash was as expected, but signatures did "
+ "not match.")
continue
bundles[rp] = bfile
@@ -280,20 +411,26 @@ class LocalRepository:
for pkginfo in bundle['packages']:
rp = pkginfo['path']
pfile = self.getPackageFile(rp)
+ h_expected = thandy.formats.parseHash(pkginfo['hash'])
+ hashDict[rp] = h_expected
try:
pfile.load()
except OSError:
+ logging.info("Can't find package %s on disk; must fetch.",
+ rp)
need.add(rp)
continue
h_p = thandy.formats.getDigest(pfile.get())
- h_expected = thandy.formats.parseHash(pkginfo['hash'])
if h_p != h_expected:
+ logging.info("Wrong hash for package %s; must fetch.", rp)
need.add(rp)
continue
s = pfile.checkSignatures()
if not s.isValid():
+ logging.warn("Package hash was as expected, but signature "
+ "did nto match")
# Can't use it.
continue
packages[rp] = pfile
@@ -305,13 +442,17 @@ class LocalRepository:
for f in package['files']:
rp, h = f[:2]
h_expected = thandy.formats.parseHash(h)
+ hashDict[rp] = h_expected
fn = self.getFilename(rp)
try:
h_got = thandy.formats.getFileDigest(fn)
except OSError:
+ logging.info("Installable file %s not found on disk; "
+ "must load", rp)
need.add(rp)
continue
if h_got != h_expected:
+ logging.info("Hash for %s not as expected; must load.", rp)
need.add(rp)
# Okay; these are the files we need.
diff --git a/lib/thandy/socksurls.py b/lib/thandy/socksurls.py
new file mode 100644
index 0000000..d035a6c
--- /dev/null
+++ b/lib/thandy/socksurls.py
@@ -0,0 +1,93 @@
+# Copyright 2008 The Tor Project, Inc. See LICENSE for licensing information.
+
+"""Implements URL types for socks-mediated connections."""
+
+import socket
+import httplib
+import logging
+import struct
+import urllib2
+
+# XXXX This isn't really threadsafe, but for now we don't change this after
+# startup.
+SOCKS_HOST = None
+SOCKS_PORT = None
+
+def setSocksProxy(host, port):
+ """Set the global SOCKS proxy to host:port."""
+ global SOCKS_HOST, SOCKS_PORT
+ SOCKS_HOST = host
+ SOCKS_PORT = port
+
+def _recvall(sock, n):
+ """Helper: fetch N bytes from the socket sock."""
+ result = ""
+ while 1:
+ s = sock.recv(n)
+ if not s:
+ return result
+ result += s
+ n -= len(s)
+ if n <= 0:
+ return result
+
+def socks_connect(host, port):
+ """Helper: use the SOCKS proxy to open a connection to host:port.
+ Uses the simple and Tor-friendly SOCKS4a protocol."""
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ logging.debug("Connecting to SOCKS proxy")
+ sock.connect((SOCKS_HOST, SOCKS_PORT))
+
+ # Now, the handshake! We just do socks4a, since that's the simplest.
+ version = 4 # socks 4
+ command = 1 # connect
+ addr = 1 # 0.0.0.1, signals socks4a.
+ userid = ""
+
+ messageheader = struct.pack("!BBHL", version, command, port, addr)
+ message = "%s%s\x00%s\x00" % (messageheader, userid, host)
+
+ sock.sendall(message)
+
+ logging.debug("Waiting for reply from SOCKS proxy")
+ reply = _recvall(sock, 8)
+ code = ord(reply[1])
+ if code == 0x5a:
+ logging.debug("SOCKS proxy is connected.")
+ return sock
+ else:
+ raise socket.error("Bad SOCKS response code from proxy: %d", code)
+ except:
+ sock.close()
+ raise
+
+# Copies of HTTPConnection and HTTPSConnection that use socks instead of
+# direct connections.
+class SocksHTTPConnection(httplib.HTTPConnection):
+ def connect(self):
+ self.sock = socks_connect(self.host, self.port)
+class SocksHTTPSConnection(httplib.HTTPSConnection):
+ def connect(self):
+ socket = socks_connect(self.host, self.port)
+ ssl = socket.ssl(sock, None, None)
+ self.sock = socket.FakeSocket(socket, ssl)
+
+# URL handlers for HTTP and HTTPS urls that use socks instead of direct
+# connections.
+class SocksHTTPHandler(urllib2.AbstractHTTPHandler):
+ def http_open(self, req):
+ return self.do_open(SocksHTTPConnection, req)
+ http_request = urllib2.AbstractHTTPHandler.do_request_
+class SocksHTTPSHandler(urllib2.AbstractHTTPHandler):
+ def https_open(self, req):
+ return self.do_open(SocksHTTPSConnection, req)
+ https_request = urllib2.AbstractHTTPHandler.do_request_
+
+def build_socks_opener():
+ """Return an urllib2.OpenerDirector object to open HTTP and HTTPS
+ urls using SOCKS connections."""
+ opener = urllib2.OpenerDirector()
+ opener.add_handler(SocksHTTPSHandler())
+ opener.add_handler(SocksHTTPHandler())
+ return opener
diff --git a/lib/thandy/util.py b/lib/thandy/util.py
index b7281ae..2db7df4 100644
--- a/lib/thandy/util.py
+++ b/lib/thandy/util.py
@@ -3,6 +3,7 @@
import os
import sys
import tempfile
+import random
try:
import json
@@ -20,6 +21,7 @@ def moveFile(fromLocation, toLocation):
os.unlink(toLocation)
except OSError:
pass
+
os.rename(fromLocation, toLocation)
@@ -75,3 +77,21 @@ def getKeylist(keys_fname, checkKeys=True):
keydb.addFromKeylist(obj['signed'], allowMasterKeys=False)
return keydb
+
+def randChooseWeighted(lst):
+ """Given a list of (weight,item) tuples, pick an item with
+ probability proportional to its weight.
+ """
+
+ totalweight = sum(w for w,i in lst)
+ position = random.uniform(0, totalweight)
+ soFar = 0
+
+ # We could use bisect here, but this is not going to be in the
+ # critical path. If it is, oops.
+ for w,i in lst:
+ soFar += w
+ if position < soFar:
+ return i
+
+ return lst[-1][1]