From 129467d00e57f6cce34a8a4dc8b8b0e4a9b5e6e9 Mon Sep 17 00:00:00 2001
From: drebs
Date: Tue, 26 Apr 2016 00:48:25 -0300
Subject: [refactor] remove shared db locking from client

Shared db locking was used to avoid the case in which two different devices try to store/modify remotely stored secrets at the same time. We want to avoid remote locks because of the problems they create, and prefer to crash locally. For the record, we are currently using the user's password to encrypt the secrets stored on the server, and while we continue to do this we will have to re-encrypt the secrets and update the remote storage whenever the user changes her password.
---
 client/src/leap/soledad/client/secrets.py | 122 +++++++++++----------------- client/src/leap/soledad/client/shared_db.py | 30 ------- 2 files changed, 49 insertions(+), 103 deletions(-)
(limited to 'client')
diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py
index e2a5a1d7..a72aac0d 100644
--- a/client/src/leap/soledad/client/secrets.py
+++ b/client/src/leap/soledad/client/secrets.py
@@ -33,7 +33,6 @@ from hashlib import sha256 from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type from leap.soledad.common import document -from leap.soledad.common import errors from leap.soledad.client import events from leap.soledad.client.crypto import encrypt_sym, decrypt_sym
@@ -81,6 +80,7 @@ class BootstrapSequenceError(SecretsException): # Secrets handler # + class SoledadSecrets(object): """
@@ -162,17 +162,12 @@ class SoledadSecrets(object): :param shared_db: The shared database that stores user secrets. :type shared_db: leap.soledad.client.shared_db.SoledadSharedDatabase """ - # XXX removed since not in use - # We will pick the first secret available. - # param secret_id: The id of the storage secret to be used. - self._uuid = uuid self._userid = userid self._passphrase = passphrase self._secrets_path = secrets_path self._shared_db = shared_db self._secrets = {} - self._secret_id = None def bootstrap(self):
@@ -197,47 +192,19 @@ class SoledadSecrets(object): # STAGE 1 - verify if secrets exist locally if not self._has_secret(): # try to load from local storage. - # STAGE 2 - there are no secrets in local storage, so try to fetch - # encrypted secrets from server. - logger.info( - 'Trying to fetch cryptographic secrets from shared recovery ' - 'database...') - - # --- start of atomic operation in shared db --- - - # obtain lock on shared db - token = timeout = None - try: - token, timeout = self._shared_db.lock() - except errors.AlreadyLockedError: - raise BootstrapSequenceError('Database is already locked.') - except errors.LockTimedOutError: - raise BootstrapSequenceError('Lock operation timed out.') + # STAGE 2 - there are no secrets in local storage and this is the + # first time we are running soledad with the specified + # secrets_path. Try to fetch encrypted secrets from + # server. + self._download_crypto_secrets() - self._get_or_gen_crypto_secrets() + if not self._has_secret(): - # release the lock on shared db - try: - self._shared_db.unlock(token) - self._shared_db.close() - except errors.NotLockedError: - # for some reason the lock expired. Despite that, secret - # loading or generation/storage must have been executed - # successfully, so we pass.
- pass - except errors.InvalidTokenError: - # here, our lock has not only expired but also some other - # client application has obtained a new lock and is currently - # doing its thing in the shared database. Using the same - # reasoning as above, we assume everything went smooth and - # pass. - pass - except Exception as e: - logger.error("Unhandled exception when unlocking shared " - "database.") - logger.exception(e) - - # --- end of atomic operation in shared db --- + # STAGE 3 - there are no secrets in server also, so we want to + # generate the secrets and store them in the remote + # db. + self._gen_crypto_secrets() + self._upload_crypto_secrets() def _has_secret(self): """ @@ -295,13 +262,14 @@ class SoledadSecrets(object): self._store_secrets() self._put_secrets_in_shared_db() - def _get_or_gen_crypto_secrets(self): + def _download_crypto_secrets(self): """ - Retrieves or generates the crypto secrets. - - :raises BootstrapSequenceError: Raised when unable to store secrets in - shared database. + Downloads the crypto secrets. """ + logger.info( + 'Trying to fetch cryptographic secrets from shared recovery ' + 'database...') + if self._shared_db.syncable: doc = self._get_secrets_from_shared_db() else: @@ -314,31 +282,39 @@ class SoledadSecrets(object): _, active_secret = self._import_recovery_document(doc.content) self._maybe_set_active_secret(active_secret) self._store_secrets() # save new secrets in local file - else: - # STAGE 3 - there are no secrets in server also, so - # generate a secret and store it in remote db. - logger.info( - 'No cryptographic secrets found, creating new ' - ' secrets...') - self.set_secret_id(self._gen_secret()) - if self._shared_db.syncable: + def _gen_crypto_secrets(self): + """ + Generate the crypto secrets. + """ + logger.info('No cryptographic secrets found, creating new secrets...') + secret_id = self._gen_secret() + self.set_secret_id(secret_id) + + def _upload_crypto_secrets(self): + """ + Send crypto secrets to shared db. + + :raises BootstrapSequenceError: Raised when unable to store secrets in + shared database. + """ + if self._shared_db.syncable: + try: + self._put_secrets_in_shared_db() + except Exception as ex: + # storing generated secret in shared db failed for + # some reason, so we erase the generated secret and + # raise. try: - self._put_secrets_in_shared_db() - except Exception as ex: - # storing generated secret in shared db failed for - # some reason, so we erase the generated secret and - # raise. 
- try: - os.unlink(self._secrets_path) - except OSError as e: - if e.errno != errno.ENOENT: - # no such file or directory - logger.exception(e) - logger.exception(ex) - raise BootstrapSequenceError( - 'Could not store generated secret in the shared ' - 'database, bailing out...') + os.unlink(self._secrets_path) + except OSError as e: + if e.errno != errno.ENOENT: + # no such file or directory + logger.exception(e) + logger.exception(ex) + raise BootstrapSequenceError( + 'Could not store generated secret in the shared ' + 'database, bailing out...') # # Shared DB related methods diff --git a/client/src/leap/soledad/client/shared_db.py b/client/src/leap/soledad/client/shared_db.py index 6abf8ea3..a1d95fbe 100644 --- a/client/src/leap/soledad/client/shared_db.py +++ b/client/src/leap/soledad/client/shared_db.py @@ -151,33 +151,3 @@ class SoledadSharedDatabase(http_database.HTTPDatabase, TokenBasedAuth): http_database.HTTPDatabase.__init__(self, url, document_factory, creds) self._uuid = uuid - - def lock(self): - """ - Obtain a lock on document with id C{doc_id}. - - :return: A tuple containing the token to unlock and the timeout until - lock expiration. - :rtype: (str, float) - - :raise HTTPError: Raised if any HTTP error occurs. - """ - if self.syncable: - res, headers = self._request_json( - 'PUT', ['lock', self._uuid], body={}) - return res['token'], res['timeout'] - else: - return None, None - - def unlock(self, token): - """ - Release the lock on shared database. - - :param token: The token returned by a previous call to lock(). - :type token: str - - :raise HTTPError: - """ - if self.syncable: - _, _ = self._request_json( - 'DELETE', ['lock', self._uuid], params={'token': token}) -- cgit v1.2.3 From c634874aeeb4a9950e77ed28c8b8e643246e6bbd Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 26 Apr 2016 09:58:36 -0300 Subject: [refactor] cleanup bootstrap process --- client/src/leap/soledad/client/secrets.py | 89 ++++++++++++++++--------------- 1 file changed, 45 insertions(+), 44 deletions(-) (limited to 'client') diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py index a72aac0d..16487572 100644 --- a/client/src/leap/soledad/client/secrets.py +++ b/client/src/leap/soledad/client/secrets.py @@ -190,21 +190,33 @@ class SoledadSecrets(object): storage on server sequence has failed for some reason. """ # STAGE 1 - verify if secrets exist locally - if not self._has_secret(): # try to load from local storage. - - # STAGE 2 - there are no secrets in local storage and this is the - # first time we are running soledad with the specified - # secrets_path. Try to fetch encrypted secrets from - # server. + try: + logger.info("Trying to load secrets from local storage...") + self._load_secrets_from_local_file() + logger.info("Found secrets in local storage.") + return + except NoStorageSecret: + logger.info("Could not find secrets in local storage.") + + # STAGE 2 - there are no secrets in local storage and this is the + # first time we are running soledad with the specified + # secrets_path. Try to fetch encrypted secrets from + # server. + try: + logger.info('Trying to fetch secrets from remote storage...') self._download_crypto_secrets() + logger.info('Found secrets in remote storage.') + return + except NoStorageSecret: + logger.info("Could not find secrets in remote storage.") - if not self._has_secret(): - - # STAGE 3 - there are no secrets in server also, so we want to - # generate the secrets and store them in the remote - # db. 
- self._gen_crypto_secrets() - self._upload_crypto_secrets() + # STAGE 3 - there are no secrets in server also, so we want to + # generate the secrets and store them in the remote + # db. + logger.info("Generating secrets...") + self._gen_crypto_secrets() + logger.info("Uploading secrets...") + self._upload_crypto_secrets() def _has_secret(self): """ @@ -213,21 +225,7 @@ class SoledadSecrets(object): :return: Whether there's a storage secret for symmetric encryption. :rtype: bool """ - logger.info("Checking if there's a secret in local storage...") - if (self._secret_id is None or self._secret_id not in self._secrets) \ - and os.path.isfile(self._secrets_path): - try: - self._load_secrets() # try to load from disk - except IOError as e: - logger.warning( - 'IOError while loading secrets from disk: %s' % str(e)) - - if self.storage_secret is not None: - logger.info("Found a secret in local storage.") - return True - - logger.info("Could not find a secret in local storage.") - return False + return self.storage_secret is not None def _maybe_set_active_secret(self, active_secret): """ @@ -239,10 +237,16 @@ class SoledadSecrets(object): active_secret = self._secrets.items()[0][0] self.set_secret_id(active_secret) - def _load_secrets(self): + def _load_secrets_from_local_file(self): """ Load storage secrets from local file. + :raise NoStorageSecret: Raised if there are no secrets available in + local storage. """ + # check if secrets file exists and we can read it + if not os.path.isfile(self._secrets_path): + raise NoStorageSecret + # read storage secrets from file content = None with open(self._secrets_path, 'r') as f: @@ -264,24 +268,21 @@ class SoledadSecrets(object): def _download_crypto_secrets(self): """ - Downloads the crypto secrets. - """ - logger.info( - 'Trying to fetch cryptographic secrets from shared recovery ' - 'database...') + Download crypto secrets. + :raise NoStorageSecret: Raised if there are no secrets available in + remote storage. 
+ """ + doc = None if self._shared_db.syncable: doc = self._get_secrets_from_shared_db() - else: - doc = None - - if doc is not None: - logger.info( - 'Found cryptographic secrets in shared recovery ' - 'database.') - _, active_secret = self._import_recovery_document(doc.content) - self._maybe_set_active_secret(active_secret) - self._store_secrets() # save new secrets in local file + + if doc is None: + raise NoStorageSecret + + _, active_secret = self._import_recovery_document(doc.content) + self._maybe_set_active_secret(active_secret) + self._store_secrets() # save new secrets in local file def _gen_crypto_secrets(self): """ -- cgit v1.2.3 From af692e04c62b374c197d3ff45935ece5a7e100c1 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 26 Apr 2016 09:59:31 -0300 Subject: [refactor] remove old code for enlarging secrets --- client/src/leap/soledad/client/secrets.py | 12 ------------ 1 file changed, 12 deletions(-) (limited to 'client') diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py index 16487572..714b2dfe 100644 --- a/client/src/leap/soledad/client/secrets.py +++ b/client/src/leap/soledad/client/secrets.py @@ -253,18 +253,6 @@ class SoledadSecrets(object): content = json.loads(f.read()) _, active_secret = self._import_recovery_document(content) self._maybe_set_active_secret(active_secret) - # enlarge secret if needed - enlarged = False - if len(self._secrets[self._secret_id]) < self.GEN_SECRET_LENGTH: - gen_len = self.GEN_SECRET_LENGTH \ - - len(self._secrets[self._secret_id]) - new_piece = os.urandom(gen_len) - self._secrets[self._secret_id] += new_piece - enlarged = True - # store and save in shared db if needed - if enlarged: - self._store_secrets() - self._put_secrets_in_shared_db() def _download_crypto_secrets(self): """ -- cgit v1.2.3 From 96e27b6d258562d0e83696cefb1d11c60a31acf2 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 26 Apr 2016 22:43:27 -0300 Subject: [refactor] add changes file for shared db lock removal --- client/changes/next-changelog.rst | 1 + 1 file changed, 1 insertion(+) (limited to 'client') diff --git a/client/changes/next-changelog.rst b/client/changes/next-changelog.rst index bdc9f893..6e97386c 100644 --- a/client/changes/next-changelog.rst +++ b/client/changes/next-changelog.rst @@ -20,6 +20,7 @@ Bugfixes Misc ~~~~ +- Refactor bootstrap to remove shared db lock. - `#1236 `_: Description of the new feature corresponding with issue #1236. - Some change without issue number. 
-- cgit v1.2.3
From ff5bd3f8e1cedd01119cd02b395c2bdbfa377c71 Mon Sep 17 00:00:00 2001
From: Kali Kaneko
Date: Mon, 16 May 2016 15:49:18 -0400
Subject: [style] pep8

---
 client/src/leap/soledad/client/crypto.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-)
(limited to 'client')
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index 363d71b9..b75d4301 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -26,7 +26,8 @@ import logging from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends.multibackend import MultiBackend -from cryptography.hazmat.backends.openssl.backend import Backend as OpenSSLBackend +from cryptography.hazmat.backends.openssl.backend \ + import Backend as OpenSSLBackend from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type
-- cgit v1.2.3
From 21dbbc534be2c4668011cc9e631a7e4ba24061fa Mon Sep 17 00:00:00 2001
From: Kali Kaneko
Date: Wed, 18 May 2016 12:04:59 -0400
Subject: [pkg] update to new versioneer json format

---
 client/setup.py | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-)
(limited to 'client')
diff --git a/client/setup.py b/client/setup.py
index 4480e247..90986dde 100644
--- a/client/setup.py
+++ b/client/setup.py
@@ -68,14 +68,20 @@ class freeze_debianver(Command): # unpacked source archive. Distribution tarballs contain a pre-generated copy # of this file. -version_version = '{version}' -full_revisionid = '{full_revisionid}' -""" - templatefun = r""" - -def get_versions(default={}, verbose=False): - return {'version': version_version, - 'full-revisionid': full_revisionid} +import json +import sys + +version_json = ''' +{ + "dirty": false, + "error": null, + "full-revisionid": "FULL_REVISIONID", + "version": "VERSION_STRING" +} +''' # END VERSION_JSON + +def get_versions(): + return json.loads(version_json) """ def initialize_options(self):
@@ -90,9 +96,9 @@ def get_versions(default={}, verbose=False): if proceed != "y": print("He. You scared. Aborting.") return - subst_template = self.template.format( - version=VERSION_SHORT, - full_revisionid=VERSION_REVISION) + self.templatefun + subst_template = self.template.replace( + 'VERSION_STRING', VERSION_SHORT).replace( + 'FULL_REVISIONID', VERSION_REVISION) versioneer_cfg = versioneer.get_config_from_root('.') with open(versioneer_cfg.versionfile_source, 'w') as f: f.write(subst_template)
-- cgit v1.2.3
From 66e3572959774449d4efca5b72efe41af54075e7 Mon Sep 17 00:00:00 2001
From: Caio Carrara
Date: Thu, 14 Apr 2016 22:12:53 -0300
Subject: [refactor] remove user_id argument from Soledad init

The constructor method of Soledad was receiving two arguments for the user id. One of them was optional, with None as default. It could cause an inconsistent state with uuid set but userid unset. This change removes the optional user_id argument from the initialization method and returns the uuid if anyone calls the Soledad.userid method.
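As an illustrative sketch (not part of the original patch), the snippet below shows what this change means for API users: the userid property now simply mirrors the uuid. The variable names and values are placeholders assumed for the example, not taken from real code.

    # hypothetical usage after this change; positional arguments follow the
    # constructor signature shown in the diff below
    sol = Soledad(uuid, passphrase, secrets_path, local_db_path,
                  server_url, cert_file, auth_token=token)
    assert sol.userid == sol.uuid  # userid now falls back to the uuid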
---
 client/src/leap/soledad/client/api.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-)
(limited to 'client')
diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py
index e657c939..2477350e 100644
--- a/client/src/leap/soledad/client/api.py
+++ b/client/src/leap/soledad/client/api.py
@@ -126,8 +126,7 @@ class Soledad(object): def __init__(self, uuid, passphrase, secrets_path, local_db_path, server_url, cert_file, shared_db=None, - auth_token=None, defer_encryption=False, syncable=True, - userid=None): + auth_token=None, defer_encryption=False, syncable=True): """ Initialize configuration, cryptographic keys and dbs.
@@ -181,7 +180,6 @@ class Soledad(object): """ # store config params self._uuid = uuid - self._userid = userid self._passphrase = passphrase self._local_db_path = local_db_path self._server_url = server_url
@@ -255,7 +253,7 @@ class Soledad(object): """ self._secrets = SoledadSecrets( self.uuid, self._passphrase, self._secrets_path, - self.shared_db, userid=self._userid) + self.shared_db, userid=self.userid) self._secrets.bootstrap() def _init_u1db_sqlcipher_backend(self):
@@ -655,7 +653,7 @@ class Soledad(object): @property def userid(self): - return self._userid + return self.uuid # # ISyncableStorage
-- cgit v1.2.3
From 06f3c80e848b14d3fff1a6edd2cd58f998b976db Mon Sep 17 00:00:00 2001
From: drebs
Date: Sun, 1 May 2016 13:34:33 -0300
Subject: [bug] remove doc content conversion to unicode

Theoretically (until now), Soledad inherits from U1DB the behaviour of only accepting valid JSON for document contents. JSON documents only allow unicode strings. Despite that, until now we had implemented a lossy conversion to unicode to avoid encoding errors when dumping/loading JSON content. This allowed API users to pass non-unicode content to Soledad, but caused the application to take more time because of the conversion. There were two problems with this: (1) conversion may take a long time and a lot of memory when converting large payloads; and (2) conversion was being done before deferring to the adbapi, and this was blocking the reactor. This commit completely removes the conversion to unicode, thus leaving the responsibility of unicode conversion to users of the Soledad API.
---
 client/changes/next-changelog.rst | 2 ++ client/src/leap/soledad/client/api.py | 46 +---------------------------------- 2 files changed, 3 insertions(+), 45 deletions(-)
(limited to 'client')
diff --git a/client/changes/next-changelog.rst b/client/changes/next-changelog.rst
index 6e97386c..050d84be 100644
--- a/client/changes/next-changelog.rst
+++ b/client/changes/next-changelog.rst
@@ -16,6 +16,8 @@ Bugfixes ~~~~~~~~ - `#1235 `_: Description for the fixed stuff corresponding with issue #1235. +- Remove document content conversion to unicode. Users of API are responsible + for only passing valid JSON to Soledad for storage. - Bugfix without related issue number. Misc
diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py
index 2477350e..d83291e7 100644
--- a/client/src/leap/soledad/client/api.py
+++ b/client/src/leap/soledad/client/api.py
@@ -35,10 +35,6 @@ import ssl import uuid import urlparse -try: - import cchardet as chardet -except ImportError: - import chardet from itertools import chain from StringIO import StringIO
@@ -357,7 +353,6 @@ class Soledad(object): also be updated.
:rtype: twisted.internet.defer.Deferred """ - doc.content = _convert_to_unicode(doc.content) return self._defer("put_doc", doc) def delete_doc(self, doc): @@ -452,8 +447,7 @@ class Soledad(object): # create_doc (and probably to put_doc too). There are cases (mail # payloads for example) in which we already have the encoding in the # headers, so we don't need to guess it. - return self._defer( - "create_doc", _convert_to_unicode(content), doc_id=doc_id) + return self._defer("create_doc", content, doc_id=doc_id) def create_doc_from_json(self, json, doc_id=None): """ @@ -974,44 +968,6 @@ class Soledad(object): return self.create_doc(doc) -def _convert_to_unicode(content): - """ - Convert content to unicode (or all the strings in content). - - NOTE: Even though this method supports any type, it will - currently ignore contents of lists, tuple or any other - iterable than dict. We don't need support for these at the - moment - - :param content: content to convert - :type content: object - - :rtype: object - """ - # Chardet doesn't guess very well with some smallish payloads. - # This parameter might need some empirical tweaking. - CUTOFF_CONFIDENCE = 0.90 - - if isinstance(content, unicode): - return content - elif isinstance(content, str): - encoding = "utf-8" - result = chardet.detect(content) - if result["confidence"] > CUTOFF_CONFIDENCE: - encoding = result["encoding"] - try: - content = content.decode(encoding) - except UnicodeError as e: - logger.error("Unicode error: {0!r}. Using 'replace'".format(e)) - content = content.decode(encoding, 'replace') - return content - else: - if isinstance(content, dict): - for key in content.keys(): - content[key] = _convert_to_unicode(content[key]) - return content - - def create_path_if_not_exists(path): try: if not os.path.isdir(path): -- cgit v1.2.3 From 951593776e6dabdeef69b4138e4cc3d789e6295f Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Sun, 1 May 2016 13:58:28 -0400 Subject: [feature] use deferred semaphore --- client/changes/next-changelog.rst | 1 + client/src/leap/soledad/client/adbapi.py | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) (limited to 'client') diff --git a/client/changes/next-changelog.rst b/client/changes/next-changelog.rst index 050d84be..a696fe10 100644 --- a/client/changes/next-changelog.rst +++ b/client/changes/next-changelog.rst @@ -10,6 +10,7 @@ I've added a new category `Misc` so we can track doc/style/packaging stuff. Features ~~~~~~~~ +- Use DeferredLock instead of its locking cousin. - `#1234 `_: Description of the new feature corresponding with issue #1234. - New feature without related issue number. 
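For context on the Twisted primitive used in the adbapi.py diff below: unlike threading.BoundedSemaphore, a DeferredSemaphore never blocks; acquire() returns a Deferred that fires with the semaphore once a token is available, and release() hands the token back. A minimal standalone sketch follows (not taken from this patch; the work function and its return value are made-up placeholders):

    from twisted.internet.defer import DeferredSemaphore

    sem = DeferredSemaphore(2)       # allow at most two concurrent holders

    def do_work(semaphore):
        # called once a token has been acquired; the Deferred fires with
        # the semaphore object itself
        try:
            return 'some result'     # placeholder for the real work
        finally:
            semaphore.release()      # always hand the token back

    d = sem.acquire()                # returns a Deferred, does not block
    d.addCallback(do_work)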
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index 77822247..f43e8110 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ b/client/src/leap/soledad/client/adbapi.py @@ -24,9 +24,9 @@ import sys import logging from functools import partial -from threading import BoundedSemaphore from twisted.enterprise import adbapi +from twisted.internet.defer import DeferredSemaphore from twisted.python import log from zope.proxy import ProxyBase, setProxiedObject from pysqlcipher.dbapi2 import OperationalError @@ -204,7 +204,7 @@ class U1DBConnectionPool(adbapi.ConnectionPool): :rtype: twisted.internet.defer.Deferred """ meth = "u1db_%s" % meth - semaphore = BoundedSemaphore(SQLCIPHER_MAX_RETRIES - 1) + semaphore = DeferredSemaphore(SQLCIPHER_MAX_RETRIES ) def _run_interaction(): return self.runInteraction( @@ -213,7 +213,7 @@ class U1DBConnectionPool(adbapi.ConnectionPool): def _errback(failure): failure.trap(OperationalError) if failure.getErrorMessage() == "database is locked": - should_retry = semaphore.acquire(False) + should_retry = semaphore.acquire() if should_retry: logger.warning( "Database operation timed out while waiting for " -- cgit v1.2.3 From 50f90874815a85eb82e0fba2d953b680ee9eeb53 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 3 Feb 2016 11:01:23 -0300 Subject: [refactor] bye multiprocessing pool This commit removes the multiprocessing pool and gives a step closer to make encdecpool simpler. Download speed is now at a constant rate, CPU usage lower and reactor responding fast when running with a HTTP server like Pixelated. --- client/src/leap/soledad/client/encdecpool.py | 117 ++------------------- .../src/leap/soledad/client/http_target/fetch.py | 9 +- 2 files changed, 14 insertions(+), 112 deletions(-) (limited to 'client') diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 34667a1e..576b8b2c 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -22,8 +22,6 @@ during synchronization. """ -import multiprocessing -import Queue import json import logging @@ -51,9 +49,6 @@ class SyncEncryptDecryptPool(object): Base class for encrypter/decrypter pools. """ - # TODO implement throttling to reduce cpu usage?? - WORKERS = multiprocessing.cpu_count() - def __init__(self, crypto, sync_db): """ Initialize the pool of encryption-workers. @@ -66,21 +61,18 @@ class SyncEncryptDecryptPool(object): """ self._crypto = crypto self._sync_db = sync_db - self._pool = None self._delayed_call = None self._started = False def start(self): if self.running: return - self._create_pool() self._started = True def stop(self): if not self.running: return self._started = False - self._destroy_pool() # maybe cancel the next delayed call if self._delayed_call \ and not self._delayed_call.called: @@ -90,27 +82,6 @@ class SyncEncryptDecryptPool(object): def running(self): return self._started - def _create_pool(self): - self._pool = multiprocessing.Pool(self.WORKERS) - - def _destroy_pool(self): - """ - Cleanly close the pool of workers. - """ - logger.debug("Closing %s" % (self.__class__.__name__,)) - self._pool.close() - try: - self._pool.join() - except Exception: - pass - - def terminate(self): - """ - Terminate the pool of workers. - """ - logger.debug("Terminating %s" % (self.__class__.__name__,)) - self._pool.terminate() - def _runOperation(self, query, *args): """ Run an operation on the sync db. 
@@ -180,7 +151,6 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): Initialize the sync encrypter pool. """ SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - self._encr_queue = defer.DeferredQueue() # TODO delete already synced files from database def start(self): @@ -189,19 +159,11 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): """ SyncEncryptDecryptPool.start(self) logger.debug("Starting the encryption loop...") - reactor.callWhenRunning(self._maybe_encrypt_and_recurse) def stop(self): """ Stop the encrypter pool. """ - # close the sync queue - if self._encr_queue: - q = self._encr_queue - for d in q.pending: - d.cancel() - del q - self._encr_queue = None SyncEncryptDecryptPool.stop(self) @@ -212,29 +174,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): :param doc: The document to be encrypted. :type doc: SoledadDocument """ - try: - self._encr_queue.put(doc) - except Queue.Full: - # do not asynchronously encrypt this file if the queue is full - pass - - @defer.inlineCallbacks - def _maybe_encrypt_and_recurse(self): - """ - Process one document from the encryption queue. - - Asynchronously encrypt a document that will then be stored in the sync - db. Processed documents will be read by the SoledadSyncTarget during - the sync_exchange. - """ - try: - while self.running: - doc = yield self._encr_queue.get() - self._encrypt_doc(doc) - except defer.QueueUnderflow: - self._delayed_call = reactor.callLater( - self.ENCRYPT_LOOP_PERIOD, - self._maybe_encrypt_and_recurse) + self._encrypt_doc(doc) def _encrypt_doc(self, doc): """ @@ -253,9 +193,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): secret = self._crypto.secret args = doc.doc_id, doc.rev, docstr, key, secret # encrypt asynchronously - self._pool.apply_async( - encrypt_doc_task, args, - callback=self._encrypt_doc_cb) + d = threads.deferToThread( + encrypt_doc_task, *args) + d.addCallback(self._encrypt_doc_cb) def _encrypt_doc_cb(self, result): """ @@ -354,6 +294,7 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, :return: A tuple containing the doc id, revision and encrypted content. :rtype: tuple(str, str, str) """ + content = json.loads(content) if type(content) == str else content decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret) return doc_id, doc_rev, decrypted_content, gen, trans_id, idx @@ -414,7 +355,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._docs_to_process = None self._processed_docs = 0 self._last_inserted_idx = 0 - self._decrypting_docs = [] # a list that holds the asynchronous decryption results so they can be # collected when they are ready @@ -511,11 +451,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): has finished. 
:rtype: twisted.internet.defer.Deferred """ - docstr = json.dumps(content) - query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ - % self.TABLE_NAME - return self._runOperation( - query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx)) + return self._async_decrypt_doc(doc_id, doc_rev, content, gen, trans_id, idx) def insert_received_doc( self, doc_id, doc_rev, content, gen, trans_id, idx): @@ -585,14 +521,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ soledad_assert(self._crypto is not None, "need a crypto object") - content = json.loads(content) key = self._crypto.doc_passphrase(doc_id) secret = self._crypto.secret args = doc_id, rev, content, gen, trans_id, key, secret, idx # decrypt asynchronously - self._async_results.append( - self._pool.apply_async( - decrypt_doc_task, args)) + d = threads.deferToThread(decrypt_doc_task, *args) + d.addCallback(self._decrypt_doc_cb) def _decrypt_doc_cb(self, result): """ @@ -610,7 +544,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): doc_id, rev, content, gen, trans_id, idx = result logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s" % (doc_id, rev, gen, trans_id)) - self._decrypting_docs.remove((doc_id, rev)) return self.insert_received_doc( doc_id, rev, content, gen, trans_id, idx) @@ -659,23 +592,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): last_idx += 1 defer.returnValue(insertable) - @defer.inlineCallbacks - def _async_decrypt_received_docs(self): - """ - Get all the encrypted documents from the sync database and dispatch a - decrypt worker to decrypt each one of them. - - :return: A deferred that will fire after all documents have been - decrypted and inserted back in the sync db. - :rtype: twisted.internet.defer.Deferred - """ - docs = yield self._get_docs(encrypted=True) - for doc_id, rev, content, gen, trans_id, _, idx in docs: - if (doc_id, rev) not in self._decrypting_docs: - self._decrypting_docs.append((doc_id, rev)) - self._async_decrypt_doc( - doc_id, rev, content, gen, trans_id, idx) - @defer.inlineCallbacks def _process_decrypted_docs(self): """ @@ -762,21 +678,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) return self._runOperation(query) - @defer.inlineCallbacks - def _collect_async_decryption_results(self): - """ - Collect the results of the asynchronous doc decryptions and re-raise - any exception raised by a multiprocessing async decryption call. - - :raise Exception: Raised if an async call has raised an exception. - """ - async_results = self._async_results[:] - for res in async_results: - if res.ready(): - # XXX: might raise an exception! 
- yield self._decrypt_doc_cb(res.get()) - self._async_results.remove(res) - @defer.inlineCallbacks def _decrypt_and_recurse(self): """ @@ -796,8 +697,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): pending = self._docs_to_process if processed < pending: - yield self._async_decrypt_received_docs() - yield self._collect_async_decryption_results() docs = yield self._process_decrypted_docs() yield self._delete_processed_docs(docs) # recurse diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 9f7a4193..fda90909 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -19,6 +19,7 @@ import json from u1db import errors from u1db.remote import utils from twisted.internet import defer +from twisted.internet import threads from leap.soledad.common.document import SoledadDocument from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS from leap.soledad.client.events import emit_async @@ -75,7 +76,7 @@ class HTTPDocFetcher(object): last_known_generation, last_known_trans_id, sync_id, 0) self._received_docs = 0 - number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1) + number_of_changes, ngen, ntrans = yield self._insert_received_doc(doc, 1, 1) if ngen: new_generation = ngen @@ -137,6 +138,7 @@ class HTTPDocFetcher(object): body=str(body), content_type='application/x-soledad-sync-get') + @defer.inlineCallbacks def _insert_received_doc(self, response, idx, total): """ Insert a received document into the local replica. @@ -150,7 +152,8 @@ class HTTPDocFetcher(object): """ new_generation, new_transaction_id, number_of_changes, doc_id, \ rev, content, gen, trans_id = \ - self._parse_received_doc_response(response) + (yield threads.deferToThread(self._parse_received_doc_response, + response)) if doc_id is not None: # decrypt incoming document and insert into local database # ------------------------------------------------------------- @@ -185,7 +188,7 @@ class HTTPDocFetcher(object): self._received_docs += 1 user_data = {'uuid': self.uuid, 'userid': self.userid} _emit_receive_status(user_data, self._received_docs, total) - return number_of_changes, new_generation, new_transaction_id + defer.returnValue((number_of_changes, new_generation, new_transaction_id)) def _parse_received_doc_response(self, response): """ -- cgit v1.2.3 From 498e9e1353700b61950ef87c007e6c0a84fbe201 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 4 Feb 2016 13:33:49 -0300 Subject: [refactor] encdecpool queries and testing This commit adds tests for doc ordering and encdecpool control (start/stop). Also optimizes by deleting in batch and checking for a sequence in memory before asking the local staging for documents. --- client/changes/next-changelog.rst | 1 + client/src/leap/soledad/client/encdecpool.py | 183 ++++++++------------- .../src/leap/soledad/client/http_target/fetch.py | 9 +- client/src/leap/soledad/client/sqlcipher.py | 2 +- 4 files changed, 75 insertions(+), 120 deletions(-) (limited to 'client') diff --git a/client/changes/next-changelog.rst b/client/changes/next-changelog.rst index a696fe10..c676625f 100644 --- a/client/changes/next-changelog.rst +++ b/client/changes/next-changelog.rst @@ -26,6 +26,7 @@ Misc - Refactor bootstrap to remove shared db lock. - `#1236 `_: Description of the new feature corresponding with issue #1236. - Some change without issue number. +- Removed multiprocessing from encdecpool with some extra refactoring. 
Known Issues ~~~~~~~~~~~~ diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 576b8b2c..218ebfa9 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -25,7 +25,7 @@ during synchronization. import json import logging -from twisted.internet import reactor +from twisted.internet.task import LoopingCall from twisted.internet import threads from twisted.internet import defer from twisted.python import log @@ -167,26 +167,14 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): SyncEncryptDecryptPool.stop(self) - def enqueue_doc_for_encryption(self, doc): + def encrypt_doc(self, doc): """ - Enqueue a document for encryption. + Encrypt document asynchronously then insert it on + local staging database. :param doc: The document to be encrypted. :type doc: SoledadDocument """ - self._encrypt_doc(doc) - - def _encrypt_doc(self, doc): - """ - Symmetrically encrypt a document. - - :param doc: The document with contents to be encrypted. - :type doc: SoledadDocument - - :param workers: Whether to defer the decryption to the multiprocess - pool of workers. Useful for debugging purposes. - :type workers: bool - """ soledad_assert(self._crypto is not None, "need a crypto object") docstr = doc.get_json() key = self._crypto.doc_passphrase(doc.doc_id) @@ -276,8 +264,8 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, :type doc_id: str :param doc_rev: The document revision. :type doc_rev: str - :param content: The encrypted content of the document. - :type content: str + :param content: The encrypted content of the document as JSON dict. + :type content: dict :param gen: The generation corresponding to the modification of that document. :type gen: int @@ -294,7 +282,6 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, :return: A tuple containing the doc id, revision and encrypted content. 
:rtype: tuple(str, str, str) """ - content = json.loads(content) if type(content) == str else content decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret) return doc_id, doc_rev, decrypted_content, gen, trans_id, idx @@ -356,14 +343,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._processed_docs = 0 self._last_inserted_idx = 0 - # a list that holds the asynchronous decryption results so they can be - # collected when they are ready - self._async_results = [] - # initialize db and make sure any database operation happens after # db initialization self._deferred_init = self._init_db() self._wait_init_db('_runOperation', '_runQuery') + self._loop = LoopingCall(self._decrypt_and_recurse) + self._decrypted_docs_indexes = set() def _wait_init_db(self, *methods): """ @@ -408,11 +393,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): SyncEncryptDecryptPool.start(self) self._docs_to_process = docs_to_process self._deferred = defer.Deferred() - reactor.callWhenRunning(self._launch_decrypt_and_recurse) + self._loop.start(self.DECRYPT_LOOP_PERIOD) - def _launch_decrypt_and_recurse(self): - d = self._decrypt_and_recurse() - d.addErrback(self._errback) + def stop(self): + if self._loop.running: + self._loop.stop() + self._finish() + SyncEncryptDecryptPool.stop(self) def _errback(self, failure): log.err(failure) @@ -431,8 +418,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): def insert_encrypted_received_doc( self, doc_id, doc_rev, content, gen, trans_id, idx): """ - Insert a received message with encrypted content, to be decrypted later - on. + Decrypt and insert a received document into local staging area to be + processed later on. :param doc_id: The document ID. :type doc_id: str @@ -447,11 +434,19 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :param idx: The index of this document in the current sync process. :type idx: int - :return: A deferred that will fire when the operation in the database - has finished. + :return: A deferred that will fire after the decrypted document has + been inserted in the sync db. :rtype: twisted.internet.defer.Deferred """ - return self._async_decrypt_doc(doc_id, doc_rev, content, gen, trans_id, idx) + soledad_assert(self._crypto is not None, "need a crypto object") + + key = self._crypto.doc_passphrase(doc_id) + secret = self._crypto.secret + args = doc_id, doc_rev, content, gen, trans_id, key, secret, idx + # decrypt asynchronously + doc = decrypt_doc_task(*args) + # callback will insert it for later processing + return self._decrypt_doc_cb(doc) def insert_received_doc( self, doc_id, doc_rev, content, gen, trans_id, idx): @@ -481,52 +476,26 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): content = json.dumps(content) query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ % self.TABLE_NAME - return self._runOperation( + d = self._runOperation( query, (doc_id, doc_rev, content, gen, trans_id, 0, idx)) + d.addCallback(lambda _: self._decrypted_docs_indexes.add(idx)) + return d - def _delete_received_doc(self, doc_id): + def _delete_received_docs(self, doc_ids): """ - Delete a received doc after it was inserted into the local db. + Delete a list of received docs after get them inserted into the db. - :param doc_id: Document ID. - :type doc_id: str + :param doc_id: Document ID list. + :type doc_id: list :return: A deferred that will fire when the operation in the database has finished. :rtype: twisted.internet.defer.Deferred """ - query = "DELETE FROM '%s' WHERE doc_id=?" 
\ - % self.TABLE_NAME - return self._runOperation(query, (doc_id,)) - - def _async_decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx): - """ - Dispatch an asynchronous document decrypting routine and save the - result object. - - :param doc_id: The ID for the document with contents to be encrypted. - :type doc: str - :param rev: The revision of the document. - :type rev: str - :param content: The serialized content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification - of that document. - :type trans_id: str - :param idx: The index of this document in the current sync process. - :type idx: int - """ - soledad_assert(self._crypto is not None, "need a crypto object") - - key = self._crypto.doc_passphrase(doc_id) - secret = self._crypto.secret - args = doc_id, rev, content, gen, trans_id, key, secret, idx - # decrypt asynchronously - d = threads.deferToThread(decrypt_doc_task, *args) - d.addCallback(self._decrypt_doc_cb) + placeholders = ', '.join('?' for _ in doc_ids) + query = "DELETE FROM '%s' WHERE doc_id in (%s)" \ + % (self.TABLE_NAME, placeholders) + return self._runOperation(query, (doc_ids)) def _decrypt_doc_cb(self, result): """ @@ -547,7 +516,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): return self.insert_received_doc( doc_id, rev, content, gen, trans_id, idx) - def _get_docs(self, encrypted=None, order_by='idx', order='ASC'): + def _get_docs(self, encrypted=None, order_by='idx', order='ASC', + sequence=None): """ Get documents from the received docs table in the sync db. @@ -565,8 +535,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \ "idx FROM %s" % self.TABLE_NAME - if encrypted is not None: - query += " WHERE encrypted = %d" % int(encrypted) + if encrypted or sequence: + query += " WHERE" + if encrypted: + query += " encrypted = %d" % int(encrypted) + if sequence: + query += " idx in (" + ', '.join(sequence) + ")" + query += " ORDER BY %s %s" % (order_by, order) return self._runQuery(query) @@ -579,18 +554,19 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): documents. :rtype: twisted.internet.defer.Deferred """ - # here, we fetch the list of decrypted documents and compare with the - # index of the last succesfully processed document. - decrypted_docs = yield self._get_docs(encrypted=False) - insertable = [] - last_idx = self._last_inserted_idx - for doc_id, rev, content, gen, trans_id, encrypted, idx in \ - decrypted_docs: - if (idx != last_idx + 1): - break - insertable.append((doc_id, rev, content, gen, trans_id, idx)) - last_idx += 1 - defer.returnValue(insertable) + # Here, check in memory what are the insertable indexes that can + # form a sequence starting from the last inserted index + sequence = [] + insertable_docs = [] + next_index = self._last_inserted_idx + 1 + while next_index in self._decrypted_docs_indexes: + sequence.append(str(next_index)) + next_index += 1 + # Then fetch all the ones ready for insertion. 
+ if sequence: + insertable_docs = yield self._get_docs(encrypted=False, + sequence=sequence) + defer.returnValue(insertable_docs) @defer.inlineCallbacks def _process_decrypted_docs(self): @@ -603,36 +579,18 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :rtype: twisted.internet.defer.Deferred """ insertable = yield self._get_insertable_docs() + processed_docs_ids = [] for doc_fields in insertable: method = self._insert_decrypted_local_doc # FIXME: This is used only because SQLCipherU1DBSync is synchronous # When adbapi is used there is no need for an external thread # Without this the reactor can freeze and fail docs download yield threads.deferToThread(method, *doc_fields) - defer.returnValue(insertable) - - def _delete_processed_docs(self, inserted): - """ - Delete from the sync db documents that have been processed. - - :param inserted: List of documents inserted in the previous process - step. - :type inserted: list - - :return: A list of deferreds that will fire when each operation in the - database has finished. - :rtype: twisted.internet.defer.DeferredList - """ - deferreds = [] - for doc_id, doc_rev, _, _, _, _ in inserted: - deferreds.append( - self._delete_received_doc(doc_id)) - if not deferreds: - return defer.succeed(None) - return defer.gatherResults(deferreds) + processed_docs_ids.append(doc_fields[0]) + yield self._delete_received_docs(processed_docs_ids) def _insert_decrypted_local_doc(self, doc_id, doc_rev, content, - gen, trans_id, idx): + gen, trans_id, encrypted, idx): """ Insert the decrypted document into the local replica. @@ -693,20 +651,19 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): delete operations have been executed. :rtype: twisted.internet.defer.Deferred """ + if not self.running: + defer.returnValue(None) processed = self._processed_docs pending = self._docs_to_process if processed < pending: - docs = yield self._process_decrypted_docs() - yield self._delete_processed_docs(docs) - # recurse - self._delayed_call = reactor.callLater( - self.DECRYPT_LOOP_PERIOD, - self._launch_decrypt_and_recurse) + yield self._process_decrypted_docs() else: self._finish() def _finish(self): self._processed_docs = 0 self._last_inserted_idx = 0 - self._deferred.callback(None) + self._decrypted_docs_indexes = set() + if not self._deferred.called: + self._deferred.callback(None) diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index fda90909..9f7a4193 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -19,7 +19,6 @@ import json from u1db import errors from u1db.remote import utils from twisted.internet import defer -from twisted.internet import threads from leap.soledad.common.document import SoledadDocument from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS from leap.soledad.client.events import emit_async @@ -76,7 +75,7 @@ class HTTPDocFetcher(object): last_known_generation, last_known_trans_id, sync_id, 0) self._received_docs = 0 - number_of_changes, ngen, ntrans = yield self._insert_received_doc(doc, 1, 1) + number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1) if ngen: new_generation = ngen @@ -138,7 +137,6 @@ class HTTPDocFetcher(object): body=str(body), content_type='application/x-soledad-sync-get') - @defer.inlineCallbacks def _insert_received_doc(self, response, idx, total): """ Insert a received document into the local replica. 
@@ -152,8 +150,7 @@ class HTTPDocFetcher(object): """ new_generation, new_transaction_id, number_of_changes, doc_id, \ rev, content, gen, trans_id = \ - (yield threads.deferToThread(self._parse_received_doc_response, - response)) + self._parse_received_doc_response(response) if doc_id is not None: # decrypt incoming document and insert into local database # ------------------------------------------------------------- @@ -188,7 +185,7 @@ class HTTPDocFetcher(object): self._received_docs += 1 user_data = {'uuid': self.uuid, 'userid': self.userid} _emit_receive_status(user_data, self._received_docs, total) - defer.returnValue((number_of_changes, new_generation, new_transaction_id)) + return number_of_changes, new_generation, new_transaction_id def _parse_received_doc_response(self, response): """ diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 22ddc87d..cdc7255c 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -278,7 +278,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc) if self.defer_encryption: # TODO move to api? - self._sync_enc_pool.enqueue_doc_for_encryption(doc) + self._sync_enc_pool.encrypt_doc(doc) return doc_rev # -- cgit v1.2.3 From ebcf2a098fb8e9b1211e31b4955aa67cfebc5854 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 16 Feb 2016 12:36:43 -0300 Subject: [bug] delete all docs on start and ensure isolation Docs created from one failed sync would be there for the next one, possibly causing a lot of hard to find errors. This commit adds a sync_id field to track each sync documents isolated and cleans up the pool on start instead of constructor. --- client/src/leap/soledad/client/encdecpool.py | 96 +++++++--------------- .../src/leap/soledad/client/http_target/fetch.py | 7 +- 2 files changed, 35 insertions(+), 68 deletions(-) (limited to 'client') diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 218ebfa9..7d646c51 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -24,6 +24,7 @@ during synchronization. import json import logging +from uuid import uuid4 from twisted.internet.task import LoopingCall from twisted.internet import threads @@ -65,13 +66,9 @@ class SyncEncryptDecryptPool(object): self._started = False def start(self): - if self.running: - return self._started = True def stop(self): - if not self.running: - return self._started = False # maybe cancel the next delayed call if self._delayed_call \ @@ -312,7 +309,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ TABLE_NAME = "docs_received" FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \ - "trans_id, encrypted, idx" + "trans_id, encrypted, idx, sync_id" """ Period of recurrence of the periodic decrypting task, in seconds. @@ -343,42 +340,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._processed_docs = 0 self._last_inserted_idx = 0 - # initialize db and make sure any database operation happens after - # db initialization - self._deferred_init = self._init_db() - self._wait_init_db('_runOperation', '_runQuery') self._loop = LoopingCall(self._decrypt_and_recurse) - self._decrypted_docs_indexes = set() - - def _wait_init_db(self, *methods): - """ - Methods that need to wait for db initialization. 
- - :param methods: methods that need to wait for initialization - :type methods: tuple(str) - """ - self._waiting = [] - self._stored = {} - - def _restore(_): - for method in self._stored: - setattr(self, method, self._stored[method]) - for d in self._waiting: - d.callback(None) - - def _makeWrapper(method): - def wrapper(*args, **kw): - d = defer.Deferred() - d.addCallback(lambda _: self._stored[method](*args, **kw)) - self._waiting.append(d) - return d - return wrapper - - for method in methods: - self._stored[method] = getattr(self, method) - setattr(self, method, _makeWrapper(method)) - - self._deferred_init.addCallback(_restore) def start(self, docs_to_process): """ @@ -391,9 +353,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type docs_to_process: int """ SyncEncryptDecryptPool.start(self) + self._decrypted_docs_indexes = set() + self._sync_id = uuid4().hex self._docs_to_process = docs_to_process self._deferred = defer.Deferred() - self._loop.start(self.DECRYPT_LOOP_PERIOD) + d = self._init_db() + d.addCallback(lambda _: self._loop.start(self.DECRYPT_LOOP_PERIOD)) + return d def stop(self): if self._loop.running: @@ -401,6 +367,17 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._finish() SyncEncryptDecryptPool.stop(self) + def _init_db(self): + """ + Empty the received docs table of the sync database. + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + query = "DELETE FROM %s WHERE sync_id <> ?" % (self.TABLE_NAME,) + return self._runOperation(query, (self._sync_id,)) + def _errback(self, failure): log.err(failure) self._deferred.errback(failure) @@ -474,10 +451,11 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ if not isinstance(content, str): content = json.dumps(content) - query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?, ?)" \ % self.TABLE_NAME d = self._runOperation( - query, (doc_id, doc_rev, content, gen, trans_id, 0, idx)) + query, (doc_id, doc_rev, content, gen, trans_id, 0, + idx, self._sync_id)) d.addCallback(lambda _: self._decrypted_docs_indexes.add(idx)) return d @@ -516,8 +494,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): return self.insert_received_doc( doc_id, rev, content, gen, trans_id, idx) - def _get_docs(self, encrypted=None, order_by='idx', order='ASC', - sequence=None): + def _get_docs(self, encrypted=None, sequence=None): """ Get documents from the received docs table in the sync db. @@ -525,9 +502,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): field equal to given parameter. :type encrypted: bool or None :param order_by: The name of the field to order results. - :type order_by: str - :param order: Whether the order should be ASC or DESC. - :type order: str :return: A deferred that will fire with the results of the database query. @@ -535,15 +509,18 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \ "idx FROM %s" % self.TABLE_NAME + parameters = [] if encrypted or sequence: - query += " WHERE" + query += " WHERE sync_id = ? and" + parameters += [self._sync_id] if encrypted: - query += " encrypted = %d" % int(encrypted) + query += " encrypted = ?" + parameters += [int(encrypted)] if sequence: - query += " idx in (" + ', '.join(sequence) + ")" - - query += " ORDER BY %s %s" % (order_by, order) - return self._runQuery(query) + query += " idx in (" + ', '.join('?' 
* len(sequence)) + ")" + parameters += [int(i) for i in sequence] + query += " ORDER BY idx ASC" + return self._runQuery(query, parameters) @defer.inlineCallbacks def _get_insertable_docs(self): @@ -625,17 +602,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._last_inserted_idx = idx self._processed_docs += 1 - def _init_db(self): - """ - Empty the received docs table of the sync database. - - :return: A deferred that will fire when the operation in the database - has finished. - :rtype: twisted.internet.defer.Deferred - """ - query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) - return self._runOperation(query) - @defer.inlineCallbacks def _decrypt_and_recurse(self): """ diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 9f7a4193..9801c3d9 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -81,9 +81,6 @@ class HTTPDocFetcher(object): new_generation = ngen new_transaction_id = ntrans - if defer_decryption: - self._sync_decr_pool.start(number_of_changes) - # --------------------------------------------------------------------- # maybe receive the rest of the documents # --------------------------------------------------------------------- @@ -151,6 +148,10 @@ class HTTPDocFetcher(object): new_generation, new_transaction_id, number_of_changes, doc_id, \ rev, content, gen, trans_id = \ self._parse_received_doc_response(response) + + if self._sync_decr_pool and not self._sync_decr_pool.running: + self._sync_decr_pool.start(number_of_changes) + if doc_id is not None: # decrypt incoming document and insert into local database # ------------------------------------------------------------- -- cgit v1.2.3 From f6a7cdded4285af2335263a058479fa158980b31 Mon Sep 17 00:00:00 2001 From: NavaL Date: Fri, 29 Apr 2016 18:47:13 +0200 Subject: [bug] ensures docs_received table has the sync_id column For the case where the user already has data synced, this commit will migrate the docs_received table to have the column sync_id. That is required by the refactoring in the previous commits. --- client/src/leap/soledad/client/encdecpool.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'client') diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 7d646c51..e348f545 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -369,14 +369,22 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): def _init_db(self): """ + Ensure sync_id column is present then Empty the received docs table of the sync database. :return: A deferred that will fire when the operation in the database has finished. :rtype: twisted.internet.defer.Deferred """ - query = "DELETE FROM %s WHERE sync_id <> ?" % (self.TABLE_NAME,) - return self._runOperation(query, (self._sync_id,)) + ensure_sync_id_column = "ALTER TABLE %s ADD COLUMN sync_id" % self.TABLE_NAME + d = self._runQuery(ensure_sync_id_column) + + def empty_received_docs(_): + query = "DELETE FROM %s WHERE sync_id <> ?" 
% (self.TABLE_NAME,) + return self._runOperation(query, (self._sync_id,)) + + d.addCallbacks(empty_received_docs, empty_received_docs) + return d def _errback(self, failure): log.err(failure) -- cgit v1.2.3 From 86b74a3404c3dc98b422d348edc65848b381f5f1 Mon Sep 17 00:00:00 2001 From: drebs Date: Sun, 1 May 2016 19:57:42 -0300 Subject: [feature] add sync phase stats --- client/changes/next-changelog.rst | 2 ++ client/src/leap/soledad/client/api.py | 17 +++++++++ .../leap/soledad/client/http_target/__init__.py | 10 ++++++ client/src/leap/soledad/client/http_target/api.py | 22 ++++++++++++ client/src/leap/soledad/client/sqlcipher.py | 13 +++++++ client/src/leap/soledad/client/sync.py | 42 ++++++++++++++++++++++ 6 files changed, 106 insertions(+) (limited to 'client') diff --git a/client/changes/next-changelog.rst b/client/changes/next-changelog.rst index c676625f..cffe4954 100644 --- a/client/changes/next-changelog.rst +++ b/client/changes/next-changelog.rst @@ -23,6 +23,8 @@ Bugfixes Misc ~~~~ +- Add ability to get information about sync phases for profiling purposes. +- Add script for setting up develop environment. - Refactor bootstrap to remove shared db lock. - `#1236 `_: Description of the new feature corresponding with issue #1236. - Some change without issue number. diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index d83291e7..a1588aa9 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -62,6 +62,13 @@ from leap.soledad.client import encdecpool logger = logging.getLogger(name=__name__) + +# we may want to collect statistics from the sync process +DO_STATS = False +if os.environ.get('SOLEDAD_STATS'): + DO_STATS = True + + # # Constants # @@ -297,6 +304,16 @@ class Soledad(object): sync_db=self._sync_db, sync_enc_pool=self._sync_enc_pool) + def sync_stats(self): + sync_phase = 0 + if getattr(self._dbsyncer, 'sync_phase', None): + sync_phase = self._dbsyncer.sync_phase[0] + sync_exchange_phase = 0 + if getattr(self._dbsyncer, 'syncer', None): + if getattr(self._dbsyncer.syncer, 'sync_exchange_phase', None): + sync_exchange_phase = self._dbsyncer.syncer.sync_exchange_phase[0] + return sync_phase, sync_exchange_phase + # # Closing methods # diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py index a16531ef..b7e54aa4 100644 --- a/client/src/leap/soledad/client/http_target/__init__.py +++ b/client/src/leap/soledad/client/http_target/__init__.py @@ -22,6 +22,7 @@ after receiving. """ +import os import logging from leap.common.http import HTTPClient @@ -33,6 +34,12 @@ from leap.soledad.client.http_target.fetch import HTTPDocFetcher logger = logging.getLogger(__name__) +# we may want to collect statistics from the sync process +DO_STATS = False +if os.environ.get('SOLEDAD_STATS'): + DO_STATS = True + + class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher): """ @@ -93,3 +100,6 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher): # the duplicated syncing bug. This could be reduced to the 30s default # after implementing Cancellable Sync. 
See #7382 self._http = HTTPClient(cert_file, timeout=90) + + if DO_STATS: + self.sync_exchange_phase = [0] diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py index 94354092..b19ce9ce 100644 --- a/client/src/leap/soledad/client/http_target/api.py +++ b/client/src/leap/soledad/client/http_target/api.py @@ -14,6 +14,8 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import os +import time import json import base64 @@ -27,6 +29,12 @@ from leap.soledad.common.errors import InvalidAuthTokenError from leap.soledad.client.http_target.support import readBody +# we may want to collect statistics from the sync process +DO_STATS = False +if os.environ.get('SOLEDAD_STATS'): + DO_STATS = True + + class SyncTargetAPI(SyncTarget): """ Declares public methods and implements u1db.SyncTarget. @@ -187,6 +195,10 @@ class SyncTargetAPI(SyncTarget): transaction id of the target replica. :rtype: twisted.internet.defer.Deferred """ + # ---------- phase 1: send docs to server ---------------------------- + if DO_STATS: + self.sync_exchange_phase[0] += 1 + # -------------------------------------------------------------------- self._ensure_callback = ensure_callback @@ -203,6 +215,11 @@ class SyncTargetAPI(SyncTarget): last_known_trans_id, sync_id) + # ---------- phase 2: receive docs ----------------------------------- + if DO_STATS: + self.sync_exchange_phase[0] += 1 + # -------------------------------------------------------------------- + cur_target_gen, cur_target_trans_id = yield self._receive_docs( last_known_generation, last_known_trans_id, ensure_callback, sync_id, @@ -214,6 +231,11 @@ class SyncTargetAPI(SyncTarget): cur_target_gen = gen_after_send cur_target_trans_id = trans_id_after_send + # ---------- phase 3: sync exchange is over -------------------------- + if DO_STATS: + self.sync_exchange_phase[0] += 1 + # -------------------------------------------------------------------- + defer.returnValue([cur_target_gen, cur_target_trans_id]) diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index cdc7255c..99f5dad8 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -72,6 +72,12 @@ logger = logging.getLogger(__name__) sqlite_backend.dbapi2 = sqlcipher_dbapi2 +# we may want to collect statistics from the sync process +DO_STATS = False +if os.environ.get('SOLEDAD_STATS'): + DO_STATS = True + + def initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True): """ Initialize a SQLCipher database. @@ -453,6 +459,9 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self.shutdownID = None + if DO_STATS: + self.sync_phase = None + @property def _replica_uid(self): return str(self.__replica_uid) @@ -497,6 +506,10 @@ class SQLCipherU1DBSync(SQLCipherDatabase): :rtype: Deferred """ syncer = self._get_syncer(url, creds=creds) + if DO_STATS: + self.sync_phase = syncer.sync_phase + self.syncer = syncer + self.sync_exchange_phase = syncer.sync_exchange_phase local_gen_before_sync = yield syncer.sync( defer_decryption=defer_decryption) self.received_docs = syncer.received_docs diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index 1879031f..9cafe62f 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -17,6 +17,8 @@ """ Soledad synchronization utilities. 
""" +import os +import time import logging from twisted.internet import defer @@ -29,6 +31,12 @@ from u1db.sync import Synchronizer logger = logging.getLogger(__name__) +# we may want to collect statistics from the sync process +DO_STATS = False +if os.environ.get('SOLEDAD_STATS'): + DO_STATS = True + + class SoledadSynchronizer(Synchronizer): """ Collect the state around synchronizing 2 U1DB replicas. @@ -42,6 +50,12 @@ class SoledadSynchronizer(Synchronizer): """ received_docs = [] + def __init__(self, *args, **kwargs): + Synchronizer.__init__(self, *args, **kwargs) + if DO_STATS: + self.sync_phase = [0] + self.sync_exchange_phase = None + @defer.inlineCallbacks def sync(self, defer_decryption=True): """ @@ -64,9 +78,16 @@ class SoledadSynchronizer(Synchronizer): the local generation before the synchronization was performed. :rtype: twisted.internet.defer.Deferred """ + sync_target = self.sync_target self.received_docs = [] + # ---------- phase 1: get sync info from server ---------------------- + if DO_STATS: + self.sync_phase[0] += 1 + self.sync_exchange_phase = self.sync_target.sync_exchange_phase + # -------------------------------------------------------------------- + # get target identifier, its current generation, # and its last-seen database generation for this source ensure_callback = None @@ -106,6 +127,11 @@ class SoledadSynchronizer(Synchronizer): self.source.validate_gen_and_trans_id( target_my_gen, target_my_trans_id) + # ---------- phase 2: what's changed --------------------------------- + if DO_STATS: + self.sync_phase[0] += 1 + # -------------------------------------------------------------------- + # what's changed since that generation and this current gen my_gen, _, changes = self.source.whats_changed(target_my_gen) logger.debug("Soledad sync: there are %d documents to send." 
@@ -130,6 +156,11 @@ class SoledadSynchronizer(Synchronizer): raise errors.InvalidTransactionId defer.returnValue(my_gen) + # ---------- phase 3: sync exchange ---------------------------------- + if DO_STATS: + self.sync_phase[0] += 1 + # -------------------------------------------------------------------- + # prepare to send all the changed docs changed_doc_ids = [doc_id for doc_id, _, _ in changes] docs_to_send = self.source.get_docs( @@ -162,6 +193,12 @@ class SoledadSynchronizer(Synchronizer): "my_gen": my_gen } self._syncing_info = info + + # ---------- phase 4: complete sync ---------------------------------- + if DO_STATS: + self.sync_phase[0] += 1 + # -------------------------------------------------------------------- + yield self.complete_sync() _, _, changes = self.source.whats_changed(target_my_gen) @@ -170,6 +207,11 @@ class SoledadSynchronizer(Synchronizer): just_received = list(set(changed_doc_ids) - set(ids_sent)) self.received_docs = just_received + # ---------- phase 5: sync is over ----------------------------------- + if DO_STATS: + self.sync_phase[0] += 1 + # -------------------------------------------------------------------- + defer.returnValue(my_gen) def complete_sync(self): -- cgit v1.2.3 From 7b468680cc2ad09cf836da0095330d941a5ea7b9 Mon Sep 17 00:00:00 2001 From: drebs Date: Sat, 7 May 2016 17:09:29 -0300 Subject: [feat] add recovery doc format version --- client/changes/next-changelog.rst | 1 + client/src/leap/soledad/client/secrets.py | 91 ++++++++++++++++++++++++++----- 2 files changed, 79 insertions(+), 13 deletions(-) (limited to 'client') diff --git a/client/changes/next-changelog.rst b/client/changes/next-changelog.rst index cffe4954..7ddb3a57 100644 --- a/client/changes/next-changelog.rst +++ b/client/changes/next-changelog.rst @@ -10,6 +10,7 @@ I've added a new category `Misc` so we can track doc/style/packaging stuff. Features ~~~~~~~~ +- Add recovery document format version for future migrations. - Use DeferredLock instead of its locking cousin. - `#1234 `_: Description of the new feature corresponding with issue #1234. - New feature without related issue number. diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py index 714b2dfe..c35b881e 100644 --- a/client/src/leap/soledad/client/secrets.py +++ b/client/src/leap/soledad/client/secrets.py @@ -143,6 +143,8 @@ class SoledadSecrets(object): KDF_LENGTH_KEY = 'kdf_length' KDF_SCRYPT = 'scrypt' CIPHER_AES256 = 'aes256' + RECOVERY_DOC_VERSION_KEY = 'version' + RECOVERY_DOC_VERSION = 1 """ Keys used to access storage secrets in recovery documents. """ @@ -192,9 +194,15 @@ class SoledadSecrets(object): # STAGE 1 - verify if secrets exist locally try: logger.info("Trying to load secrets from local storage...") - self._load_secrets_from_local_file() + version = self._load_secrets_from_local_file() + # eventually migrate local and remote stored documents from old + # format version + if version < self.RECOVERY_DOC_VERSION: + self._store_secrets() + self._upload_crypto_secrets() logger.info("Found secrets in local storage.") return + except NoStorageSecret: logger.info("Could not find secrets in local storage.") @@ -204,7 +212,12 @@ class SoledadSecrets(object): # server. 
try: logger.info('Trying to fetch secrets from remote storage...') - self._download_crypto_secrets() + version = self._download_crypto_secrets() + self._store_secrets() + # eventually migrate remote stored document from old format + # version + if version < self.RECOVERY_DOC_VERSION: + self._upload_crypto_secrets() logger.info('Found secrets in remote storage.') return except NoStorageSecret: @@ -240,6 +253,9 @@ class SoledadSecrets(object): def _load_secrets_from_local_file(self): """ Load storage secrets from local file. + + :return version: The version of the locally stored recovery document. + :raise NoStorageSecret: Raised if there are no secrets available in local storage. """ @@ -251,13 +267,18 @@ class SoledadSecrets(object): content = None with open(self._secrets_path, 'r') as f: content = json.loads(f.read()) - _, active_secret = self._import_recovery_document(content) + _, active_secret, version = self._import_recovery_document(content) + self._maybe_set_active_secret(active_secret) + return version + def _download_crypto_secrets(self): """ Download crypto secrets. + :return version: The version of the remotelly stored recovery document. + :raise NoStorageSecret: Raised if there are no secrets available in remote storage. """ @@ -268,9 +289,10 @@ class SoledadSecrets(object): if doc is None: raise NoStorageSecret - _, active_secret = self._import_recovery_document(doc.content) + _, active_secret, version = self._import_recovery_document(doc.content) self._maybe_set_active_secret(active_secret) - self._store_secrets() # save new secrets in local file + + return version def _gen_crypto_secrets(self): """ @@ -325,7 +347,7 @@ class SoledadSecrets(object): """ Export the storage secrets. - A recovery document has the following structure: + Current format of recovery document has the following structure: { 'storage_secrets': { @@ -336,6 +358,7 @@ class SoledadSecrets(object): }, }, 'active_secret': '', + 'version': '', } Note that multiple storage secrets might be stored in one recovery @@ -353,13 +376,14 @@ class SoledadSecrets(object): data = { self.STORAGE_SECRETS_KEY: encrypted_secrets, self.ACTIVE_SECRET_KEY: self._secret_id, + self.RECOVERY_DOC_VERSION_KEY: self.RECOVERY_DOC_VERSION, } return data def _import_recovery_document(self, data): """ - Import storage secrets for symmetric encryption and uuid (if present) - from a recovery document. + Import storage secrets for symmetric encryption from a recovery + document. Note that this method does not store the imported data on disk. For that, use C{self._store_secrets()}. @@ -367,11 +391,44 @@ class SoledadSecrets(object): :param data: The recovery document. :type data: dict - :return: A tuple containing the number of imported secrets and the - secret_id of the last active secret. - :rtype: (int, str) + :return: A tuple containing the number of imported secrets, the + secret_id of the last active secret, and the recovery + document format version. + :rtype: (int, str, int) """ soledad_assert(self.STORAGE_SECRETS_KEY in data) + version = data.get(self.RECOVERY_DOC_VERSION_KEY, 1) + meth = getattr(self, '_import_recovery_document_version_%d' % version) + secret_count, active_secret = meth(data) + return secret_count, active_secret, version + + def _import_recovery_document_version_1(self, data): + """ + Import storage secrets for symmetric encryption from a recovery + document with format version 1. 
+ + Version 1 of recovery document has the following structure: + + { + 'storage_secrets': { + '': { + 'cipher': 'aes256', + 'length': , + 'secret': '', + }, + }, + 'active_secret': '', + 'version': '', + } + + :param data: The recovery document. + :type data: dict + + :return: A tuple containing the number of imported secrets, the + secret_id of the last active secret, and the recovery + document format version. + :rtype: (int, str, int) + """ # include secrets in the secret pool. secret_count = 0 secrets = data[self.STORAGE_SECRETS_KEY].items() @@ -384,7 +441,7 @@ class SoledadSecrets(object): if secret_id not in self._secrets: try: self._secrets[secret_id] = \ - self._decrypt_storage_secret(encrypted_secret) + self._decrypt_storage_secret_version_1(encrypted_secret) secret_count += 1 except SecretsException as e: logger.error("Failed to decrypt storage secret: %s" @@ -443,13 +500,21 @@ class SoledadSecrets(object): # Management of secret for symmetric encryption. # - def _decrypt_storage_secret(self, encrypted_secret_dict): + def _decrypt_storage_secret_version_1(self, encrypted_secret_dict): """ Decrypt the storage secret. Storage secret is encrypted before being stored. This method decrypts and returns the decrypted storage secret. + Version 1 of storage secret format has the following structure: + + '': { + 'cipher': 'aes256', + 'length': , + 'secret': '', + }, + :param encrypted_secret_dict: The encrypted storage secret. :type encrypted_secret_dict: dict -- cgit v1.2.3 From f2739e7a100035d1d71eeffa3190b805a5931a50 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 7 Jun 2016 22:55:26 -0400 Subject: [pkg] remove unused chardet dependency --- client/pkg/requirements.pip | 1 - 1 file changed, 1 deletion(-) (limited to 'client') diff --git a/client/pkg/requirements.pip b/client/pkg/requirements.pip index 2f658d76..42c0d0b1 100644 --- a/client/pkg/requirements.pip +++ b/client/pkg/requirements.pip @@ -1,7 +1,6 @@ pysqlcipher>2.6.3 u1db scrypt -cchardet zope.proxy twisted -- cgit v1.2.3 From f01b2cf3aa27350eae788152f95c4f9ca8e11b9f Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 19 May 2016 17:19:31 -0300 Subject: [bug] install pip from default location Old versions of pip do not accept the --trusted-host option and will complain when trying to upgrade pip from wheel. To fix that we upgrade pip from usual location instead of doing it from wheel. --- client/pkg/pip_install_requirements.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'client') diff --git a/client/pkg/pip_install_requirements.sh b/client/pkg/pip_install_requirements.sh index d0479365..1f5ac5f6 100755 --- a/client/pkg/pip_install_requirements.sh +++ b/client/pkg/pip_install_requirements.sh @@ -80,5 +80,5 @@ insecure_flags=`return_insecure_flags` packages=`return_packages` pip install -U wheel -pip install $install_options pip +pip install -U pip pip install $install_options $insecure_flags $packages -- cgit v1.2.3 From 48ff88a7781165b98285d6c25ec5d49d49cc3503 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 15 Jun 2016 17:01:38 -0400 Subject: [bug] initialize OpenSSL context just once Do not initialize the openssl context on each call to decrypt. I'm not 100% sure of the causal chain, but it seems that the initialization of the osrandom engine that openssl backend does might be breaking havoc when sqlcipher is calling rand_bytes concurrently. 
further testing is needed to confirm this is the ultimate cause, but in my tests this change avoids the occurrence of the dreaded Floating Point Exception in soledad/sqlcipher. - Resolves: #8180 --- client/src/leap/soledad/client/crypto.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'client') diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index b75d4301..f7d92372 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -39,6 +39,8 @@ logger = logging.getLogger(__name__) MAC_KEY_LENGTH = 64 +crypto_backend = MultiBackend([OpenSSLBackend()]) + def encrypt_sym(data, key): """ @@ -59,8 +61,7 @@ def encrypt_sym(data, key): (len(key) * 8)) iv = os.urandom(16) - backend = MultiBackend([OpenSSLBackend()]) - cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend) + cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=crypto_backend) encryptor = cipher.encryptor() ciphertext = encryptor.update(data) + encryptor.finalize() @@ -87,9 +88,8 @@ def decrypt_sym(data, key, iv): soledad_assert( len(key) == 32, # 32 x 8 = 256 bits. 'Wrong key size: %s (must be 256 bits long).' % len(key)) - backend = MultiBackend([OpenSSLBackend()]) iv = binascii.a2b_base64(iv) - cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend) + cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=crypto_backend) decryptor = cipher.decryptor() return decryptor.update(data) + decryptor.finalize() -- cgit v1.2.3 From a841230aa7d199151ffe1cb21d33b9b0a7bd5eb5 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 15 Jun 2016 17:29:30 -0400 Subject: [bug] move the decryption to a threadpool too --- client/src/leap/soledad/client/encdecpool.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'client') diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index e348f545..7e807dcf 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -178,9 +178,11 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): secret = self._crypto.secret args = doc.doc_id, doc.rev, docstr, key, secret # encrypt asynchronously + # TODO use dedicated threadpool / move to ampoule d = threads.deferToThread( encrypt_doc_task, *args) d.addCallback(self._encrypt_doc_cb) + return d def _encrypt_doc_cb(self, result): """ @@ -429,9 +431,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): secret = self._crypto.secret args = doc_id, doc_rev, content, gen, trans_id, key, secret, idx # decrypt asynchronously - doc = decrypt_doc_task(*args) + # TODO use dedicated threadpool / move to ampoule + d = threads.deferToThread( + decrypt_doc_task, *args) # callback will insert it for later processing - return self._decrypt_doc_cb(doc) + d.addCallback(self._decrypt_doc_cb) + return d def insert_received_doc( self, doc_id, doc_rev, content, gen, trans_id, idx): -- cgit v1.2.3 From ab37460772c3cf07c6915baf42a61a44156cfde2 Mon Sep 17 00:00:00 2001 From: NavaL Date: Mon, 20 Jun 2016 15:03:59 +0200 Subject: [style] pep8 compatibility: indent and white space It was breaking E126 and E202 before --- client/src/leap/soledad/client/adbapi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'client') diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index f43e8110..cfd7675c 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ 
b/client/src/leap/soledad/client/adbapi.py @@ -204,7 +204,7 @@ class U1DBConnectionPool(adbapi.ConnectionPool): :rtype: twisted.internet.defer.Deferred """ meth = "u1db_%s" % meth - semaphore = DeferredSemaphore(SQLCIPHER_MAX_RETRIES ) + semaphore = DeferredSemaphore(SQLCIPHER_MAX_RETRIES) def _run_interaction(): return self.runInteraction( -- cgit v1.2.3 From bcf4c28e0ad5fe1c3a3c285e50ef2a097f31cca5 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 22 Jun 2016 16:42:52 +0200 Subject: [style] pep8 --- client/src/leap/soledad/client/api.py | 3 ++- client/src/leap/soledad/client/encdecpool.py | 3 ++- client/src/leap/soledad/client/secrets.py | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) (limited to 'client') diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index a1588aa9..33eae2c4 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -311,7 +311,8 @@ class Soledad(object): sync_exchange_phase = 0 if getattr(self._dbsyncer, 'syncer', None): if getattr(self._dbsyncer.syncer, 'sync_exchange_phase', None): - sync_exchange_phase = self._dbsyncer.syncer.sync_exchange_phase[0] + _p = self._dbsyncer.syncer.sync_exchange_phase[0] + sync_exchange_phase = _p return sync_phase, sync_exchange_phase # diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 7e807dcf..a6d49b21 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -378,7 +378,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): has finished. :rtype: twisted.internet.defer.Deferred """ - ensure_sync_id_column = "ALTER TABLE %s ADD COLUMN sync_id" % self.TABLE_NAME + ensure_sync_id_column = ("ALTER TABLE %s ADD COLUMN sync_id" % + self.TABLE_NAME) d = self._runQuery(ensure_sync_id_column) def empty_received_docs(_): diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py index c35b881e..3547a711 100644 --- a/client/src/leap/soledad/client/secrets.py +++ b/client/src/leap/soledad/client/secrets.py @@ -441,7 +441,8 @@ class SoledadSecrets(object): if secret_id not in self._secrets: try: self._secrets[secret_id] = \ - self._decrypt_storage_secret_version_1(encrypted_secret) + self._decrypt_storage_secret_version_1( + encrypted_secret) secret_count += 1 except SecretsException as e: logger.error("Failed to decrypt storage secret: %s" -- cgit v1.2.3 From b5aa97e9f88934dd73af84f212c95775f97769a9 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 28 Apr 2016 21:40:43 -0400 Subject: [refactor] make tests use l2db submodule From this moment on, we embed a fork of u1db called l2db. 
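As an illustration of what this migration looks like from a client module's point of view (a sketch only; the module paths are the ones appearing in the hunks below, and l2db is a fork that keeps the u1db names used here):

    # before: imports came from the standalone u1db distribution
    #   from u1db import errors, SyncTarget
    #   from u1db.remote import http_database

    # after: the same names, now provided by the embedded l2db fork
    from leap.soledad.common.l2db import errors, SyncTarget
    from leap.soledad.common.l2db.remote import http_database

Call sites stay unchanged; only the import paths move.
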
--- client/pkg/requirements.pip | 5 ----- client/src/leap/soledad/client/api.py | 5 +++-- client/src/leap/soledad/client/auth.py | 2 +- client/src/leap/soledad/client/http_target/api.py | 4 ++-- client/src/leap/soledad/client/http_target/fetch.py | 8 +++++--- client/src/leap/soledad/client/http_target/send.py | 3 +++ client/src/leap/soledad/client/http_target/support.py | 5 +++-- client/src/leap/soledad/client/shared_db.py | 2 +- client/src/leap/soledad/client/sqlcipher.py | 11 +++++------ client/src/leap/soledad/client/sync.py | 4 ++-- 10 files changed, 25 insertions(+), 24 deletions(-) (limited to 'client') diff --git a/client/pkg/requirements.pip b/client/pkg/requirements.pip index 42c0d0b1..9596470f 100644 --- a/client/pkg/requirements.pip +++ b/client/pkg/requirements.pip @@ -1,10 +1,5 @@ pysqlcipher>2.6.3 -u1db scrypt zope.proxy twisted -# XXX -- fix me! -# oauth is not strictly needed by us, but we need it until u1db adds it to its -# release as a dep. -oauth diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index 33eae2c4..8c25243b 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -39,8 +39,7 @@ from itertools import chain from StringIO import StringIO from collections import defaultdict -from u1db.remote import http_client -from u1db.remote.ssl_match_hostname import match_hostname + from twisted.internet.defer import DeferredLock, returnValue, inlineCallbacks from zope.interface import implements @@ -50,6 +49,8 @@ from leap.common.plugins import collect_plugins from leap.soledad.common import SHARED_DB_NAME from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type +from leap.soledad.common.l2db.remote import http_client +from leap.soledad.common.l2db.remote.ssl_match_hostname import match_hostname from leap.soledad.client import adbapi from leap.soledad.client import events as soledad_events diff --git a/client/src/leap/soledad/client/auth.py b/client/src/leap/soledad/client/auth.py index 6dfabeb4..78e9bf1b 100644 --- a/client/src/leap/soledad/client/auth.py +++ b/client/src/leap/soledad/client/auth.py @@ -22,7 +22,7 @@ they can do token-based auth requests to the Soledad server. """ import base64 -from u1db import errors +from leap.soledad.common.l2db import errors class TokenBasedAuth(object): diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py index b19ce9ce..f8de9a15 100644 --- a/client/src/leap/soledad/client/http_target/api.py +++ b/client/src/leap/soledad/client/http_target/api.py @@ -20,13 +20,13 @@ import json import base64 from uuid import uuid4 -from u1db import SyncTarget from twisted.web.error import Error from twisted.internet import defer -from leap.soledad.common.errors import InvalidAuthTokenError from leap.soledad.client.http_target.support import readBody +from leap.soledad.common.errors import InvalidAuthTokenError +from leap.soledad.common.l2db import SyncTarget # we may want to collect statistics from the sync process diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 9801c3d9..a3f70b02 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -16,15 +16,17 @@ # along with this program. If not, see . 
import logging import json -from u1db import errors -from u1db.remote import utils + from twisted.internet import defer -from leap.soledad.common.document import SoledadDocument + from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS from leap.soledad.client.events import emit_async from leap.soledad.client.crypto import is_symmetrically_encrypted from leap.soledad.client.encdecpool import SyncDecrypterPool from leap.soledad.client.http_target.support import RequestBody +from leap.soledad.common.document import SoledadDocument +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.remote import utils logger = logging.getLogger(__name__) diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py index 89288779..13218acf 100644 --- a/client/src/leap/soledad/client/http_target/send.py +++ b/client/src/leap/soledad/client/http_target/send.py @@ -16,10 +16,13 @@ # along with this program. If not, see . import json import logging + from twisted.internet import defer + from leap.soledad.client.events import emit_async from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS from leap.soledad.client.http_target.support import RequestBody + logger = logging.getLogger(__name__) diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py index 2625744c..d82fe346 100644 --- a/client/src/leap/soledad/client/http_target/support.py +++ b/client/src/leap/soledad/client/http_target/support.py @@ -16,14 +16,15 @@ # along with this program. If not, see . import warnings import json -from u1db import errors -from u1db.remote import http_errors + from twisted.internet import defer from twisted.web.client import _ReadBodyProtocol from twisted.web.client import PartialDownloadError from twisted.web._newclient import ResponseDone from twisted.web._newclient import PotentialDataLoss +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.remote import http_errors # we want to make sure that HTTP errors will raise appropriate u1db errors, # that is, fire errbacks with the appropriate failures, in the context of diff --git a/client/src/leap/soledad/client/shared_db.py b/client/src/leap/soledad/client/shared_db.py index a1d95fbe..d43db045 100644 --- a/client/src/leap/soledad/client/shared_db.py +++ b/client/src/leap/soledad/client/shared_db.py @@ -17,7 +17,7 @@ """ A shared database for storing/retrieving encrypted key material. """ -from u1db.remote import http_database +from leap.soledad.common.l2db.remote import http_database from leap.soledad.client.auth import TokenBasedAuth diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 99f5dad8..bf2a50f1 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -44,10 +44,6 @@ handled by Soledad should be created by SQLCipher >= 2.0. 
import logging import os import json -import u1db - -from u1db import errors as u1db_errors -from u1db.backends import sqlite_backend from hashlib import sha256 from functools import partial @@ -58,11 +54,14 @@ from twisted.internet import reactor from twisted.internet import defer from twisted.enterprise import adbapi +from leap.soledad.common.document import SoledadDocument +from leap.soledad.common import l2db +from leap.soledad.common.l2db import errors as u1db_errors +from leap.soledad.common.l2db.backends import sqlite_backend + from leap.soledad.client.http_target import SoledadHTTPSyncTarget from leap.soledad.client.sync import SoledadSynchronizer - from leap.soledad.client import pragmas -from leap.soledad.common.document import SoledadDocument logger = logging.getLogger(__name__) diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index 9cafe62f..2656a150 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -23,9 +23,9 @@ import logging from twisted.internet import defer -from u1db import errors +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.sync import Synchronizer from leap.soledad.common.errors import BackendNotReadyError -from u1db.sync import Synchronizer logger = logging.getLogger(__name__) -- cgit v1.2.3 From 8a3bbc6c81f10d8e00fcdd779784f327425f1942 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 5 Jul 2016 08:37:45 +0200 Subject: [refactor] remove u1db dep from support code --- client/pkg/generate_wheels.sh | 2 +- client/pkg/pip_install_requirements.sh | 2 +- client/pkg/requirements-latest.pip | 1 - .../leap/soledad/client/examples/benchmarks/measure_index_times.py | 4 ++-- .../client/examples/benchmarks/measure_index_times_custom_docid.py | 4 ++-- client/src/leap/soledad/client/examples/use_adbapi.py | 4 ++-- client/src/leap/soledad/client/sqlcipher.py | 2 +- 7 files changed, 9 insertions(+), 10 deletions(-) (limited to 'client') diff --git a/client/pkg/generate_wheels.sh b/client/pkg/generate_wheels.sh index e29c327e..496f8e01 100755 --- a/client/pkg/generate_wheels.sh +++ b/client/pkg/generate_wheels.sh @@ -7,7 +7,7 @@ if [ "$WHEELHOUSE" = "" ]; then fi pip wheel --wheel-dir $WHEELHOUSE pip -pip wheel --wheel-dir $WHEELHOUSE --allow-external u1db --allow-unverified u1db --allow-external dirspec --allow-unverified dirspec -r pkg/requirements.pip +pip wheel --wheel-dir $WHEELHOUSE --allow-external dirspec --allow-unverified dirspec -r pkg/requirements.pip if [ -f pkg/requirements-testing.pip ]; then pip wheel --wheel-dir $WHEELHOUSE -r pkg/requirements-testing.pip fi diff --git a/client/pkg/pip_install_requirements.sh b/client/pkg/pip_install_requirements.sh index 1f5ac5f6..b97c826f 100755 --- a/client/pkg/pip_install_requirements.sh +++ b/client/pkg/pip_install_requirements.sh @@ -4,7 +4,7 @@ # Use at your own risk. 
# See $usage for help -insecure_packages="u1db dirspec" +insecure_packages="dirspec" leap_wheelhouse=https://lizard.leap.se/wheels show_help() { diff --git a/client/pkg/requirements-latest.pip b/client/pkg/requirements-latest.pip index a629aa57..fa483db7 100644 --- a/client/pkg/requirements-latest.pip +++ b/client/pkg/requirements-latest.pip @@ -1,6 +1,5 @@ --index-url https://pypi.python.org/simple/ ---allow-external u1db --allow-unverified u1db --allow-external dirspec --allow-unverified dirspec -e 'git+https://github.com/pixelated-project/leap_pycommon.git@develop#egg=leap.common' diff --git a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py index 08775580..4fc91d9d 100644 --- a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py +++ b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py @@ -24,9 +24,9 @@ import hashlib import os import sys -import u1db from twisted.internet import defer, reactor +from leap.soledad.common import l2db from leap.soledad.client import adbapi from leap.soledad.client.sqlcipher import SQLCipherOptions @@ -135,7 +135,7 @@ def countDocs(_): def printResult(r, **kwargs): if kwargs: debug(*kwargs.values()) - elif isinstance(r, u1db.Document): + elif isinstance(r, l2db.Document): debug(r.doc_id, r.content['number']) else: len_results = len(r[1]) diff --git a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py index 9deba136..38ea18a3 100644 --- a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py +++ b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py @@ -24,11 +24,11 @@ import hashlib import os import sys -import u1db from twisted.internet import defer, reactor from leap.soledad.client import adbapi from leap.soledad.client.sqlcipher import SQLCipherOptions +from leap.soledad.common import l2db folder = os.environ.get("TMPDIR", "tmp") @@ -135,7 +135,7 @@ def countDocs(_): def printResult(r, **kwargs): if kwargs: debug(*kwargs.values()) - elif isinstance(r, u1db.Document): + elif isinstance(r, l2db.Document): debug(r.doc_id, r.content['number']) else: len_results = len(r[1]) diff --git a/client/src/leap/soledad/client/examples/use_adbapi.py b/client/src/leap/soledad/client/examples/use_adbapi.py index d7bd21f2..a2683836 100644 --- a/client/src/leap/soledad/client/examples/use_adbapi.py +++ b/client/src/leap/soledad/client/examples/use_adbapi.py @@ -21,11 +21,11 @@ from __future__ import print_function import datetime import os -import u1db from twisted.internet import defer, reactor from leap.soledad.client import adbapi from leap.soledad.client.sqlcipher import SQLCipherOptions +from leap.soledad.common import l2db folder = os.environ.get("TMPDIR", "tmp") @@ -68,7 +68,7 @@ def countDocs(_): def printResult(r): - if isinstance(r, u1db.Document): + if isinstance(r, l2db.Document): debug(r.doc_id, r.content['number']) else: len_results = len(r[1]) diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index bf2a50f1..f36c0b6a 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -594,7 +594,7 @@ class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase): self._db_handle = conn self._real_replica_uid = None 
self._ensure_schema() - self._factory = u1db.Document + self._factory = l2db.Document class SoledadSQLCipherWrapper(SQLCipherDatabase): -- cgit v1.2.3 From b3fb215860a8e50e4a6c551fef78628acdbf25c7 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 5 Jul 2016 07:45:06 +0200 Subject: [bug] use default sqlcipher timeout --- client/src/leap/soledad/client/adbapi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'client') diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index cfd7675c..328b4762 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ b/client/src/leap/soledad/client/adbapi.py @@ -49,7 +49,7 @@ if DEBUG_SQL: How long the SQLCipher connection should wait for the lock to go away until raising an exception. """ -SQLCIPHER_CONNECTION_TIMEOUT = 10 +SQLCIPHER_CONNECTION_TIMEOUT = 5 """ How many times a SQLCipher query should be retried in case of timeout. -- cgit v1.2.3 From 297ecdb24b238eff7e7674c7ab2df1f116007d7e Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 7 Jul 2016 11:56:33 +0200 Subject: [pkg] remove unneeded dirspec exceptions --- client/pkg/generate_wheels.sh | 2 +- client/pkg/pip_install_requirements.sh | 2 +- client/pkg/requirements-latest.pip | 2 -- client/pkg/requirements.pip | 1 - 4 files changed, 2 insertions(+), 5 deletions(-) (limited to 'client') diff --git a/client/pkg/generate_wheels.sh b/client/pkg/generate_wheels.sh index 496f8e01..a13e2c7a 100755 --- a/client/pkg/generate_wheels.sh +++ b/client/pkg/generate_wheels.sh @@ -7,7 +7,7 @@ if [ "$WHEELHOUSE" = "" ]; then fi pip wheel --wheel-dir $WHEELHOUSE pip -pip wheel --wheel-dir $WHEELHOUSE --allow-external dirspec --allow-unverified dirspec -r pkg/requirements.pip +pip wheel --wheel-dir $WHEELHOUSE -r pkg/requirements.pip if [ -f pkg/requirements-testing.pip ]; then pip wheel --wheel-dir $WHEELHOUSE -r pkg/requirements-testing.pip fi diff --git a/client/pkg/pip_install_requirements.sh b/client/pkg/pip_install_requirements.sh index b97c826f..f4b5f67a 100755 --- a/client/pkg/pip_install_requirements.sh +++ b/client/pkg/pip_install_requirements.sh @@ -4,7 +4,7 @@ # Use at your own risk. # See $usage for help -insecure_packages="dirspec" +insecure_packages="" leap_wheelhouse=https://lizard.leap.se/wheels show_help() { diff --git a/client/pkg/requirements-latest.pip b/client/pkg/requirements-latest.pip index fa483db7..46a7ccba 100644 --- a/client/pkg/requirements-latest.pip +++ b/client/pkg/requirements-latest.pip @@ -1,7 +1,5 @@ --index-url https://pypi.python.org/simple/ ---allow-external dirspec --allow-unverified dirspec - -e 'git+https://github.com/pixelated-project/leap_pycommon.git@develop#egg=leap.common' -e '../common' -e . diff --git a/client/pkg/requirements.pip b/client/pkg/requirements.pip index 9596470f..2ae844e1 100644 --- a/client/pkg/requirements.pip +++ b/client/pkg/requirements.pip @@ -2,4 +2,3 @@ pysqlcipher>2.6.3 scrypt zope.proxy twisted - -- cgit v1.2.3 From d99198046e07abb0d19fde1695d22267bc5e1433 Mon Sep 17 00:00:00 2001 From: drebs Date: Fri, 8 Jul 2016 13:09:26 +0200 Subject: [bug] properly trap db errors and close resources SQLCipher database access errors can raise Soledad exceptions. Database access and multithreading resources are allocated in different places, so we have to be careful to close all multithreading mechanismis in case of database access errors. If we don't, zombie threads may haunt the reactor. 
This commit adds SQLCipher exception trapping and Soledad exception raising for database access errors, while properly shutting down multithreading resources. --- client/src/leap/soledad/client/adbapi.py | 29 ++++++++++++++++++----------- client/src/leap/soledad/client/api.py | 19 ++++++++++++++++--- client/src/leap/soledad/client/sqlcipher.py | 27 ++++++++++++++------------- 3 files changed, 48 insertions(+), 27 deletions(-) (limited to 'client') diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index 328b4762..234be6b6 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ b/client/src/leap/soledad/client/adbapi.py @@ -29,8 +29,7 @@ from twisted.enterprise import adbapi from twisted.internet.defer import DeferredSemaphore from twisted.python import log from zope.proxy import ProxyBase, setProxiedObject -from pysqlcipher.dbapi2 import OperationalError -from pysqlcipher.dbapi2 import DatabaseError +from pysqlcipher import dbapi2 from leap.soledad.common.errors import DatabaseAccessError @@ -105,8 +104,10 @@ class U1DBConnection(adbapi.Connection): self._sync_enc_pool = sync_enc_pool try: adbapi.Connection.__init__(self, pool) - except DatabaseError: - raise DatabaseAccessError('Could not open sqlcipher database') + except dbapi2.DatabaseError as e: + raise DatabaseAccessError( + 'Error initializing connection to sqlcipher database: %s' + % str(e)) def reconnect(self): """ @@ -174,8 +175,9 @@ class U1DBConnectionPool(adbapi.ConnectionPool): self._sync_enc_pool = kwargs.pop("sync_enc_pool") try: adbapi.ConnectionPool.__init__(self, *args, **kwargs) - except DatabaseError: - raise DatabaseAccessError('Could not open sqlcipher database') + except dbapi2.DatabaseError as e: + raise DatabaseAccessError( + 'Error initializing u1db connection pool: %s' % str(e)) # all u1db connections, hashed by thread-id self._u1dbconnections = {} @@ -183,10 +185,15 @@ class U1DBConnectionPool(adbapi.ConnectionPool): # The replica uid, primed by the connections on init. 
self.replica_uid = ProxyBase(None) - conn = self.connectionFactory( - self, self._sync_enc_pool, init_u1db=True) - replica_uid = conn._u1db._real_replica_uid - setProxiedObject(self.replica_uid, replica_uid) + try: + conn = self.connectionFactory( + self, self._sync_enc_pool, init_u1db=True) + replica_uid = conn._u1db._real_replica_uid + setProxiedObject(self.replica_uid, replica_uid) + except DatabaseAccessError as e: + self.threadpool.stop() + raise DatabaseAccessError( + "Error initializing connection factory: %s" % str(e)) def runU1DBQuery(self, meth, *args, **kw): """ @@ -211,7 +218,7 @@ class U1DBConnectionPool(adbapi.ConnectionPool): self._runU1DBQuery, meth, *args, **kw) def _errback(failure): - failure.trap(OperationalError) + failure.trap(dbapi2.OperationalError) if failure.getErrorMessage() == "database is locked": should_retry = semaphore.acquire() if should_retry: diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index 8c25243b..1bfbed8a 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -51,6 +51,7 @@ from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type from leap.soledad.common.l2db.remote import http_client from leap.soledad.common.l2db.remote.ssl_match_hostname import match_hostname +from leap.soledad.common.errors import DatabaseAccessError from leap.soledad.client import adbapi from leap.soledad.client import events as soledad_events @@ -213,10 +214,22 @@ class Soledad(object): self._init_secrets() self._crypto = SoledadCrypto(self._secrets.remote_storage_secret) - self._init_u1db_sqlcipher_backend() - if syncable: - self._init_u1db_syncer() + try: + # initialize database access, trap any problems so we can shutdown + # smoothly. + self._init_u1db_sqlcipher_backend() + if syncable: + self._init_u1db_syncer() + except DatabaseAccessError: + # oops! something went wrong with backend initialization. We + # have to close any thread-related stuff we have already opened + # here, otherwise there might be zombie threads that may clog the + # reactor. 
+ self._sync_db.close() + if hasattr(self, '_dbpool'): + self._dbpool.close() + raise # # initialization/destruction methods diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index f36c0b6a..166c0783 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -58,6 +58,7 @@ from leap.soledad.common.document import SoledadDocument from leap.soledad.common import l2db from leap.soledad.common.l2db import errors as u1db_errors from leap.soledad.common.l2db.backends import sqlite_backend +from leap.soledad.common.errors import DatabaseAccessError from leap.soledad.client.http_target import SoledadHTTPSyncTarget from leap.soledad.client.sync import SoledadSynchronizer @@ -442,22 +443,19 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # format is the following: # # self._syncers = {'': ('', syncer), ...} - self._syncers = {} - - # Storage for the documents received during a sync + # storage for the documents received during a sync self.received_docs = [] self.running = False + self.shutdownID = None + self._db_handle = None + # initialize the main db before scheduling a start + self._initialize_main_db() self._reactor = reactor self._reactor.callWhenRunning(self._start) - self._db_handle = None - self._initialize_main_db() - - self.shutdownID = None - if DO_STATS: self.sync_phase = None @@ -472,11 +470,14 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self.running = True def _initialize_main_db(self): - self._db_handle = initialize_sqlcipher_db( - self._opts, check_same_thread=False) - self._real_replica_uid = None - self._ensure_schema() - self.set_document_factory(soledad_doc_factory) + try: + self._db_handle = initialize_sqlcipher_db( + self._opts, check_same_thread=False) + self._real_replica_uid = None + self._ensure_schema() + self.set_document_factory(soledad_doc_factory) + except sqlcipher_dbapi2.DatabaseError as e: + raise DatabaseAccessError(str(e)) @defer.inlineCallbacks def sync(self, url, creds=None, defer_decryption=True): -- cgit v1.2.3 From f406ccbaf2b79db1d65827463f830f4ffbe5856c Mon Sep 17 00:00:00 2001 From: drebs Date: Sun, 10 Jul 2016 10:38:04 +0200 Subject: [refactor] make u1db connection pool args explicit --- client/src/leap/soledad/client/adbapi.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) (limited to 'client') diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index 234be6b6..ef0f9066 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ b/client/src/leap/soledad/client/adbapi.py @@ -78,9 +78,11 @@ def getConnectionPool(opts, openfun=None, driver="pysqlcipher", if openfun is None and driver == "pysqlcipher": openfun = partial(set_init_pragmas, opts=opts) return U1DBConnectionPool( - "%s.dbapi2" % driver, opts=opts, sync_enc_pool=sync_enc_pool, - database=opts.path, check_same_thread=False, cp_openfun=openfun, - timeout=SQLCIPHER_CONNECTION_TIMEOUT) + opts, sync_enc_pool, + # the following params are relayed "as is" to twisted's + # ConnectionPool. + "%s.dbapi2" % driver, opts.path, timeout=SQLCIPHER_CONNECTION_TIMEOUT, + check_same_thread=False, cp_openfun=openfun) class U1DBConnection(adbapi.Connection): @@ -166,13 +168,12 @@ class U1DBConnectionPool(adbapi.ConnectionPool): connectionFactory = U1DBConnection transactionFactory = U1DBTransaction - def __init__(self, *args, **kwargs): + def __init__(self, opts, sync_enc_pool, *args, **kwargs): """ Initialize the connection pool. 
""" - # extract soledad-specific objects from keyword arguments - self.opts = kwargs.pop("opts") - self._sync_enc_pool = kwargs.pop("sync_enc_pool") + self.opts = opts + self._sync_enc_pool = sync_enc_pool try: adbapi.ConnectionPool.__init__(self, *args, **kwargs) except dbapi2.DatabaseError as e: @@ -220,6 +221,7 @@ class U1DBConnectionPool(adbapi.ConnectionPool): def _errback(failure): failure.trap(dbapi2.OperationalError) if failure.getErrorMessage() == "database is locked": + logger.warning("Database operation timed out.") should_retry = semaphore.acquire() if should_retry: logger.warning( -- cgit v1.2.3 From 7d264548d6df756f2a157fe59cf58b3240825418 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 13 Jul 2016 19:36:14 +0200 Subject: [style] pep8 --- client/src/leap/soledad/client/http_target/support.py | 1 + 1 file changed, 1 insertion(+) (limited to 'client') diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py index d82fe346..6ec98ed4 100644 --- a/client/src/leap/soledad/client/http_target/support.py +++ b/client/src/leap/soledad/client/http_target/support.py @@ -31,6 +31,7 @@ from leap.soledad.common.l2db.remote import http_errors # twisted. Because of that, we redefine the http body reader used by the HTTP # client below. + class ReadBodyProtocol(_ReadBodyProtocol): """ From original Twisted implementation, focused on adding our error -- cgit v1.2.3 From 6b23b3f3215f2443aa3e790559b63a41b3040072 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 14 Jul 2016 10:19:21 +0200 Subject: [pkg] bump changelog to 0.8.1 --- client/changes/next-changelog.rst | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) (limited to 'client') diff --git a/client/changes/next-changelog.rst b/client/changes/next-changelog.rst index 7ddb3a57..6c1c2a49 100644 --- a/client/changes/next-changelog.rst +++ b/client/changes/next-changelog.rst @@ -1,4 +1,4 @@ -0.8.1 - ... +0.8.2 - ... ++++++++++++++++++++ Please add lines to this file, they will be moved to the CHANGELOG.rst during @@ -10,26 +10,15 @@ I've added a new category `Misc` so we can track doc/style/packaging stuff. Features ~~~~~~~~ -- Add recovery document format version for future migrations. -- Use DeferredLock instead of its locking cousin. - `#1234 `_: Description of the new feature corresponding with issue #1234. -- New feature without related issue number. Bugfixes ~~~~~~~~ - `#1235 `_: Description for the fixed stuff corresponding with issue #1235. -- Remove document content conversion to unicode. Users of API are responsible - for only passing valid JSON to Soledad for storage. -- Bugfix without related issue number. Misc ~~~~ -- Add ability to get information about sync phases for profiling purposes. -- Add script for setting up develop environment. -- Refactor bootstrap to remove shared db lock. - `#1236 `_: Description of the new feature corresponding with issue #1236. -- Some change without issue number. -- Removed multiprocessing from encdecpool with some extra refactoring. Known Issues ~~~~~~~~~~~~ -- cgit v1.2.3