diff options
| -rw-r--r-- | client/changes/bug_6757_fix-order-of-insertion-when-syncing | 2 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/crypto.py | 552 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 673 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 8 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/target.py | 64 | ||||
| -rw-r--r-- | scripts/db_access/client_side_db.py | 13 | 
6 files changed, 715 insertions, 597 deletions
| diff --git a/client/changes/bug_6757_fix-order-of-insertion-when-syncing b/client/changes/bug_6757_fix-order-of-insertion-when-syncing new file mode 100644 index 00000000..c0470f5a --- /dev/null +++ b/client/changes/bug_6757_fix-order-of-insertion-when-syncing @@ -0,0 +1,2 @@ +  o Fix the order of insertion of documents when using workers for decrypting +    incoming documents during a sync. Closes #6757. diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index dd40b198..bdbaa8e0 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -23,21 +23,13 @@ import hmac  import hashlib  import json  import logging -import multiprocessing -import threading -import time  from pycryptopp.cipher.aes import AES  from pycryptopp.cipher.xsalsa20 import XSalsa20 -from zope.proxy import sameProxiedObjects - -from twisted.internet import defer -from twisted.internet.threads import deferToThread  from leap.soledad.common import soledad_assert  from leap.soledad.common import soledad_assert_type  from leap.soledad.common import crypto -from leap.soledad.common.document import SoledadDocument  logger = logging.getLogger(__name__) @@ -515,547 +507,3 @@ def is_symmetrically_encrypted(doc):                  == crypto.EncryptionSchemes.SYMKEY:              return True      return False - - -# -# Encrypt/decrypt pools of workers -# - -class SyncEncryptDecryptPool(object): -    """ -    Base class for encrypter/decrypter pools. -    """ -    WORKERS = multiprocessing.cpu_count() - -    def __init__(self, crypto, sync_db): -        """ -        Initialize the pool of encryption-workers. - -        :param crypto: A SoledadCryto instance to perform the encryption. -        :type crypto: leap.soledad.crypto.SoledadCrypto - -        :param sync_db: A database connection handle -        :type sync_db: pysqlcipher.dbapi2.Connection - -        :param write_lock: a write lock for controlling concurrent access -                           to the sync_db -        :type write_lock: threading.Lock -        """ -        self._pool = multiprocessing.Pool(self.WORKERS) -        self._crypto = crypto -        self._sync_db = sync_db - -    def close(self): -        """ -        Cleanly close the pool of workers. -        """ -        logger.debug("Closing %s" % (self.__class__.__name__,)) -        self._pool.close() -        try: -            self._pool.join() -        except Exception: -            pass - -    def terminate(self): -        """ -        Terminate the pool of workers. -        """ -        logger.debug("Terminating %s" % (self.__class__.__name__,)) -        self._pool.terminate() - - -def encrypt_doc_task(doc_id, doc_rev, content, key, secret): -    """ -    Encrypt the content of the given document. - -    :param doc_id: The document id. -    :type doc_id: str -    :param doc_rev: The document revision. -    :type doc_rev: str -    :param content: The serialized content of the document. -    :type content: str -    :param key: The encryption key. -    :type key: str -    :param secret: The Soledad storage secret (used for MAC auth). -    :type secret: str - -    :return: A tuple containing the doc id, revision and encrypted content. -    :rtype: tuple(str, str, str) -    """ -    encrypted_content = encrypt_docstr( -        content, doc_id, doc_rev, key, secret) -    return doc_id, doc_rev, encrypted_content - - -class SyncEncrypterPool(SyncEncryptDecryptPool): -    """ -    Pool of workers that spawn subprocesses to execute the symmetric encryption -    of documents to be synced. -    """ -    # TODO implement throttling to reduce cpu usage?? -    WORKERS = multiprocessing.cpu_count() -    TABLE_NAME = "docs_tosync" -    FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" - -    def encrypt_doc(self, doc, workers=True): -        """ -        Symmetrically encrypt a document. - -        :param doc: The document with contents to be encrypted. -        :type doc: SoledadDocument - -        :param workers: Whether to defer the decryption to the multiprocess -                        pool of workers. Useful for debugging purposes. -        :type workers: bool -        """ -        soledad_assert(self._crypto is not None, "need a crypto object") -        docstr = doc.get_json() -        key = self._crypto.doc_passphrase(doc.doc_id) -        secret = self._crypto.secret -        args = doc.doc_id, doc.rev, docstr, key, secret - -        try: -            if workers: -                res = self._pool.apply_async( -                    encrypt_doc_task, args, -                    callback=self.encrypt_doc_cb) -            else: -                # encrypt inline -                res = encrypt_doc_task(*args) -                self.encrypt_doc_cb(res) - -        except Exception as exc: -            logger.exception(exc) - -    def encrypt_doc_cb(self, result): -        """ -        Insert results of encryption routine into the local sync database. - -        :param result: A tuple containing the doc id, revision and encrypted -                       content. -        :type result: tuple(str, str, str) -        """ -        doc_id, doc_rev, content = result -        return self.insert_encrypted_local_doc(doc_id, doc_rev, content) - -    @defer.inlineCallbacks -    def insert_encrypted_local_doc(self, doc_id, doc_rev, content): -        """ -        Insert the contents of the encrypted doc into the local sync -        database. - -        :param doc_id: The document id. -        :type doc_id: str -        :param doc_rev: The document revision. -        :type doc_rev: str -        :param content: The serialized content of the document. -        :type content: str -        :param content: The encrypted document. -        :type content: str -        """ -        # FIXME --- callback should complete immediately since otherwise the -        # thread which handles the results will get blocked -        # Right now we're blocking the dispatcher with the writes to sqlite. -        query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \ -                % (self.TABLE_NAME,) -        yield self._sync_db.runQuery(query, (doc_id, doc_rev, content)) - - -def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): -    """ -    Decrypt the content of the given document. - -    :param doc_id: The document id. -    :type doc_id: str -    :param doc_rev: The document revision. -    :type doc_rev: str -    :param content: The encrypted content of the document. -    :type content: str -    :param gen: The generation corresponding to the modification of that -                document. -    :type gen: int -    :param trans_id: The transaction id corresponding to the modification of -                     that document. -    :type trans_id: str -    :param key: The encryption key. -    :type key: str -    :param secret: The Soledad storage secret (used for MAC auth). -    :type secret: str - -    :return: A tuple containing the doc id, revision and encrypted content. -    :rtype: tuple(str, str, str) -    """ -    decrypted_content = decrypt_doc_dict( -        content, doc_id, doc_rev, key, secret) -    return doc_id, doc_rev, decrypted_content, gen, trans_id - - -class SyncDecrypterPool(SyncEncryptDecryptPool): -    """ -    Pool of workers that spawn subprocesses to execute the symmetric decryption -    of documents that were received. - -    The decryption of the received documents is done in two steps: - -        1. All the encrypted docs are collected, together with their generation -           and transaction-id -        2. The docs are enqueued for decryption. When completed, they are -           inserted following the generation order. -    """ -    # TODO implement throttling to reduce cpu usage?? -    TABLE_NAME = "docs_received" -    FIELD_NAMES = "doc_id PRIMARY_KEY, rev, content, gen, trans_id, encrypted" - -    """ -    Period of recurrence of the periodic decrypting task, in seconds. -    """ -    DECRYPT_LOOP_PERIOD = 0.5 - -    def __init__(self, *args, **kwargs): -        """ -        Initialize the decrypter pool, and setup a dict for putting the -        results of the decrypted docs until they are picked by the insert -        routine that gets them in order. - -        :param insert_doc_cb: A callback for inserting received documents from -                              target. If not overriden, this will call u1db -                              insert_doc_from_target in synchronizer, which -                              implements the TAKE OTHER semantics. -        :type insert_doc_cb: function -        :param last_known_generation: Target's last known generation. -        :type last_known_generation: int -        """ -        self._insert_doc_cb = kwargs.pop("insert_doc_cb") -        self.source_replica_uid = kwargs.pop("source_replica_uid") -        SyncEncryptDecryptPool.__init__(self, *args, **kwargs) -        self._async_results = [] - -        self._stopped = threading.Event() -        self._deferred_loop = deferToThread(self._decrypt_and_process_docs) -        self._deferred_loop.addCallback( -            lambda _: logger.debug("Finished decryptor thread.")) - -    @defer.inlineCallbacks -    def insert_encrypted_received_doc(self, doc_id, doc_rev, content, -                                      gen, trans_id): -        """ -        Insert a received message with encrypted content, to be decrypted later -        on. - -        :param doc_id: The Document ID. -        :type doc_id: str -        :param doc_rev: The Document Revision -        :param doc_rev: str -        :param content: the Content of the document -        :type content: str -        :param gen: the Document Generation -        :type gen: int -        :param trans_id: Transaction ID -        :type trans_id: str -        """ -        docstr = json.dumps(content) -        query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( -            self.TABLE_NAME,) -        yield self._sync_db.runQuery( -            query, -            (doc_id, doc_rev, docstr, gen, trans_id, 1)) - -    @defer.inlineCallbacks -    def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id): -        """ -        Insert a document that is not symmetrically encrypted. -        We store it in the staging area (the decrypted_docs dictionary) to be -        picked up in order as the preceding documents are decrypted. - -        :param doc_id: The Document ID. -        :type doc_id: str -        :param doc_rev: The Document Revision -        :param doc_rev: str -        :param content: the Content of the document -        :type content: str -        :param gen: the Document Generation -        :type gen: int -        :param trans_id: Transaction ID -        :type trans_id: str -        """ -        if not isinstance(content, str): -            content = json.dumps(content) -        query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( -            self.TABLE_NAME,) -        yield self._sync_db.runQuery( -            query, -            (doc_id, doc_rev, content, gen, trans_id, 0)) - -    @defer.inlineCallbacks -    def delete_received_doc(self, doc_id, doc_rev): -        """ -        Delete a received doc after it was inserted into the local db. - -        :param doc_id: Document ID. -        :type doc_id: str -        :param doc_rev: Document revision. -        :type doc_rev: str -        """ -        sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % ( -            self.TABLE_NAME,) -        yield self._sync_db.runQuery(sql_del, (doc_id, doc_rev)) - -    def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, -                     source_replica_uid, workers=True): -        """ -        Symmetrically decrypt a document. - -        :param doc_id: The ID for the document with contents to be encrypted. -        :type doc: str -        :param rev: The revision of the document. -        :type rev: str -        :param content: The serialized content of the document. -        :type content: str -        :param gen: The generation corresponding to the modification of that -                    document. -        :type gen: int -        :param trans_id: The transaction id corresponding to the modification -                         of that document. -        :type trans_id: str -        :param source_replica_uid: -        :type source_replica_uid: str - -        :param workers: Whether to defer the decryption to the multiprocess -                        pool of workers. Useful for debugging purposes. -        :type workers: bool -        """ -        self.source_replica_uid = source_replica_uid - -        # insert_doc_cb is a proxy object that gets updated with the right -        # insert function only when the sync_target invokes the sync_exchange -        # method. so, if we don't still have a non-empty callback, we refuse -        # to proceed. -        if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid), -                              None): -            logger.debug("Sync decrypter pool: no insert_doc_cb() yet.") -            return - -        soledad_assert(self._crypto is not None, "need a crypto object") - -        if len(content) == 0: -            # not encrypted payload -            return - -        content = json.loads(content) -        key = self._crypto.doc_passphrase(doc_id) -        secret = self._crypto.secret -        args = doc_id, rev, content, gen, trans_id, key, secret - -        if workers: -            # save the async result object so we can inspect it for failures -            self._async_results.append(self._pool.apply_async( -                decrypt_doc_task, args, -                callback=self._decrypt_doc_cb)) -        else: -            # decrypt inline -            res = decrypt_doc_task(*args) -            self._decrypt_doc_cb(res) - -    def _decrypt_doc_cb(self, result): -        """ -        Store the decryption result in the sync db from where it will later be -        picked by _process_decrypted. - -        :param result: A tuple containing the doc id, revision and encrypted -        content. -        :type result: tuple(str, str, str) -        """ -        doc_id, rev, content, gen, trans_id = result -        logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s" -                     % (doc_id, rev, gen, trans_id)) -        return self.insert_received_doc(doc_id, rev, content, gen, trans_id) - -    def get_docs_by_generation(self, encrypted=None): -        """ -        Get all documents in the received table from the sync db, -        ordered by generation. - -        :param encrypted: If not None, only return documents with encrypted -                          field equal to given parameter. -        :type encrypted: bool or None - -        :return: list of doc_id, rev, generation, gen, trans_id -        :rtype: list -        """ -        sql = "SELECT doc_id, rev, content, gen, trans_id, encrypted FROM %s" \ -              % self.TABLE_NAME -        if encrypted is not None: -            sql += " WHERE encrypted = %d" % int(encrypted) -        sql += " ORDER BY gen ASC" -        return self._fetchall(sql) - -    @defer.inlineCallbacks -    def get_insertable_docs_by_gen(self): -        """ -        Return a list of non-encrypted documents ready to be inserted. -        """ -        # here, we compare the list of all available docs with the list of -        # decrypted docs and find the longest common prefix between these two -        # lists. Note that the order of lists fetch matters: if instead we -        # first fetch the list of decrypted docs and then the list of all -        # docs, then some document might have been decrypted between these two -        # calls, and if it is just the right doc then it might not be caught -        # by the next loop. -        all_docs = yield self.get_docs_by_generation() -        decrypted_docs = yield self.get_docs_by_generation(encrypted=False) -        insertable = [] -        for doc_id, rev, _, gen, trans_id, encrypted in all_docs: -            for next_doc_id, _, next_content, _, _, _ in decrypted_docs: -                if doc_id == next_doc_id: -                    content = next_content -                    insertable.append((doc_id, rev, content, gen, trans_id)) -                else: -                    break -        defer.returnValue(insertable) - -    @defer.inlineCallbacks -    def _count_docs_in_sync_db(self, encrypted=None): -        """ -        Count how many documents we have in the table for received docs. - -        :param encrypted: If not None, return count of documents with -                          encrypted field equal to given parameter. -        :type encrypted: bool or None - -        :return: The count of documents. -        :rtype: int -        """ -        query = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) -        if encrypted is not None: -            query += " WHERE encrypted = %d" % int(encrypted) -        res = yield self._sync_db.runQuery(query) -        if res: -            val = res.pop() -            defer.returnValue(val[0]) -        else: -            defer.returnValue(0) - -    @defer.inlineCallbacks -    def _decrypt_received_docs(self): -        """ -        Get all the encrypted documents from the sync database and dispatch a -        decrypt worker to decrypt each one of them. -        """ -        self._raise_in_case_of_failed_async_calls() -        docs_by_generation = yield self.get_docs_by_generation(encrypted=True) -        for doc_id, rev, content, gen, trans_id, _ in docs_by_generation: -            self._decrypt_doc( -                doc_id, rev, content, gen, trans_id, self.source_replica_uid) - -    @defer.inlineCallbacks -    def _process_decrypted(self): -        """ -        Process the already decrypted documents, and insert as many documents -        as can be taken from the expected order without finding a gap. - -        :return: Whether we have processed all the pending docs. -        :rtype: bool -        """ -        # Acquire the lock to avoid processing while we're still -        # getting data from the syncing stream, to avoid InvalidGeneration -        # problems. -        insertable = yield self.get_insertable_docs_by_gen() -        for doc_fields in insertable: -            yield self.insert_decrypted_local_doc(*doc_fields) - -    @defer.inlineCallbacks -    def insert_decrypted_local_doc(self, doc_id, doc_rev, content, -                                   gen, trans_id): -        """ -        Insert the decrypted document into the local sqlcipher database. -        Makes use of the passed callback `return_doc_cb` passed to the caller -        by u1db sync. - -        :param doc_id: The document id. -        :type doc_id: str -        :param doc_rev: The document revision. -        :type doc_rev: str -        :param content: The serialized content of the document. -        :type content: str -        :param gen: The generation corresponding to the modification of that -                    document. -        :type gen: int -        :param trans_id: The transaction id corresponding to the modification -                         of that document. -        :type trans_id: str -        """ -        # could pass source_replica in params for callback chain -        insert_fun = self._insert_doc_cb[self.source_replica_uid] -        logger.debug("Sync decrypter pool: inserting doc in local db: " -                     "%s:%s %s" % (doc_id, doc_rev, gen)) - -        # convert deleted documents to avoid error on document creation -        if content == 'null': -            content = None -        doc = SoledadDocument(doc_id, doc_rev, content) -        gen = int(gen) -        insert_fun(doc, gen, trans_id) - -        # If no errors found, remove it from the received database. -        yield self.delete_received_doc(doc_id, doc_rev) - -    @defer.inlineCallbacks -    def empty(self): -        """ -        Empty the received docs table of the sync database. -        """ -        sql = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) -        yield self._sync_db.runQuery(sql) - -    @defer.inlineCallbacks -    def _fetchall(self, *args, **kwargs): -        results = yield self._sync_db.runQuery(*args, **kwargs) -        defer.returnValue(results) - -    def _raise_in_case_of_failed_async_calls(self): -        """ -        Re-raise any exception raised by an async call. - -        :raise Exception: Raised if an async call has raised an exception. -        """ -        for res in self._async_results: -            if res.ready(): -                if not res.successful(): -                    # re-raise the exception raised by the remote call -                    res.get() - -    def _stop_decr_loop(self): -        """ -        """ -        self._stopped.set() - -    def close(self): -        """ -        """ -        self._stop_decr_loop() -        SyncEncryptDecryptPool.close(self) - -    def _decrypt_and_process_docs(self): -        """ -        Decrypt the documents received from remote replica and insert them -        into the local one. - -        Called periodically from LoopingCall self._sync_loop. -        """ -        while not self._stopped.is_set(): -            if sameProxiedObjects( -                    self._insert_doc_cb.get(self.source_replica_uid), -                    None): -                continue -            self._decrypt_received_docs() -            self._process_decrypted() -            time.sleep(self.DECRYPT_LOOP_PERIOD) - -    def wait(self): -        while not self.clear_to_sync(): -            time.sleep(self.DECRYPT_LOOP_PERIOD) - -    @defer.inlineCallbacks -    def clear_to_sync(self): -        count = yield self._count_docs_in_sync_db() -        defer.returnValue(count == 0) diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py new file mode 100644 index 00000000..0466ec5d --- /dev/null +++ b/client/src/leap/soledad/client/encdecpool.py @@ -0,0 +1,673 @@ +# -*- coding: utf-8 -*- +# encdecpool.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +A pool of encryption/decryption concurrent and parallel workers for using +during synchronization. +""" + + +import multiprocessing +import threading +import time +import json +import logging + +from zope.proxy import sameProxiedObjects + +from twisted.internet import defer +from twisted.internet.threads import deferToThread + +from leap.soledad.common.document import SoledadDocument +from leap.soledad.common import soledad_assert + +from leap.soledad.client.crypto import encrypt_docstr +from leap.soledad.client.crypto import decrypt_doc_dict + + +logger = logging.getLogger(__name__) + + +# +# Encrypt/decrypt pools of workers +# + +class SyncEncryptDecryptPool(object): +    """ +    Base class for encrypter/decrypter pools. +    """ +    WORKERS = multiprocessing.cpu_count() + +    def __init__(self, crypto, sync_db): +        """ +        Initialize the pool of encryption-workers. + +        :param crypto: A SoledadCryto instance to perform the encryption. +        :type crypto: leap.soledad.crypto.SoledadCrypto + +        :param sync_db: A database connection handle +        :type sync_db: pysqlcipher.dbapi2.Connection +        """ +        self._pool = multiprocessing.Pool(self.WORKERS) +        self._crypto = crypto +        self._sync_db = sync_db + +    def close(self): +        """ +        Cleanly close the pool of workers. +        """ +        logger.debug("Closing %s" % (self.__class__.__name__,)) +        self._pool.close() +        try: +            self._pool.join() +        except Exception: +            pass + +    def terminate(self): +        """ +        Terminate the pool of workers. +        """ +        logger.debug("Terminating %s" % (self.__class__.__name__,)) +        self._pool.terminate() + + +def encrypt_doc_task(doc_id, doc_rev, content, key, secret): +    """ +    Encrypt the content of the given document. + +    :param doc_id: The document id. +    :type doc_id: str +    :param doc_rev: The document revision. +    :type doc_rev: str +    :param content: The serialized content of the document. +    :type content: str +    :param key: The encryption key. +    :type key: str +    :param secret: The Soledad storage secret (used for MAC auth). +    :type secret: str + +    :return: A tuple containing the doc id, revision and encrypted content. +    :rtype: tuple(str, str, str) +    """ +    encrypted_content = encrypt_docstr( +        content, doc_id, doc_rev, key, secret) +    return doc_id, doc_rev, encrypted_content + + +class SyncEncrypterPool(SyncEncryptDecryptPool): +    """ +    Pool of workers that spawn subprocesses to execute the symmetric encryption +    of documents to be synced. +    """ +    # TODO implement throttling to reduce cpu usage?? +    WORKERS = multiprocessing.cpu_count() +    TABLE_NAME = "docs_tosync" +    FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" + +    def encrypt_doc(self, doc, workers=True): +        """ +        Symmetrically encrypt a document. + +        :param doc: The document with contents to be encrypted. +        :type doc: SoledadDocument + +        :param workers: Whether to defer the decryption to the multiprocess +                        pool of workers. Useful for debugging purposes. +        :type workers: bool +        """ +        soledad_assert(self._crypto is not None, "need a crypto object") +        docstr = doc.get_json() +        key = self._crypto.doc_passphrase(doc.doc_id) +        secret = self._crypto.secret +        args = doc.doc_id, doc.rev, docstr, key, secret + +        try: +            if workers: +                res = self._pool.apply_async( +                    encrypt_doc_task, args, +                    callback=self.encrypt_doc_cb) +            else: +                # encrypt inline +                res = encrypt_doc_task(*args) +                self.encrypt_doc_cb(res) + +        except Exception as exc: +            logger.exception(exc) + +    def encrypt_doc_cb(self, result): +        """ +        Insert results of encryption routine into the local sync database. + +        :param result: A tuple containing the doc id, revision and encrypted +                       content. +        :type result: tuple(str, str, str) +        """ +        doc_id, doc_rev, content = result +        return self.insert_encrypted_local_doc(doc_id, doc_rev, content) + +    def insert_encrypted_local_doc(self, doc_id, doc_rev, content): +        """ +        Insert the contents of the encrypted doc into the local sync +        database. + +        :param doc_id: The document id. +        :type doc_id: str +        :param doc_rev: The document revision. +        :type doc_rev: str +        :param content: The serialized content of the document. +        :type content: str +        """ +        query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \ +                % (self.TABLE_NAME,) +        return self._sync_db.runOperation(query, (doc_id, doc_rev, content)) + + +def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, +                     idx): +    """ +    Decrypt the content of the given document. + +    :param doc_id: The document id. +    :type doc_id: str +    :param doc_rev: The document revision. +    :type doc_rev: str +    :param content: The encrypted content of the document. +    :type content: str +    :param gen: The generation corresponding to the modification of that +                document. +    :type gen: int +    :param trans_id: The transaction id corresponding to the modification of +                     that document. +    :type trans_id: str +    :param key: The encryption key. +    :type key: str +    :param secret: The Soledad storage secret (used for MAC auth). +    :type secret: str +    :param idx: The index of this document in the current sync process. +    :type idx: int + +    :return: A tuple containing the doc id, revision and encrypted content. +    :rtype: tuple(str, str, str) +    """ +    decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret) +    return doc_id, doc_rev, decrypted_content, gen, trans_id, idx + + +class SyncDecrypterPool(SyncEncryptDecryptPool): +    """ +    Pool of workers that spawn subprocesses to execute the symmetric decryption +    of documents that were received. + +    The decryption of the received documents is done in two steps: + +        1. Encrypted documents are stored in the sync db by the actual soledad +           sync loop. +        2. The soledad sync loop tells us how many documents we should expect +           to process. +        3. We start a decrypt-and-process loop: + +            a. Encrypted documents are fetched. +            b. Encrypted documents are decrypted. +            c. The longest possible list of decrypted documents are inserted +               in the soledad db (this depends on which documents have already +               arrived and which documents have already been decrypte, because +               the order of insertion in the local soledad db matters). +            d. Processed documents are deleted from the database. + +        4. When we have processed as many documents as we should, the loop +           finishes. +    """ +    # TODO implement throttling to reduce cpu usage?? +    TABLE_NAME = "docs_received" +    FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \ +                  "trans_id, encrypted, idx" + +    """ +    Period of recurrence of the periodic decrypting task, in seconds. +    """ +    DECRYPT_LOOP_PERIOD = 0.5 + +    def __init__(self, *args, **kwargs): +        """ +        Initialize the decrypter pool, and setup a dict for putting the +        results of the decrypted docs until they are picked by the insert +        routine that gets them in order. + +        :param insert_doc_cb: A callback for inserting received documents from +                              target. If not overriden, this will call u1db +                              insert_doc_from_target in synchronizer, which +                              implements the TAKE OTHER semantics. +        :type insert_doc_cb: function +        :param source_replica_uid: The source replica uid, used to find the +                                   correct callback for inserting documents. +        :type source_replica_uid: str +        """ +        self._insert_doc_cb = kwargs.pop("insert_doc_cb") +        self.source_replica_uid = kwargs.pop("source_replica_uid") +        SyncEncryptDecryptPool.__init__(self, *args, **kwargs) + +        self._last_inserted_idx = 0 +        self._docs_to_process = None +        self._processed_docs = 0 + +        self._async_results = [] +        self._exception = None +        self._finished = threading.Event() + +        # clear the database before starting the sync +        self._empty_db = threading.Event() +        d = self._empty() +        d.addCallback(lambda _: self._empty_db.set()) + +        # start the decryption loop +        self._deferred_loop = deferToThread(self._decrypt_and_process_docs) +        self._deferred_loop.addCallback( +            lambda _: logger.debug("Finished decryptor thread.")) + +    def set_docs_to_process(self, docs_to_process): +        """ +        Set the number of documents we expect to process. + +        This should be called by the during the sync exchange process as soon +        as we know how many documents are arriving from the server. + +        :param docs_to_process: The number of documents to process. +        :type docs_to_process: int +        """ +        self._docs_to_process = docs_to_process + +    def insert_encrypted_received_doc( +            self, doc_id, doc_rev, content, gen, trans_id, idx): +        """ +        Insert a received message with encrypted content, to be decrypted later +        on. + +        :param doc_id: The Document ID. +        :type doc_id: str +        :param doc_rev: The Document Revision +        :param doc_rev: str +        :param content: the Content of the document +        :type content: str +        :param gen: the Document Generation +        :type gen: int +        :param trans_id: Transaction ID +        :type trans_id: str +        :param idx: The index of this document in the current sync process. +        :type idx: int + +        :return: A deferred that will fire when the operation in the database +                 has finished. +        :rtype: twisted.internet.defer.Deferred +        """ +        docstr = json.dumps(content) +        query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ +                % self.TABLE_NAME +        return self._sync_db.runOperation( +            query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx)) + +    def insert_received_doc( +            self, doc_id, doc_rev, content, gen, trans_id, idx): +        """ +        Insert a document that is not symmetrically encrypted. +        We store it in the staging area (the decrypted_docs dictionary) to be +        picked up in order as the preceding documents are decrypted. + +        :param doc_id: The Document ID. +        :type doc_id: str +        :param doc_rev: The Document Revision +        :param doc_rev: str +        :param content: the Content of the document +        :type content: str +        :param gen: the Document Generation +        :type gen: int +        :param trans_id: Transaction ID +        :type trans_id: str +        :param idx: The index of this document in the current sync process. +        :type idx: int + +        :return: A deferred that will fire when the operation in the database +                 has finished. +        :rtype: twisted.internet.defer.Deferred +        """ +        if not isinstance(content, str): +            content = json.dumps(content) +        query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ +                % self.TABLE_NAME +        return self._sync_db.runOperation( +            query, (doc_id, doc_rev, content, gen, trans_id, 0, idx)) + +    def _delete_received_doc(self, doc_id): +        """ +        Delete a received doc after it was inserted into the local db. + +        :param doc_id: Document ID. +        :type doc_id: str + +        :return: A deferred that will fire when the operation in the database +                 has finished. +        :rtype: twisted.internet.defer.Deferred +        """ +        query = "DELETE FROM '%s' WHERE doc_id=?" \ +                % self.TABLE_NAME +        return self._sync_db.runOperation(query, (doc_id,)) + +    def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx, +                     workers=True): +        """ +        Symmetrically decrypt a document and store in the sync db. + +        :param doc_id: The ID for the document with contents to be encrypted. +        :type doc: str +        :param rev: The revision of the document. +        :type rev: str +        :param content: The serialized content of the document. +        :type content: str +        :param gen: The generation corresponding to the modification of that +                    document. +        :type gen: int +        :param trans_id: The transaction id corresponding to the modification +                         of that document. +        :type trans_id: str +        :param idx: The index of this document in the current sync process. +        :type idx: int +        :param workers: Whether to defer the decryption to the multiprocess +                        pool of workers. Useful for debugging purposes. +        :type workers: bool + +        :return: A deferred that will fire after the document hasa been +                 decrypted and inserted in the sync db. +        :rtype: twisted.internet.defer.Deferred +        """ +        # insert_doc_cb is a proxy object that gets updated with the right +        # insert function only when the sync_target invokes the sync_exchange +        # method. so, if we don't still have a non-empty callback, we refuse +        # to proceed. +        if sameProxiedObjects( +                self._insert_doc_cb.get(self.source_replica_uid), +                None): +            logger.debug("Sync decrypter pool: no insert_doc_cb() yet.") +            return + +        soledad_assert(self._crypto is not None, "need a crypto object") + +        content = json.loads(content) +        key = self._crypto.doc_passphrase(doc_id) +        secret = self._crypto.secret +        args = doc_id, rev, content, gen, trans_id, key, secret, idx + +        if workers: +            # when using multiprocessing, we need to wait for all parallel +            # processing to finish before continuing with the +            # decrypt-and-process loop. We do this by using an extra deferred +            # that will be fired by the multiprocessing callback when it has +            # finished processing. +            d1 = defer.Deferred() + +            def _multiprocessing_callback(result): +                d2 = self._decrypt_doc_cb(result) +                d2.addCallback(lambda defres: d1.callback(defres)) + +            # save the async result object so we can inspect it for failures +            self._async_results.append( +                self._pool.apply_async( +                    decrypt_doc_task, args, +                    callback=_multiprocessing_callback)) + +            return d1 +        else: +            # decrypt inline +            res = decrypt_doc_task(*args) +            return self._decrypt_doc_cb(res) + +    def _decrypt_doc_cb(self, result): +        """ +        Store the decryption result in the sync db from where it will later be +        picked by _process_decrypted. + +        :param result: A tuple containing the document's id, revision, +                       content, generation, transaction id and sync index. +        :type result: tuple(str, str, str, int, str, int) + +        :return: A deferred that will fire after the document has been +                 inserted in the sync db. +        :rtype: twisted.internet.defer.Deferred +        """ +        doc_id, rev, content, gen, trans_id, idx = result +        logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s" +                     % (doc_id, rev, gen, trans_id)) +        return self.insert_received_doc( +            doc_id, rev, content, gen, trans_id, idx) + +    def _get_docs(self, encrypted=None, order_by='idx', order='ASC'): +        """ +        Get documents from the received docs table in the sync db. + +        :param encrypted: If not None, only return documents with encrypted +                          field equal to given parameter. +        :type encrypted: bool or None +        :param order_by: The name of the field to order results. +        :type order_by: str +        :param order: Whether the order should be ASC or DESC. +        :type order: str + +        :return: A deferred that will fire with the results of the database +                 query. +        :rtype: twisted.internet.defer.Deferred +        """ +        query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \ +                "idx FROM %s" % self.TABLE_NAME +        if encrypted is not None: +            query += " WHERE encrypted = %d" % int(encrypted) +        query += " ORDER BY %s %s" % (order_by, order) +        return self._sync_db.runQuery(query) + +    @defer.inlineCallbacks +    def _get_insertable_docs(self): +        """ +        Return a list of non-encrypted documents ready to be inserted. + +        :return: A deferred that will fire with the list of insertable +                 documents. +        :rtype: twisted.internet.defer.Deferred +        """ +        # here, we fetch the list of decrypted documents and compare with the +        # index of the last succesfully processed document. +        decrypted_docs = yield self._get_docs(encrypted=False) +        insertable = [] +        last_idx = self._last_inserted_idx +        for doc_id, rev, content, gen, trans_id, encrypted, idx in \ +                decrypted_docs: +            # XXX for some reason, a document might not have been deleted from +            #     the database. This is a bug. In this point, already +            #     processed documents should have been removed from the sync +            #     database and we should not have to skip them here. We need +            #     to find out why this is happening, fix, and remove the +            #     skipping below. +            if (idx < last_idx + 1): +                continue +            if (idx != last_idx + 1): +                break +            insertable.append((doc_id, rev, content, gen, trans_id, idx)) +            last_idx += 1 +        defer.returnValue(insertable) + +    def _decrypt_received_docs(self): +        """ +        Get all the encrypted documents from the sync database and dispatch a +        decrypt worker to decrypt each one of them. + +        :return: A deferred that will fire after all documents have been +                 decrypted and inserted back in the sync db. +        :rtype: twisted.internet.defer.Deferred +        """ + +        def _callback(received_docs): +            deferreds = [] +            for doc_id, rev, content, gen, trans_id, _, idx in received_docs: +                deferreds.append( +                    self._decrypt_doc( +                        doc_id, rev, content, gen, trans_id, idx)) +            return defer.gatherResults(deferreds) + +        d = self._get_docs(encrypted=True) +        d.addCallback(_callback) +        return d + +    def _process_decrypted(self): +        """ +        Fetch as many decrypted documents as can be taken from the expected +        order and insert them in the database. + +        :return: A deferred that will fire with the list of inserted +                 documents. +        :rtype: twisted.internet.defer.Deferred +        """ + +        def _callback(insertable): +            for doc_fields in insertable: +                self._insert_decrypted_local_doc(*doc_fields) +            return insertable + +        d = self._get_insertable_docs() +        d.addCallback(_callback) +        return d + +    def _delete_processed_docs(self, inserted): +        """ +        Delete from the sync db documents that have been processed. + +        :param inserted: List of documents inserted in the previous process +                         step. +        :type inserted: list + +        :return: A list of deferreds that will fire when each operation in the +                 database has finished. +        :rtype: twisted.internet.defer.DeferredList +        """ +        deferreds = [] +        for doc_id, doc_rev, _, _, _, _ in inserted: +            deferreds.append( +                self._delete_received_doc(doc_id)) +        if not deferreds: +            return defer.succeed(None) +        return defer.gatherResults(deferreds) + +    def _insert_decrypted_local_doc(self, doc_id, doc_rev, content, +                                    gen, trans_id, idx): +        """ +        Insert the decrypted document into the local sqlcipher database. +        Makes use of the passed callback `return_doc_cb` passed to the caller +        by u1db sync. + +        :param doc_id: The document id. +        :type doc_id: str +        :param doc_rev: The document revision. +        :type doc_rev: str +        :param content: The serialized content of the document. +        :type content: str +        :param gen: The generation corresponding to the modification of that +                    document. +        :type gen: int +        :param trans_id: The transaction id corresponding to the modification +                         of that document. +        :type trans_id: str +        """ +        # could pass source_replica in params for callback chain +        insert_fun = self._insert_doc_cb[self.source_replica_uid] +        logger.debug("Sync decrypter pool: inserting doc in local db: " +                     "%s:%s %s" % (doc_id, doc_rev, gen)) + +        # convert deleted documents to avoid error on document creation +        if content == 'null': +            content = None +        doc = SoledadDocument(doc_id, doc_rev, content) +        gen = int(gen) +        insert_fun(doc, gen, trans_id) + +        # store info about processed docs +        self._last_inserted_idx = idx +        self._processed_docs += 1 + +    def _empty(self): +        """ +        Empty the received docs table of the sync database. + +        :return: A deferred that will fire when the operation in the database +                 has finished. +        :rtype: twisted.internet.defer.Deferred +        """ +        query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) +        return self._sync_db.runOperation(query) + +    def _raise_if_async_fails(self): +        """ +        Raise any exception raised by a multiprocessing async decryption +        call. + +        :raise Exception: Raised if an async call has raised an exception. +        """ +        for res in self._async_results: +            if res.ready(): +                if not res.successful(): +                    # re-raise the exception raised by the remote call +                    res.get() + +    def _decrypt_and_process_docs(self): +        """ +        Decrypt the documents received from remote replica and insert them +        into the local one. + +        This method runs in its own thread, so sleeping will not interfere +        with the main thread. +        """ +        try: +            # wait for database to be emptied +            self._empty_db.wait() +            # wait until we know how many documents we need to process +            while self._docs_to_process is None: +                time.sleep(self.DECRYPT_LOOP_PERIOD) +            # because all database operations are asynchronous, we use an event to +            # make sure we don't start the next loop before the current one has +            # finished. +            event = threading.Event() +            # loop until we have processes as many docs as the number of changes +            while self._processed_docs < self._docs_to_process: +                if sameProxiedObjects( +                        self._insert_doc_cb.get(self.source_replica_uid), +                        None): +                    continue +                event.clear() +                d = self._decrypt_received_docs() +                d.addCallback(lambda _: self._raise_if_async_fails()) +                d.addCallback(lambda _: self._process_decrypted()) +                d.addCallback(self._delete_processed_docs) +                d.addCallback(lambda _: event.set()) +                event.wait() +                # sleep a bit to give time for some decryption work +                time.sleep(self.DECRYPT_LOOP_PERIOD) +        except Exception as e: +            self._exception = e +        self._finished.set() + +    def wait(self): +        """ +        Wait for the decrypt-and-process loop to finish. +        """ +        self._finished.wait() +        if self._exception: +            raise self._exception diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 4f7ecd1b..d3b3d01b 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -66,7 +66,7 @@ from twisted.python.threadpool import ThreadPool  from twisted.python import log  from twisted.enterprise import adbapi -from leap.soledad.client import crypto +from leap.soledad.client import encdecpool  from leap.soledad.client.target import SoledadSyncTarget  from leap.soledad.client.target import PendingReceivedDocsSyncError  from leap.soledad.client.sync import SoledadSynchronizer @@ -489,7 +489,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          if defer_encryption:              # initialize syncing queue encryption pool -            self._sync_enc_pool = crypto.SyncEncrypterPool( +            self._sync_enc_pool = encdecpool.SyncEncrypterPool(                  self._crypto, self._sync_db)              # ----------------------------------------------------------------- @@ -578,8 +578,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          :rtype: tuple of strings          """          maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)" -        encr = crypto.SyncEncrypterPool -        decr = crypto.SyncDecrypterPool +        encr = encdecpool.SyncEncrypterPool +        decr = encdecpool.SyncDecrypterPool          sql_encr_table_query = (maybe_create % (              encr.TABLE_NAME, encr.FIELD_NAMES))          sql_decr_table_query = (maybe_create % ( diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 06cef1ee..17ce718f 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -43,7 +43,8 @@ from leap.soledad.common.document import SoledadDocument  from leap.soledad.client.auth import TokenBasedAuth  from leap.soledad.client.crypto import is_symmetrically_encrypted  from leap.soledad.client.crypto import encrypt_doc, decrypt_doc -from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool +from leap.soledad.client.encdecpool import SyncEncrypterPool +from leap.soledad.client.encdecpool import SyncDecrypterPool  from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS  from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS  from leap.soledad.client.events import signal @@ -787,9 +788,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          self._crypto = crypto          self._stopped = True          self._stop_lock = threading.Lock() -        self._sync_exchange_lock = threading.Lock()          self.source_replica_uid = source_replica_uid -        self._defer_decryption = False          self._syncer_pool = None          # deferred decryption attributes @@ -813,9 +812,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          """          Tear down the SyncDecrypterPool.          """ -        if self._sync_decr_pool is not None: -            self._sync_decr_pool.close() -            self._sync_decr_pool = None +        self._sync_decr_pool.close() +        self._sync_decr_pool = None      def _get_replica_uid(self, url):          """ @@ -903,7 +901,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):              doc = SoledadDocument(doc_id, rev, content)              if is_symmetrically_encrypted(doc):                  if self._queue_for_decrypt: -                    self._save_encrypted_received_doc( +                    self._enqueue_encrypted_received_doc(                          doc, gen, trans_id, idx, total)                  else:                      # defer_decryption is False or no-sync-db fallback @@ -913,7 +911,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):                  # not symmetrically encrypted doc, insert it directly                  # or save it in the decrypted stage.                  if self._queue_for_decrypt: -                    self._save_received_doc(doc, gen, trans_id, idx, total) +                    self._enqueue_received_doc(doc, gen, trans_id, idx, total)                  else:                      self._return_doc_cb(doc, gen, trans_id)              # ------------------------------------------------------------- @@ -996,6 +994,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):                  self.stop(fail=True)                  break +            if defer_decryption: +                self._setup_sync_decr_pool() +              t.doc_syncer.set_request_method(                  'get', idx, sync_id, last_known_generation,                  last_known_trans_id) @@ -1021,6 +1022,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):                  t.join()                  if t.success:                      number_of_changes, _, _ = t.result +                    if defer_decryption and number_of_changes: +                        self._sync_decr_pool.set_docs_to_process( +                            number_of_changes)                  else:                      raise t.exception                  first_request = False @@ -1053,6 +1057,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):                  new_generation = doc_data['gen']                  new_transaction_id = doc_data['trans_id'] +        # decrypt docs in case of deferred decryption +        if defer_decryption: +            self._sync_decr_pool.wait() +            self._teardown_sync_decr_pool() +          return new_generation, new_transaction_id      def sync_exchange(self, docs_by_generations, @@ -1103,14 +1112,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          """          self._ensure_callback = ensure_callback -        if defer_decryption and self._sync_db is not None: -            self._sync_exchange_lock.acquire() -            self._setup_sync_decr_pool() -            self._defer_decryption = True -        else: -            # fall back -            defer_decryption = False -          self.start()          if sync_id is None: @@ -1120,10 +1121,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          setProxiedObject(self._insert_doc_cb[source_replica_uid],                           return_doc_cb) -        # empty the database before starting a new sync -        if defer_decryption is True and not self.clear_to_sync(): -            self._sync_decr_pool.empty() -          self._ensure_connection()          if self._trace_hook:  # for tests              self._trace_hook('sync_exchange') @@ -1257,6 +1254,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):              trans_id_after_send = response_dict['new_transaction_id']          # get docs from target +        if self._sync_db is None: +            defer_decryption = False          if self.stopped is False:              cur_target_gen, cur_target_trans_id = self._get_remote_docs(                  url, @@ -1266,12 +1265,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          self._syncer_pool.cleanup() -        # decrypt docs in case of deferred decryption -        if defer_decryption: -            self._sync_decr_pool.wait() -            self._teardown_sync_decr_pool() -            self._sync_exchange_lock.release() -          # update gen and trans id info in case we just sent and did not          # receive docs.          if gen_after_send is not None and gen_after_send > cur_target_gen: @@ -1357,7 +1350,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):                      encr.TABLE_NAME,))                  self._sync_db.execute(sql, (doc_id, doc_rev)) -    def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total): +    def _enqueue_encrypted_received_doc(self, doc, gen, trans_id, idx, total):          """          Save a symmetrically encrypted incoming document into the received          docs table in the sync db. A decryption task will pick it up @@ -1378,9 +1371,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):              "Enqueueing doc for decryption: %d/%d."              % (idx + 1, total))          self._sync_decr_pool.insert_encrypted_received_doc( -            doc.doc_id, doc.rev, doc.content, gen, trans_id) +            doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1) -    def _save_received_doc(self, doc, gen, trans_id, idx, total): +    def _enqueue_received_doc(self, doc, gen, trans_id, idx, total):          """          Save any incoming document into the received docs table in the sync db. @@ -1399,23 +1392,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):              "Enqueueing doc, no decryption needed: %d/%d."              % (idx + 1, total))          self._sync_decr_pool.insert_received_doc( -            doc.doc_id, doc.rev, doc.content, gen, trans_id) +            doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1)      #      # Symmetric decryption of syncing docs      # -    def clear_to_sync(self): -        """ -        Return whether sync can proceed (ie, the received db table is empty). - -        :return: Whether sync can proceed. -        :rtype: bool -        """ -        if self._sync_decr_pool: -            return self._sync_decr_pool.clear_to_sync() -        return True -      def set_decryption_callback(self, cb):          """          Set callback to be called when the decryption finishes. diff --git a/scripts/db_access/client_side_db.py b/scripts/db_access/client_side_db.py index d7c54b66..a047b522 100644 --- a/scripts/db_access/client_side_db.py +++ b/scripts/db_access/client_side_db.py @@ -10,6 +10,7 @@ import requests  import srp._pysrp as srp  import binascii  import logging +import json  from twisted.internet import reactor  from twisted.internet.defer import inlineCallbacks @@ -147,6 +148,9 @@ def _parse_args():          '--passphrase', '-p', default=None,          help='the user passphrase')      parser.add_argument( +        '--get-all-docs', '-a', action='store_true', +        help='get all documents from the local database') +    parser.add_argument(          '--sync', '-s', action='store_true',          help='synchronize with the server replica')      parser.add_argument( @@ -196,12 +200,21 @@ def _export_incoming_messages(soledad, directory):          i += 1 +@inlineCallbacks +def _get_all_docs(soledad): +    _, docs = yield soledad.get_all_docs() +    for doc in docs: +        print json.dumps(doc.content, indent=4) + +  # main program  @inlineCallbacks  def _main(soledad, km, args):      if args.sync:          yield soledad.sync() +    if args.get_all_docs: +        yield _get_all_docs(soledad)      if args.export_private_key:          yield _export_key(args, km, args.export_private_key, private=True)      if args.export_public_key: | 
