Diffstat (limited to 'client/src/leap')
-rw-r--r-- | client/src/leap/soledad/client/_version.py | 489
-rw-r--r-- | client/src/leap/soledad/client/api.py | 78
-rw-r--r-- | client/src/leap/soledad/client/crypto.py | 16
-rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 27
-rw-r--r-- | client/src/leap/soledad/client/http_target/__init__.py | 1
-rw-r--r-- | client/src/leap/soledad/client/http_target/api.py | 5
-rw-r--r-- | client/src/leap/soledad/client/http_target/fetch.py | 14
-rw-r--r-- | client/src/leap/soledad/client/http_target/send.py | 59
-rw-r--r-- | client/src/leap/soledad/client/http_target/support.py | 13
-rw-r--r-- | client/src/leap/soledad/client/secrets.py | 22
-rw-r--r-- | client/src/leap/soledad/client/sync.py | 4
11 files changed, 663 insertions, 65 deletions
diff --git a/client/src/leap/soledad/client/_version.py b/client/src/leap/soledad/client/_version.py index 62315c76..3ee3f81b 100644 --- a/client/src/leap/soledad/client/_version.py +++ b/client/src/leap/soledad/client/_version.py @@ -1,13 +1,484 @@ -# This file was generated by the `freeze_debianver` command in setup.py -# Using 'versioneer.py' (0.7+) from -# revision-control system data, or from the parent directory name of an -# unpacked source archive. Distribution tarballs contain a pre-generated copy -# of this file. +# This file helps to compute a version number in source trees obtained from +# git-archive tarball (such as those provided by githubs download-from-tag +# feature). Distribution tarballs (built by setup.py sdist) and build +# directories (produced by setup.py build) will contain a much shorter file +# that just contains the computed version number. -version_version = '0.7.4' -version_full = '49fd07cde3b1f50dcce85d4e9fcdfc6196f484c4' +# This file is released into the public domain. Generated by +# versioneer-0.16 (https://github.com/warner/python-versioneer) +"""Git implementation of _version.py.""" -def get_versions(default={}, verbose=False): - return {'version': version_version, 'full': version_full} +import errno +import os +import re +import subprocess +import sys + + +def get_keywords(): + """Get the keywords needed to look up the version information.""" + # these strings will be replaced by git during git-archive. + # setup.py/versioneer.py will grep for the variable names, so they must + # each be defined on a line of their own. _version.py will just call + # get_keywords(). + git_refnames = "$Format:%d$" + git_full = "$Format:%H$" + keywords = {"refnames": git_refnames, "full": git_full} + return keywords + + +class VersioneerConfig: + """Container for Versioneer configuration parameters.""" + + +def get_config(): + """Create, populate and return the VersioneerConfig() object.""" + # these strings are filled in when 'setup.py versioneer' creates + # _version.py + cfg = VersioneerConfig() + cfg.VCS = "git" + cfg.style = "pep440" + cfg.tag_prefix = "" + cfg.parentdir_prefix = "None" + cfg.versionfile_source = "src/leap/soledad/client/_version.py" + cfg.verbose = False + return cfg + + +class NotThisMethod(Exception): + """Exception raised if a method is not valid for the current scenario.""" + + +LONG_VERSION_PY = {} +HANDLERS = {} + + +def register_vcs_handler(vcs, method): # decorator + """Decorator to mark a method as the handler for a particular VCS.""" + def decorate(f): + """Store f in HANDLERS[vcs][method].""" + if vcs not in HANDLERS: + HANDLERS[vcs] = {} + HANDLERS[vcs][method] = f + return f + return decorate + + +def run_command(commands, args, cwd=None, verbose=False, hide_stderr=False): + """Call the given command(s).""" + assert isinstance(commands, list) + p = None + for c in commands: + try: + dispcmd = str([c] + args) + # remember shell=False, so use git.cmd on windows, not just git + p = subprocess.Popen([c] + args, cwd=cwd, stdout=subprocess.PIPE, + stderr=(subprocess.PIPE if hide_stderr + else None)) + break + except EnvironmentError: + e = sys.exc_info()[1] + if e.errno == errno.ENOENT: + continue + if verbose: + print("unable to run %s" % dispcmd) + print(e) + return None + else: + if verbose: + print("unable to find command, tried %s" % (commands,)) + return None + stdout = p.communicate()[0].strip() + if sys.version_info[0] >= 3: + stdout = stdout.decode() + if p.returncode != 0: + if verbose: + print("unable to run %s (error)" % dispcmd) + 
return None + return stdout + + +def versions_from_parentdir(parentdir_prefix, root, verbose): + """Try to determine the version from the parent directory name. + + Source tarballs conventionally unpack into a directory that includes + both the project name and a version string. + """ + dirname = os.path.basename(root) + if not dirname.startswith(parentdir_prefix): + if verbose: + print("guessing rootdir is '%s', but '%s' doesn't start with " + "prefix '%s'" % (root, dirname, parentdir_prefix)) + raise NotThisMethod("rootdir doesn't start with parentdir_prefix") + return {"version": dirname[len(parentdir_prefix):], + "full-revisionid": None, + "dirty": False, "error": None} + + +@register_vcs_handler("git", "get_keywords") +def git_get_keywords(versionfile_abs): + """Extract version information from the given file.""" + # the code embedded in _version.py can just fetch the value of these + # keywords. When used from setup.py, we don't want to import _version.py, + # so we do it with a regexp instead. This function is not used from + # _version.py. + keywords = {} + try: + f = open(versionfile_abs, "r") + for line in f.readlines(): + if line.strip().startswith("git_refnames ="): + mo = re.search(r'=\s*"(.*)"', line) + if mo: + keywords["refnames"] = mo.group(1) + if line.strip().startswith("git_full ="): + mo = re.search(r'=\s*"(.*)"', line) + if mo: + keywords["full"] = mo.group(1) + f.close() + except EnvironmentError: + pass + return keywords + + +@register_vcs_handler("git", "keywords") +def git_versions_from_keywords(keywords, tag_prefix, verbose): + """Get version information from git keywords.""" + if not keywords: + raise NotThisMethod("no keywords at all, weird") + refnames = keywords["refnames"].strip() + if refnames.startswith("$Format"): + if verbose: + print("keywords are unexpanded, not using") + raise NotThisMethod("unexpanded keywords, not a git-archive tarball") + refs = set([r.strip() for r in refnames.strip("()").split(",")]) + # starting in git-1.8.3, tags are listed as "tag: foo-1.0" instead of + # just "foo-1.0". If we see a "tag: " prefix, prefer those. + TAG = "tag: " + tags = set([r[len(TAG):] for r in refs if r.startswith(TAG)]) + if not tags: + # Either we're using git < 1.8.3, or there really are no tags. We use + # a heuristic: assume all version tags have a digit. The old git %d + # expansion behaves like git log --decorate=short and strips out the + # refs/heads/ and refs/tags/ prefixes that would let us distinguish + # between branches and tags. By ignoring refnames without digits, we + # filter out many common branch names like "release" and + # "stabilization", as well as "HEAD" and "master". + tags = set([r for r in refs if re.search(r'\d', r)]) + if verbose: + print("discarding '%s', no digits" % ",".join(refs-tags)) + if verbose: + print("likely tags: %s" % ",".join(sorted(tags))) + for ref in sorted(tags): + # sorting will prefer e.g. 
"2.0" over "2.0rc1" + if ref.startswith(tag_prefix): + r = ref[len(tag_prefix):] + if verbose: + print("picking %s" % r) + return {"version": r, + "full-revisionid": keywords["full"].strip(), + "dirty": False, "error": None + } + # no suitable tags, so version is "0+unknown", but full hex is still there + if verbose: + print("no suitable tags, using unknown + full revision id") + return {"version": "0+unknown", + "full-revisionid": keywords["full"].strip(), + "dirty": False, "error": "no suitable tags"} + + +@register_vcs_handler("git", "pieces_from_vcs") +def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command): + """Get version from 'git describe' in the root of the source tree. + + This only gets called if the git-archive 'subst' keywords were *not* + expanded, and _version.py hasn't already been rewritten with a short + version string, meaning we're inside a checked out source tree. + """ + if not os.path.exists(os.path.join(root, ".git")): + if verbose: + print("no .git in %s" % root) + raise NotThisMethod("no .git directory") + + GITS = ["git"] + if sys.platform == "win32": + GITS = ["git.cmd", "git.exe"] + # if there is a tag matching tag_prefix, this yields TAG-NUM-gHEX[-dirty] + # if there isn't one, this yields HEX[-dirty] (no NUM) + describe_out = run_command(GITS, ["describe", "--tags", "--dirty", + "--always", "--long", + "--match", "%s*" % tag_prefix], + cwd=root) + # --long was added in git-1.5.5 + if describe_out is None: + raise NotThisMethod("'git describe' failed") + describe_out = describe_out.strip() + full_out = run_command(GITS, ["rev-parse", "HEAD"], cwd=root) + if full_out is None: + raise NotThisMethod("'git rev-parse' failed") + full_out = full_out.strip() + + pieces = {} + pieces["long"] = full_out + pieces["short"] = full_out[:7] # maybe improved later + pieces["error"] = None + + # parse describe_out. It will be like TAG-NUM-gHEX[-dirty] or HEX[-dirty] + # TAG might have hyphens. + git_describe = describe_out + + # look for -dirty suffix + dirty = git_describe.endswith("-dirty") + pieces["dirty"] = dirty + if dirty: + git_describe = git_describe[:git_describe.rindex("-dirty")] + + # now we have TAG-NUM-gHEX or HEX + + if "-" in git_describe: + # TAG-NUM-gHEX + mo = re.search(r'^(.+)-(\d+)-g([0-9a-f]+)$', git_describe) + if not mo: + # unparseable. Maybe git-describe is misbehaving? + pieces["error"] = ("unable to parse git-describe output: '%s'" + % describe_out) + return pieces + + # tag + full_tag = mo.group(1) + if not full_tag.startswith(tag_prefix): + if verbose: + fmt = "tag '%s' doesn't start with prefix '%s'" + print(fmt % (full_tag, tag_prefix)) + pieces["error"] = ("tag '%s' doesn't start with prefix '%s'" + % (full_tag, tag_prefix)) + return pieces + pieces["closest-tag"] = full_tag[len(tag_prefix):] + + # distance: number of commits since tag + pieces["distance"] = int(mo.group(2)) + + # commit: short hex revision ID + pieces["short"] = mo.group(3) + + else: + # HEX: no tags + pieces["closest-tag"] = None + count_out = run_command(GITS, ["rev-list", "HEAD", "--count"], + cwd=root) + pieces["distance"] = int(count_out) # total number of commits + + return pieces + + +def plus_or_dot(pieces): + """Return a + if we don't already have one, else return a .""" + if "+" in pieces.get("closest-tag", ""): + return "." + return "+" + + +def render_pep440(pieces): + """Build up version string, with post-release "local version identifier". + + Our goal: TAG[+DISTANCE.gHEX[.dirty]] . 
Note that if you + get a tagged build and then dirty it, you'll get TAG+0.gHEX.dirty + + Exceptions: + 1: no tags. git_describe was just HEX. 0+untagged.DISTANCE.gHEX[.dirty] + """ + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"] or pieces["dirty"]: + rendered += plus_or_dot(pieces) + rendered += "%d.g%s" % (pieces["distance"], pieces["short"]) + if pieces["dirty"]: + rendered += ".dirty" + else: + # exception #1 + rendered = "0+untagged.%d.g%s" % (pieces["distance"], + pieces["short"]) + if pieces["dirty"]: + rendered += ".dirty" + return rendered + + +def render_pep440_pre(pieces): + """TAG[.post.devDISTANCE] -- No -dirty. + + Exceptions: + 1: no tags. 0.post.devDISTANCE + """ + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"]: + rendered += ".post.dev%d" % pieces["distance"] + else: + # exception #1 + rendered = "0.post.dev%d" % pieces["distance"] + return rendered + + +def render_pep440_post(pieces): + """TAG[.postDISTANCE[.dev0]+gHEX] . + + The ".dev0" means dirty. Note that .dev0 sorts backwards + (a dirty tree will appear "older" than the corresponding clean one), + but you shouldn't be releasing software with -dirty anyways. + + Exceptions: + 1: no tags. 0.postDISTANCE[.dev0] + """ + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"] or pieces["dirty"]: + rendered += ".post%d" % pieces["distance"] + if pieces["dirty"]: + rendered += ".dev0" + rendered += plus_or_dot(pieces) + rendered += "g%s" % pieces["short"] + else: + # exception #1 + rendered = "0.post%d" % pieces["distance"] + if pieces["dirty"]: + rendered += ".dev0" + rendered += "+g%s" % pieces["short"] + return rendered + + +def render_pep440_old(pieces): + """TAG[.postDISTANCE[.dev0]] . + + The ".dev0" means dirty. + + Eexceptions: + 1: no tags. 0.postDISTANCE[.dev0] + """ + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"] or pieces["dirty"]: + rendered += ".post%d" % pieces["distance"] + if pieces["dirty"]: + rendered += ".dev0" + else: + # exception #1 + rendered = "0.post%d" % pieces["distance"] + if pieces["dirty"]: + rendered += ".dev0" + return rendered + + +def render_git_describe(pieces): + """TAG[-DISTANCE-gHEX][-dirty]. + + Like 'git describe --tags --dirty --always'. + + Exceptions: + 1: no tags. HEX[-dirty] (note: no 'g' prefix) + """ + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"]: + rendered += "-%d-g%s" % (pieces["distance"], pieces["short"]) + else: + # exception #1 + rendered = pieces["short"] + if pieces["dirty"]: + rendered += "-dirty" + return rendered + + +def render_git_describe_long(pieces): + """TAG-DISTANCE-gHEX[-dirty]. + + Like 'git describe --tags --dirty --always -long'. + The distance/hash is unconditional. + + Exceptions: + 1: no tags. 
HEX[-dirty] (note: no 'g' prefix) + """ + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + rendered += "-%d-g%s" % (pieces["distance"], pieces["short"]) + else: + # exception #1 + rendered = pieces["short"] + if pieces["dirty"]: + rendered += "-dirty" + return rendered + + +def render(pieces, style): + """Render the given version pieces into the requested style.""" + if pieces["error"]: + return {"version": "unknown", + "full-revisionid": pieces.get("long"), + "dirty": None, + "error": pieces["error"]} + + if not style or style == "default": + style = "pep440" # the default + + if style == "pep440": + rendered = render_pep440(pieces) + elif style == "pep440-pre": + rendered = render_pep440_pre(pieces) + elif style == "pep440-post": + rendered = render_pep440_post(pieces) + elif style == "pep440-old": + rendered = render_pep440_old(pieces) + elif style == "git-describe": + rendered = render_git_describe(pieces) + elif style == "git-describe-long": + rendered = render_git_describe_long(pieces) + else: + raise ValueError("unknown style '%s'" % style) + + return {"version": rendered, "full-revisionid": pieces["long"], + "dirty": pieces["dirty"], "error": None} + + +def get_versions(): + """Get version information or return default if unable to do so.""" + # I am in _version.py, which lives at ROOT/VERSIONFILE_SOURCE. If we have + # __file__, we can work backwards from there to the root. Some + # py2exe/bbfreeze/non-CPython implementations don't do __file__, in which + # case we can only use expanded keywords. + + cfg = get_config() + verbose = cfg.verbose + + try: + return git_versions_from_keywords(get_keywords(), cfg.tag_prefix, + verbose) + except NotThisMethod: + pass + + try: + root = os.path.realpath(__file__) + # versionfile_source is the relative path from the top of the source + # tree (where the .git directory might live) to this file. Invert + # this to find the root from __file__. + for i in cfg.versionfile_source.split('/'): + root = os.path.dirname(root) + except NameError: + return {"version": "0+unknown", "full-revisionid": None, + "dirty": None, + "error": "unable to find root of source tree"} + + try: + pieces = git_pieces_from_vcs(cfg.tag_prefix, root, verbose) + return render(pieces, cfg.style) + except NotThisMethod: + pass + + try: + if cfg.parentdir_prefix: + return versions_from_parentdir(cfg.parentdir_prefix, root, verbose) + except NotThisMethod: + pass + + return {"version": "0+unknown", "full-revisionid": None, + "dirty": None, + "error": "unable to compute version"} diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index 8c5f7f1b..e657c939 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -32,6 +32,7 @@ import logging import os import socket import ssl +import uuid import urlparse try: @@ -44,11 +45,11 @@ from StringIO import StringIO from collections import defaultdict from u1db.remote import http_client from u1db.remote.ssl_match_hostname import match_hostname +from twisted.internet.defer import DeferredLock, returnValue, inlineCallbacks from zope.interface import implements from leap.common.config import get_path_prefix from leap.common.plugins import collect_plugins -from twisted.internet.defer import DeferredLock from leap.soledad.common import SHARED_DB_NAME from leap.soledad.common import soledad_assert @@ -102,11 +103,11 @@ class Soledad(object): soledad starts to retrieve keys from server. 
SOLEDAD_DONE_DOWNLOADING_KEYS: emitted during bootstrap sequence when soledad finishes downloading keys from server. - SOLEDAD_NEW_DATA_TO_SYNC: emitted upon call to C{need_sync()} when - there's indeed new data to be synchronized between local database - replica and server's replica. SOLEDAD_DONE_DATA_SYNC: emitted inside C{sync()} method when it has finished synchronizing with remote replica. + SOLEDAD_NEW_DATA_TO_SYNC: emitted upon call to C{need_sync()} when + there's indeed new data to be synchronized between local database + replica and server's replica. --- not used right now. """ implements(soledad_interfaces.ILocalStorage, soledad_interfaces.ISyncableStorage, @@ -125,7 +126,8 @@ class Soledad(object): def __init__(self, uuid, passphrase, secrets_path, local_db_path, server_url, cert_file, shared_db=None, - auth_token=None, defer_encryption=False, syncable=True): + auth_token=None, defer_encryption=False, syncable=True, + userid=None): """ Initialize configuration, cryptographic keys and dbs. @@ -179,12 +181,14 @@ class Soledad(object): """ # store config params self._uuid = uuid + self._userid = userid self._passphrase = passphrase self._local_db_path = local_db_path self._server_url = server_url self._defer_encryption = defer_encryption self._secrets_path = None self._sync_enc_pool = None + self._dbsyncer = None self.shared_db = shared_db @@ -215,6 +219,7 @@ class Soledad(object): # # initialization/destruction methods # + def _init_config_with_defaults(self): """ Initialize configuration using default values for missing params. @@ -250,7 +255,7 @@ class Soledad(object): """ self._secrets = SoledadSecrets( self.uuid, self._passphrase, self._secrets_path, - self.shared_db) + self.shared_db, userid=self._userid) self._secrets.bootstrap() def _init_u1db_sqlcipher_backend(self): @@ -648,10 +653,29 @@ class Soledad(object): def uuid(self): return self._uuid + @property + def userid(self): + return self._userid + # # ISyncableStorage # + def set_syncable(self, syncable): + """ + Toggle the syncable state for this database. + + This can be used to start a database with offline state and switch it + online afterwards. Or the opposite: stop syncs when connection is lost. + + :param syncable: new status for syncable. + :type syncable: bool + """ + # TODO should check that we've got a token! + self.shared_db.syncable = syncable + if syncable and not self._dbsyncer: + self._init_u1db_syncer() + def sync(self, defer_decryption=True): """ Synchronize documents with the server replica. @@ -718,8 +742,9 @@ class Soledad(object): return failure def _emit_done_data_sync(passthrough): + user_data = {'uuid': self.uuid, 'userid': self.userid} soledad_events.emit_async( - soledad_events.SOLEDAD_DONE_DATA_SYNC, self.uuid) + soledad_events.SOLEDAD_DONE_DATA_SYNC, user_data) return passthrough d.addCallbacks(_sync_callback, _sync_errback) @@ -747,6 +772,13 @@ class Soledad(object): """ return self.sync_lock.locked + @property + def syncable(self): + if self.shared_db: + return self.shared_db.syncable + else: + return False + def _set_token(self, token): """ Set the authentication token for remote database access. @@ -911,6 +943,38 @@ class Soledad(object): """ return self._dbpool.runOperation(*args, **kw) + # + # Service authentication + # + + @inlineCallbacks + def get_or_create_service_token(self, service): + """ + Return the stored token for a given service, or generates and stores a + random one if it does not exist. + + These tokens can be used to authenticate services. 
+ """ + # FIXME this could use the local sqlcipher database, to avoid + # problems with different replicas creating different tokens. + + yield self.create_index('by-servicetoken', 'type', 'service') + docs = yield self._get_token_for_service(service) + if docs: + doc = docs[0] + returnValue(doc.content['token']) + else: + token = str(uuid.uuid4()).replace('-', '')[-24:] + yield self._set_token_for_service(service, token) + returnValue(token) + + def _get_token_for_service(self, service): + return self.get_from_index('by-servicetoken', 'servicetoken', service) + + def _set_token_for_service(self, service, token): + doc = {'type': 'servicetoken', 'service': service, 'token': token} + return self.create_doc(doc) + def _convert_to_unicode(content): """ diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index 90ad656e..363d71b9 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -24,7 +24,9 @@ import hashlib import json import logging -from pycryptopp.cipher.aes import AES +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.backends.multibackend import MultiBackend +from cryptography.hazmat.backends.openssl.backend import Backend as OpenSSLBackend from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type @@ -56,7 +58,10 @@ def encrypt_sym(data, key): (len(key) * 8)) iv = os.urandom(16) - ciphertext = AES(key=key, iv=iv).process(data) + backend = MultiBackend([OpenSSLBackend()]) + cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend) + encryptor = cipher.encryptor() + ciphertext = encryptor.update(data) + encryptor.finalize() return binascii.b2a_base64(iv), ciphertext @@ -81,8 +86,11 @@ def decrypt_sym(data, key, iv): soledad_assert( len(key) == 32, # 32 x 8 = 256 bits. 'Wrong key size: %s (must be 256 bits long).' % len(key)) - return AES( - key=key, iv=binascii.a2b_base64(iv)).process(data) + backend = MultiBackend([OpenSSLBackend()]) + iv = binascii.a2b_base64(iv) + cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend) + decryptor = cipher.decryptor() + return decryptor.update(data) + decryptor.finalize() def doc_mac_key(doc_id, secret): diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 6d3c11b9..34667a1e 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -28,6 +28,7 @@ import json import logging from twisted.internet import reactor +from twisted.internet import threads from twisted.internet import defer from twisted.python import log @@ -70,11 +71,13 @@ class SyncEncryptDecryptPool(object): self._started = False def start(self): + if self.running: + return self._create_pool() self._started = True def stop(self): - if not self._started: + if not self.running: return self._started = False self._destroy_pool() @@ -650,14 +653,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): last_idx = self._last_inserted_idx for doc_id, rev, content, gen, trans_id, encrypted, idx in \ decrypted_docs: - # XXX for some reason, a document might not have been deleted from - # the database. This is a bug. In this point, already - # processed documents should have been removed from the sync - # database and we should not have to skip them here. We need - # to find out why this is happening, fix, and remove the - # skipping below. 
- if (idx < last_idx + 1): - continue if (idx != last_idx + 1): break insertable.append((doc_id, rev, content, gen, trans_id, idx)) @@ -693,7 +688,11 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ insertable = yield self._get_insertable_docs() for doc_fields in insertable: - self._insert_decrypted_local_doc(*doc_fields) + method = self._insert_decrypted_local_doc + # FIXME: This is used only because SQLCipherU1DBSync is synchronous + # When adbapi is used there is no need for an external thread + # Without this the reactor can freeze and fail docs download + yield threads.deferToThread(method, *doc_fields) defer.returnValue(insertable) def _delete_processed_docs(self, inserted): @@ -763,6 +762,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) return self._runOperation(query) + @defer.inlineCallbacks def _collect_async_decryption_results(self): """ Collect the results of the asynchronous doc decryptions and re-raise @@ -773,7 +773,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): async_results = self._async_results[:] for res in async_results: if res.ready(): - self._decrypt_doc_cb(res.get()) # might raise an exception! + # XXX: might raise an exception! + yield self._decrypt_doc_cb(res.get()) self._async_results.remove(res) @defer.inlineCallbacks @@ -796,7 +797,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): if processed < pending: yield self._async_decrypt_received_docs() - self._collect_async_decryption_results() + yield self._collect_async_decryption_results() docs = yield self._process_decrypted_docs() yield self._delete_processed_docs(docs) # recurse @@ -807,6 +808,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._finish() def _finish(self): - self._deferred.callback(None) self._processed_docs = 0 self._last_inserted_idx = 0 + self._deferred.callback(None) diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py index 498fb6e7..a16531ef 100644 --- a/client/src/leap/soledad/client/http_target/__init__.py +++ b/client/src/leap/soledad/client/http_target/__init__.py @@ -79,6 +79,7 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher): self._url = str(url) + "/sync-from/" + str(source_replica_uid) self.source_replica_uid = source_replica_uid self._auth_header = None + self._uuid = None self.set_creds(creds) self._crypto = crypto self._sync_db = sync_db diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py index dcc762f6..94354092 100644 --- a/client/src/leap/soledad/client/http_target/api.py +++ b/client/src/leap/soledad/client/http_target/api.py @@ -40,6 +40,10 @@ class SyncTargetAPI(SyncTarget): self._sync_decr_pool.stop() yield self._http.close() + @property + def uuid(self): + return self._uuid + def set_creds(self, creds): """ Update credentials. 
@@ -49,6 +53,7 @@ class SyncTargetAPI(SyncTarget): """ uuid = creds['token']['uuid'] token = creds['token']['token'] + self._uuid = uuid auth = '%s:%s' % (uuid, token) b64_token = base64.b64encode(auth) self._auth_header = {'Authorization': ['Token %s' % b64_token]} diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 65e576d9..9f7a4193 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -39,6 +39,13 @@ class HTTPDocFetcher(object): So we parse, decrypt and insert locally as they arrive. """ + # The uuid of the local replica. + # Any class inheriting from this one should provide a meaningful attribute + # if the sync status event is meant to be used somewhere else. + + uuid = 'undefined' + userid = 'undefined' + @defer.inlineCallbacks def _receive_docs(self, last_known_generation, last_known_trans_id, ensure_callback, sync_id, defer_decryption): @@ -176,7 +183,8 @@ class HTTPDocFetcher(object): # end of symmetric decryption # ------------------------------------------------------------- self._received_docs += 1 - _emit_receive_status(self._received_docs, total) + user_data = {'uuid': self.uuid, 'userid': self.userid} + _emit_receive_status(user_data, self._received_docs, total) return number_of_changes, new_generation, new_transaction_id def _parse_received_doc_response(self, response): @@ -243,9 +251,9 @@ class HTTPDocFetcher(object): source_replica_uid=self.source_replica_uid) -def _emit_receive_status(received_docs, total): +def _emit_receive_status(user_data, received_docs, total): content = {'received': received_docs, 'total': total} - emit_async(SOLEDAD_SYNC_RECEIVE_STATUS, content) + emit_async(SOLEDAD_SYNC_RECEIVE_STATUS, user_data, content) if received_docs % 20 == 0: msg = "%d/%d" % (received_docs, total) diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py index 80483f0d..89288779 100644 --- a/client/src/leap/soledad/client/http_target/send.py +++ b/client/src/leap/soledad/client/http_target/send.py @@ -29,6 +29,15 @@ class HTTPDocSender(object): They need to be encrypted and metadata prepared before sending. """ + MAX_BATCH_SIZE = 0 # disabled by now, this is being tested yet + + # The uuid of the local replica. + # Any class inheriting from this one should provide a meaningful attribute + # if the sync status event is meant to be used somewhere else. 
+ + uuid = 'undefined' + userid = 'undefined' + @defer.inlineCallbacks def _send_docs(self, docs_by_generation, last_known_generation, last_known_trans_id, sync_id): @@ -43,25 +52,43 @@ class HTTPDocSender(object): sync_id=sync_id, ensure=self._ensure_callback is not None) total = len(docs_by_generation) - for idx, entry in enumerate(docs_by_generation, 1): - yield self._prepare_one_doc(entry, body, idx, total) - result = yield self._http_request( - self._url, - method='POST', - body=body.pop(1), - content_type='application/x-soledad-sync-put') - if self._defer_encryption: - self._delete_sent(idx, docs_by_generation) - _emit_send_status(idx, total) + while body.consumed < total: + result = yield self._send_batch(total, body, docs_by_generation) response_dict = json.loads(result)[0] gen_after_send = response_dict['new_generation'] trans_id_after_send = response_dict['new_transaction_id'] defer.returnValue([gen_after_send, trans_id_after_send]) - def _delete_sent(self, idx, docs_by_generation): - doc = docs_by_generation[idx - 1][0] - self._sync_enc_pool.delete_encrypted_doc( - doc.doc_id, doc.rev) + def _delete_sent(self, docs): + for doc, gen, trans_id in docs: + self._sync_enc_pool.delete_encrypted_doc( + doc.doc_id, doc.rev) + + @defer.inlineCallbacks + def _send_batch(self, total, body, docs): + sent = [] + missing = total - body.consumed + for i in xrange(1, missing + 1): + if body.pending_size > self.MAX_BATCH_SIZE: + break + idx = body.consumed + i + entry = docs[idx - 1] + sent.append(entry) + yield self._prepare_one_doc(entry, body, idx, total) + result = yield self._send_request(body.pop()) + if self._defer_encryption: + self._delete_sent(sent) + + user_data = {'uuid': self.uuid, 'userid': self.userid} + _emit_send_status(self.uuid, body.consumed, total) + defer.returnValue(result) + + def _send_request(self, body): + return self._http_request( + self._url, + method='POST', + body=body, + content_type='application/x-soledad-sync-put') @defer.inlineCallbacks def _prepare_one_doc(self, entry, body, idx, total): @@ -94,9 +121,9 @@ class HTTPDocSender(object): return d -def _emit_send_status(idx, total): +def _emit_send_status(user_data, idx, total): content = {'sent': idx, 'total': total} - emit_async(SOLEDAD_SYNC_SEND_STATUS, content) + emit_async(SOLEDAD_SYNC_SEND_STATUS, user_data, content) msg = "%d/%d" % (idx, total) logger.debug("Sync send status: %s" % msg) diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py index 44cd7089..2625744c 100644 --- a/client/src/leap/soledad/client/http_target/support.py +++ b/client/src/leap/soledad/client/http_target/support.py @@ -152,6 +152,8 @@ class RequestBody(object): """ self.headers = header_dict self.entries = [] + self.consumed = 0 + self.pending_size = 0 def insert_info(self, **entry_dict): """ @@ -165,11 +167,11 @@ class RequestBody(object): """ entry = json.dumps(entry_dict) self.entries.append(entry) - return len(entry) + self.pending_size += len(entry) - def pop(self, number=1): + def pop(self): """ - Removes an amount of entries and returns it formatted and ready + Removes all entries and returns it formatted and ready to be sent. 
:param number: number of entries to pop and format @@ -178,7 +180,10 @@ class RequestBody(object): :return: formatted body ready to be sent :rtype: str """ - entries = [self.entries.pop(0) for i in xrange(number)] + entries = self.entries[:] + self.entries = [] + self.pending_size = 0 + self.consumed += len(entries) return self.entries_to_str(entries) def __str__(self): diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py index c3c3dff5..e2a5a1d7 100644 --- a/client/src/leap/soledad/client/secrets.py +++ b/client/src/leap/soledad/client/secrets.py @@ -147,7 +147,7 @@ class SoledadSecrets(object): Keys used to access storage secrets in recovery documents. """ - def __init__(self, uuid, passphrase, secrets_path, shared_db): + def __init__(self, uuid, passphrase, secrets_path, shared_db, userid=None): """ Initialize the secrets manager. @@ -167,6 +167,7 @@ class SoledadSecrets(object): # param secret_id: The id of the storage secret to be used. self._uuid = uuid + self._userid = userid self._passphrase = passphrase self._secrets_path = secrets_path self._shared_db = shared_db @@ -433,13 +434,15 @@ class SoledadSecrets(object): :return: a document with encrypted key material in its contents :rtype: document.SoledadDocument """ - events.emit_async(events.SOLEDAD_DOWNLOADING_KEYS, self._uuid) + user_data = self._get_user_data() + events.emit_async(events.SOLEDAD_DOWNLOADING_KEYS, user_data) db = self._shared_db if not db: logger.warning('No shared db found') return doc = db.get_doc(self._shared_db_doc_id()) - events.emit_async(events.SOLEDAD_DONE_DOWNLOADING_KEYS, self._uuid) + user_data = {'userid': self._userid, 'uuid': self._uuid} + events.emit_async(events.SOLEDAD_DONE_DOWNLOADING_KEYS, user_data) return doc def _put_secrets_in_shared_db(self): @@ -462,13 +465,14 @@ class SoledadSecrets(object): # fill doc with encrypted secrets doc.content = self._export_recovery_document() # upload secrets to server - events.emit_async(events.SOLEDAD_UPLOADING_KEYS, self._uuid) + user_data = self._get_user_data() + events.emit_async(events.SOLEDAD_UPLOADING_KEYS, user_data) db = self._shared_db if not db: logger.warning('No shared db found') return db.put_doc(doc) - events.emit_async(events.SOLEDAD_DONE_UPLOADING_KEYS, self._uuid) + events.emit_async(events.SOLEDAD_DONE_UPLOADING_KEYS, user_data) # # Management of secret for symmetric encryption. @@ -588,13 +592,14 @@ class SoledadSecrets(object): :return: The id of the generated secret. 
:rtype: str """ - events.emit_async(events.SOLEDAD_CREATING_KEYS, self._uuid) + user_data = self._get_user_data() + events.emit_async(events.SOLEDAD_CREATING_KEYS, user_data) # generate random secret secret = os.urandom(self.GEN_SECRET_LENGTH) secret_id = sha256(secret).hexdigest() self._secrets[secret_id] = secret self._store_secrets() - events.emit_async(events.SOLEDAD_DONE_CREATING_KEYS, self._uuid) + events.emit_async(events.SOLEDAD_DONE_CREATING_KEYS, user_data) return secret_id def _store_secrets(self): @@ -738,3 +743,6 @@ class SoledadSecrets(object): salt=self._get_sync_db_salt(), buflen=32, # we need a key with 256 bits (32 bytes) ) + + def _get_user_data(self): + return {'uuid': self._uuid, 'userid': self._userid} diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index 2276db2a..1879031f 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -22,7 +22,7 @@ import logging from twisted.internet import defer from u1db import errors -from leap.soledad.common.errors import MissingDesignDocError +from leap.soledad.common.errors import BackendNotReadyError from u1db.sync import Synchronizer @@ -74,7 +74,7 @@ class SoledadSynchronizer(Synchronizer): (self.target_replica_uid, target_gen, target_trans_id, target_my_gen, target_my_trans_id) = yield \ sync_target.get_sync_info(self.source._replica_uid) - except (errors.DatabaseDoesNotExist, MissingDesignDocError) as e: + except (errors.DatabaseDoesNotExist, BackendNotReadyError) as e: logger.debug("Database isn't ready on server. Will be created.") logger.debug("Reason: %s", e.__class__) self.target_replica_uid = None |
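
Example: the crypto.py hunk above replaces pycryptopp's AES with AES-256 in CTR mode from the cryptography package. The sketch below mirrors the new encrypt_sym/decrypt_sym helpers, but uses cryptography's default_backend() instead of the explicit OpenSSL MultiBackend wired in by the patch, so treat it as an illustration of the API rather than the shipped code.

import binascii
import os

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes


def encrypt_sym(data, key):
    # Encrypt data with AES-256 in CTR mode; return (base64 IV, ciphertext).
    assert len(key) == 32, 'AES-256 requires a 32-byte key'
    iv = os.urandom(16)
    cipher = Cipher(algorithms.AES(key), modes.CTR(iv),
                    backend=default_backend())
    encryptor = cipher.encryptor()
    return binascii.b2a_base64(iv), encryptor.update(data) + encryptor.finalize()


def decrypt_sym(data, key, iv):
    # Decrypt AES-256-CTR ciphertext, given the base64-encoded IV.
    assert len(key) == 32, 'AES-256 requires a 32-byte key'
    cipher = Cipher(algorithms.AES(key), modes.CTR(binascii.a2b_base64(iv)),
                    backend=default_backend())
    decryptor = cipher.decryptor()
    return decryptor.update(data) + decryptor.finalize()


if __name__ == '__main__':
    key = os.urandom(32)
    iv, ciphertext = encrypt_sym(b'some document content', key)
    assert decrypt_sym(ciphertext, key, iv) == b'some document content'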
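
Example: the send.py and support.py hunks introduce batched uploads driven by RequestBody.consumed and RequestBody.pending_size, with MAX_BATCH_SIZE set to 0 while batching is still being tested, so each document ends up in its own request. The following sketch reproduces that bookkeeping; the constructor signature, the entries_to_str() framing, and the HTTP POST are simplified stand-ins, not the exact code from the patch.

import json


class RequestBody(object):

    def __init__(self, **header_dict):
        self.headers = header_dict
        self.entries = []
        self.consumed = 0      # entries already drained into a sent batch
        self.pending_size = 0  # bytes accumulated since the last pop()

    def insert_info(self, **entry_dict):
        entry = json.dumps(entry_dict)
        self.entries.append(entry)
        self.pending_size += len(entry)

    def pop(self):
        # Drain everything accumulated so far, as in the patched pop().
        entries, self.entries = self.entries, []
        self.pending_size = 0
        self.consumed += len(entries)
        # Stand-in for entries_to_str(): header object first, then entries.
        return '\r\n'.join([json.dumps(self.headers)] + entries)


MAX_BATCH_SIZE = 0  # disabled for now, mirroring HTTPDocSender.MAX_BATCH_SIZE


def send_batches(docs, post):
    # Mimics the _send_docs/_send_batch loop: keep popping batches until
    # every document has been consumed by the body.
    body = RequestBody(sync_id='fake-sync-id', ensure=False)
    total = len(docs)
    while body.consumed < total:
        for idx in range(body.consumed, total):
            if body.pending_size > MAX_BATCH_SIZE:
                break
            body.insert_info(content=docs[idx], number_of_docs=total)
        post(body.pop())


if __name__ == '__main__':
    requests = []
    send_batches(['doc-1', 'doc-2', 'doc-3'], requests.append)
    print('%d requests sent' % len(requests))  # 3, since MAX_BATCH_SIZE is 0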