22 files changed, 1800 insertions, 2384 deletions
| diff --git a/client/changes/bug_6757_fix-order-of-insertion-when-syncing b/client/changes/bug_6757_fix-order-of-insertion-when-syncing new file mode 100644 index 00000000..c0470f5a --- /dev/null +++ b/client/changes/bug_6757_fix-order-of-insertion-when-syncing @@ -0,0 +1,2 @@ +  o Fix the order of insertion of documents when using workers for decrypting +    incoming documents during a sync. Closes #6757. diff --git a/client/changes/bug_6892_fix-log-message-for-local-secret b/client/changes/bug_6892_fix-log-message-for-local-secret new file mode 100644 index 00000000..39c13257 --- /dev/null +++ b/client/changes/bug_6892_fix-log-message-for-local-secret @@ -0,0 +1,2 @@ +  o Fix the log message when a local secret is not found so it's less +    confusing. Closes #6892. diff --git a/client/changes/bug_always-initialize-the-sync-db b/client/changes/bug_always-initialize-the-sync-db new file mode 100644 index 00000000..2b12989a --- /dev/null +++ b/client/changes/bug_always-initialize-the-sync-db @@ -0,0 +1,2 @@ +  o Always initialize the sync db to allow for both asynchronous encryption +    and asynchronous decryption when syncing. diff --git a/client/changes/bug_fix-async-decrypt b/client/changes/bug_fix-async-decrypt new file mode 100644 index 00000000..eb0ce7b5 --- /dev/null +++ b/client/changes/bug_fix-async-decrypt @@ -0,0 +1,2 @@ +  o Refactor asynchronous encryption/decryption code to its own file. +  o Fix logging and graceful failing when exceptions are raised during sync. diff --git a/client/changes/bug_improve-log-when-fetching-documents b/client/changes/bug_improve-log-when-fetching-documents new file mode 100644 index 00000000..a67ce028 --- /dev/null +++ b/client/changes/bug_improve-log-when-fetching-documents @@ -0,0 +1 @@ +  o Improve log messages when concurrently fetching documents from the server. diff --git a/client/changes/feature_add-pool-of-http-https-connections b/client/changes/feature_add-pool-of-http-https-connections new file mode 100644 index 00000000..7ff2a4ee --- /dev/null +++ b/client/changes/feature_add-pool-of-http-https-connections @@ -0,0 +1,2 @@ +  o Add a pool of HTTP/HTTPS connections that is able to verify the server +    certificate against a given CA certificate. diff --git a/client/changes/feature_use-twisted-adbapi-for-sync-db b/client/changes/feature_use-twisted-adbapi-for-sync-db new file mode 100644 index 00000000..41e5e6e3 --- /dev/null +++ b/client/changes/feature_use-twisted-adbapi-for-sync-db @@ -0,0 +1 @@ +  o Use twisted.enterprise.adbapi for access to the sync database. diff --git a/client/changes/feature_use-twisted-web-for-client-sync b/client/changes/feature_use-twisted-web-for-client-sync new file mode 100644 index 00000000..b4d1d4a4 --- /dev/null +++ b/client/changes/feature_use-twisted-web-for-client-sync @@ -0,0 +1 @@ +  o Use twisted.web.client for client sync. 
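For context, the adbapi change listed above moves access to the sync database onto twisted.enterprise.adbapi, so queries return deferreds instead of blocking the calling thread. A minimal sketch of the pattern, independent of Soledad's actual wiring (the path, passphrase and pragma handling below are illustrative assumptions, not the project's exact code):

    from twisted.enterprise import adbapi

    def get_sync_db_pool(path, passphrase):
        # cp_openfun runs once for each new connection in the pool, so it
        # is the right place to send SQLCipher's key pragma before any
        # other statement touches the encrypted database.
        def init_conn(conn):
            conn.cursor().execute("PRAGMA key = '%s'" % (passphrase,))

        return adbapi.ConnectionPool(
            "pysqlcipher.dbapi2", database=path,
            check_same_thread=False, cp_openfun=init_conn)

    # usage: queries return deferreds instead of blocking
    #   pool = get_sync_db_pool("/tmp/sync.db", "secret")
    #   d = pool.runQuery("SELECT doc_id, rev, content FROM docs_tosync")
    #   d.addCallback(handle_rows)

The getConnectionPool() helper in client/src/leap/soledad/client/adbapi.py below follows the same shape, passing set_init_pragmas as the cp_openfun.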
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index 7ad10db5..5b882bbe 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ b/client/src/leap/soledad/client/adbapi.py @@ -32,6 +32,7 @@ from zope.proxy import ProxyBase, setProxiedObject  from pysqlcipher.dbapi2 import OperationalError  from leap.soledad.client import sqlcipher as soledad_sqlcipher +from leap.soledad.client.pragmas import set_init_pragmas  logger = logging.getLogger(name=__name__) @@ -72,7 +73,7 @@ def getConnectionPool(opts, openfun=None, driver="pysqlcipher"):      :rtype: U1DBConnectionPool      """      if openfun is None and driver == "pysqlcipher": -        openfun = partial(soledad_sqlcipher.set_init_pragmas, opts=opts) +        openfun = partial(set_init_pragmas, opts=opts)      return U1DBConnectionPool(          "%s.dbapi2" % driver, database=opts.path,          check_same_thread=False, cp_openfun=openfun, diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index 0f29503f..91e0a4a0 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -272,7 +272,8 @@ class Soledad(object):          replica_uid = self._dbpool.replica_uid          self._dbsyncer = SQLCipherU1DBSync(              self._sqlcipher_opts, self._crypto, replica_uid, -            self._defer_encryption) +            SOLEDAD_CERT, +            defer_encryption=self._defer_encryption)      #      # Closing methods @@ -630,6 +631,7 @@ class Soledad(object):              Whether to defer decryption of documents, or do it inline while              syncing.          :type defer_decryption: bool +          :return: A deferred whose callback will be invoked with the local              generation before the synchronization was performed.          :rtype: twisted.internet.defer.Deferred @@ -650,7 +652,7 @@ class Soledad(object):          sync_url = urlparse.urljoin(self._server_url, 'user-%s' % self.uuid)          d = self._dbsyncer.sync(              sync_url, -            creds=self._creds, autocreate=False, +            creds=self._creds,              defer_decryption=defer_decryption)          def _sync_callback(local_gen): @@ -658,21 +660,16 @@ class Soledad(object):                  soledad_events.SOLEDAD_DONE_DATA_SYNC, self.uuid)              return local_gen -        # prevent sync failures from crashing the app by adding an errback -        # that logs the failure and does not propagate it down the callback -        # chain          def _sync_errback(failure):              s = StringIO()              failure.printDetailedTraceback(file=s)              msg = "Soledad exception when syncing!\n" + s.getvalue()              logger.error(msg) +            return failure          d.addCallbacks(_sync_callback, _sync_errback)          return d -    def stop_sync(self): -        self._dbsyncer.stop_sync() -      @property      def syncing(self):          """ diff --git a/client/src/leap/soledad/client/auth.py b/client/src/leap/soledad/client/auth.py index 72ab0008..6dfabeb4 100644 --- a/client/src/leap/soledad/client/auth.py +++ b/client/src/leap/soledad/client/auth.py @@ -14,15 +14,13 @@  #  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>. - -  """  Methods for token-based authentication.  These methods have to be included in all classes that extend HTTPClient so  they can do token-based auth requests to the Soledad server.  
""" - +import base64  from u1db import errors @@ -49,7 +47,7 @@ class TokenBasedAuth(object):          Return an authorization header to be included in the HTTP request, in          the form: -            [('Authorization', 'Token <base64 encoded creds')] +            [('Authorization', 'Token <(base64 encoded) uuid:token>')]          :param method: The HTTP method.          :type method: str @@ -64,7 +62,8 @@ class TokenBasedAuth(object):          if 'token' in self._creds:              uuid, token = self._creds['token']              auth = '%s:%s' % (uuid, token) -            return [('Authorization', 'Token %s' % auth.encode('base64')[:-1])] +            b64_token = base64.b64encode(auth) +            return [('Authorization', 'Token %s' % b64_token)]          else:              raise errors.UnknownAuthMethod(                  'Wrong credentials: %s' % self._creds) diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index 107bf7f1..bdbaa8e0 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -23,17 +23,13 @@ import hmac  import hashlib  import json  import logging -import multiprocessing -import threading  from pycryptopp.cipher.aes import AES  from pycryptopp.cipher.xsalsa20 import XSalsa20 -from zope.proxy import sameProxiedObjects  from leap.soledad.common import soledad_assert  from leap.soledad.common import soledad_assert_type  from leap.soledad.common import crypto -from leap.soledad.common.document import SoledadDocument  logger = logging.getLogger(__name__) @@ -227,7 +223,7 @@ class SoledadCrypto(object):  #  def mac_doc(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, enc_iv, -        mac_method, secret): +            mac_method, secret):      """      Calculate a MAC for C{doc} using C{ciphertext}. @@ -378,7 +374,7 @@ def decrypt_doc(crypto, doc):  def _verify_doc_mac(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, -        enc_iv, mac_method, secret, doc_mac): +                    enc_iv, mac_method, secret, doc_mac):      """      Verify that C{doc_mac} is a correct MAC for the given document. @@ -511,525 +507,3 @@ def is_symmetrically_encrypted(doc):                  == crypto.EncryptionSchemes.SYMKEY:              return True      return False - - -# -# Encrypt/decrypt pools of workers -# - -class SyncEncryptDecryptPool(object): -    """ -    Base class for encrypter/decrypter pools. -    """ -    WORKERS = multiprocessing.cpu_count() - -    def __init__(self, crypto, sync_db, write_lock): -        """ -        Initialize the pool of encryption-workers. - -        :param crypto: A SoledadCryto instance to perform the encryption. -        :type crypto: leap.soledad.crypto.SoledadCrypto - -        :param sync_db: A database connection handle -        :type sync_db: pysqlcipher.dbapi2.Connection - -        :param write_lock: a write lock for controlling concurrent access -                           to the sync_db -        :type write_lock: threading.Lock -        """ -        self._pool = multiprocessing.Pool(self.WORKERS) -        self._crypto = crypto -        self._sync_db = sync_db -        self._sync_db_write_lock = write_lock - -    def close(self): -        """ -        Cleanly close the pool of workers. 
-        """ -        logger.debug("Closing %s" % (self.__class__.__name__,)) -        self._pool.close() -        try: -            self._pool.join() -        except Exception: -            pass - -    def terminate(self): -        """ -        Terminate the pool of workers. -        """ -        logger.debug("Terminating %s" % (self.__class__.__name__,)) -        self._pool.terminate() - - -def encrypt_doc_task(doc_id, doc_rev, content, key, secret): -    """ -    Encrypt the content of the given document. - -    :param doc_id: The document id. -    :type doc_id: str -    :param doc_rev: The document revision. -    :type doc_rev: str -    :param content: The serialized content of the document. -    :type content: str -    :param key: The encryption key. -    :type key: str -    :param secret: The Soledad storage secret (used for MAC auth). -    :type secret: str - -    :return: A tuple containing the doc id, revision and encrypted content. -    :rtype: tuple(str, str, str) -    """ -    encrypted_content = encrypt_docstr( -        content, doc_id, doc_rev, key, secret) -    return doc_id, doc_rev, encrypted_content - - -class SyncEncrypterPool(SyncEncryptDecryptPool): -    """ -    Pool of workers that spawn subprocesses to execute the symmetric encryption -    of documents to be synced. -    """ -    # TODO implement throttling to reduce cpu usage?? -    WORKERS = multiprocessing.cpu_count() -    TABLE_NAME = "docs_tosync" -    FIELD_NAMES = "doc_id, rev, content" - -    def encrypt_doc(self, doc, workers=True): -        """ -        Symmetrically encrypt a document. - -        :param doc: The document with contents to be encrypted. -        :type doc: SoledadDocument - -        :param workers: Whether to defer the decryption to the multiprocess -                        pool of workers. Useful for debugging purposes. -        :type workers: bool -        """ -        soledad_assert(self._crypto is not None, "need a crypto object") -        docstr = doc.get_json() -        key = self._crypto.doc_passphrase(doc.doc_id) -        secret = self._crypto.secret -        args = doc.doc_id, doc.rev, docstr, key, secret - -        try: -            if workers: -                res = self._pool.apply_async( -                    encrypt_doc_task, args, -                    callback=self.encrypt_doc_cb) -            else: -                # encrypt inline -                res = encrypt_doc_task(*args) -                self.encrypt_doc_cb(res) - -        except Exception as exc: -            logger.exception(exc) - -    def encrypt_doc_cb(self, result): -        """ -        Insert results of encryption routine into the local sync database. - -        :param result: A tuple containing the doc id, revision and encrypted -                       content. -        :type result: tuple(str, str, str) -        """ -        doc_id, doc_rev, content = result -        self.insert_encrypted_local_doc(doc_id, doc_rev, content) - -    def insert_encrypted_local_doc(self, doc_id, doc_rev, content): -        """ -        Insert the contents of the encrypted doc into the local sync -        database. - -        :param doc_id: The document id. -        :type doc_id: str -        :param doc_rev: The document revision. -        :type doc_rev: str -        :param content: The serialized content of the document. -        :type content: str -        :param content: The encrypted document. 
-        :type content: str -        """ -        # FIXME --- callback should complete immediately since otherwise the -        # thread which handles the results will get blocked -        # Right now we're blocking the dispatcher with the writes to sqlite. -        sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,) -        sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,) - -        con = self._sync_db -        with self._sync_db_write_lock: -            con.execute(sql_del, (doc_id, )) -            con.execute(sql_ins, (doc_id, doc_rev, content)) - - -def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): -    """ -    Decrypt the content of the given document. - -    :param doc_id: The document id. -    :type doc_id: str -    :param doc_rev: The document revision. -    :type doc_rev: str -    :param content: The encrypted content of the document. -    :type content: str -    :param gen: The generation corresponding to the modification of that -                document. -    :type gen: int -    :param trans_id: The transaction id corresponding to the modification of -                     that document. -    :type trans_id: str -    :param key: The encryption key. -    :type key: str -    :param secret: The Soledad storage secret (used for MAC auth). -    :type secret: str - -    :return: A tuple containing the doc id, revision and encrypted content. -    :rtype: tuple(str, str, str) -    """ -    decrypted_content = decrypt_doc_dict( -        content, doc_id, doc_rev, key, secret) -    return doc_id, doc_rev, decrypted_content, gen, trans_id - - -class SyncDecrypterPool(SyncEncryptDecryptPool): -    """ -    Pool of workers that spawn subprocesses to execute the symmetric decryption -    of documents that were received. - -    The decryption of the received documents is done in two steps: - -        1. All the encrypted docs are collected, together with their generation -           and transaction-id -        2. The docs are enqueued for decryption. When completed, they are -           inserted following the generation order. -    """ -    # TODO implement throttling to reduce cpu usage?? -    TABLE_NAME = "docs_received" -    FIELD_NAMES = "doc_id, rev, content, gen, trans_id, encrypted" - -    write_encrypted_lock = threading.Lock() - -    def __init__(self, *args, **kwargs): -        """ -        Initialize the decrypter pool, and setup a dict for putting the -        results of the decrypted docs until they are picked by the insert -        routine that gets them in order. - -        :param insert_doc_cb: A callback for inserting received documents from -                              target. If not overriden, this will call u1db -                              insert_doc_from_target in synchronizer, which -                              implements the TAKE OTHER semantics. -        :type insert_doc_cb: function -        :param last_known_generation: Target's last known generation. -        :type last_known_generation: int -        """ -        self._insert_doc_cb = kwargs.pop("insert_doc_cb") -        SyncEncryptDecryptPool.__init__(self, *args, **kwargs) -        self.source_replica_uid = None -        self._async_results = [] - -    def set_source_replica_uid(self, source_replica_uid): -        """ -        Set the source replica uid for this decrypter pool instance. - -        :param source_replica_uid: The uid of the source replica. 
-        :type source_replica_uid: str -        """ -        self.source_replica_uid = source_replica_uid - -    def insert_encrypted_received_doc(self, doc_id, doc_rev, content, -                                      gen, trans_id): -        """ -        Insert a received message with encrypted content, to be decrypted later -        on. - -        :param doc_id: The Document ID. -        :type doc_id: str -        :param doc_rev: The Document Revision -        :param doc_rev: str -        :param content: the Content of the document -        :type content: str -        :param gen: the Document Generation -        :type gen: int -        :param trans_id: Transaction ID -        :type trans_id: str -        """ -        docstr = json.dumps(content) -        sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,) -        sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( -            self.TABLE_NAME,) - -        con = self._sync_db -        with self._sync_db_write_lock: -            con.execute(sql_del, (doc_id, )) -            con.execute( -                sql_ins, -                (doc_id, doc_rev, docstr, gen, trans_id, 1)) - -    def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id): -        """ -        Insert a document that is not symmetrically encrypted. -        We store it in the staging area (the decrypted_docs dictionary) to be -        picked up in order as the preceding documents are decrypted. - -        :param doc_id: The Document ID. -        :type doc_id: str -        :param doc_rev: The Document Revision -        :param doc_rev: str -        :param content: the Content of the document -        :type content: str -        :param gen: the Document Generation -        :type gen: int -        :param trans_id: Transaction ID -        :type trans_id: str -        """ -        if not isinstance(content, str): -            content = json.dumps(content) -        sql_del = "DELETE FROM '%s' WHERE doc_id=?" % ( -            self.TABLE_NAME,) -        sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( -            self.TABLE_NAME,) -        con = self._sync_db -        with self._sync_db_write_lock: -            con.execute(sql_del, (doc_id,)) -            con.execute( -                sql_ins, -                (doc_id, doc_rev, content, gen, trans_id, 0)) - -    def delete_received_doc(self, doc_id, doc_rev): -        """ -        Delete a received doc after it was inserted into the local db. - -        :param doc_id: Document ID. -        :type doc_id: str -        :param doc_rev: Document revision. -        :type doc_rev: str -        """ -        sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % ( -            self.TABLE_NAME,) -        con = self._sync_db -        with self._sync_db_write_lock: -            con.execute(sql_del, (doc_id, doc_rev)) - -    def decrypt_doc(self, doc_id, rev, content, gen, trans_id, -                    source_replica_uid, workers=True): -        """ -        Symmetrically decrypt a document. - -        :param doc_id: The ID for the document with contents to be encrypted. -        :type doc: str -        :param rev: The revision of the document. -        :type rev: str -        :param content: The serialized content of the document. -        :type content: str -        :param gen: The generation corresponding to the modification of that -                    document. 
-        :type gen: int -        :param trans_id: The transaction id corresponding to the modification -                         of that document. -        :type trans_id: str -        :param source_replica_uid: -        :type source_replica_uid: str - -        :param workers: Whether to defer the decryption to the multiprocess -                        pool of workers. Useful for debugging purposes. -        :type workers: bool -        """ -        self.source_replica_uid = source_replica_uid - -        # insert_doc_cb is a proxy object that gets updated with the right -        # insert function only when the sync_target invokes the sync_exchange -        # method. so, if we don't still have a non-empty callback, we refuse -        # to proceed. -        if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid), -                              None): -            logger.debug("Sync decrypter pool: no insert_doc_cb() yet.") -            return - -        soledad_assert(self._crypto is not None, "need a crypto object") - -        if len(content) == 0: -            # not encrypted payload -            return - -        content = json.loads(content) -        key = self._crypto.doc_passphrase(doc_id) -        secret = self._crypto.secret -        args = doc_id, rev, content, gen, trans_id, key, secret - -        if workers: -            # save the async result object so we can inspect it for failures -            self._async_results.append(self._pool.apply_async( -                decrypt_doc_task, args, -                callback=self.decrypt_doc_cb)) -        else: -            # decrypt inline -            res = decrypt_doc_task(*args) -            self.decrypt_doc_cb(res) - -    def decrypt_doc_cb(self, result): -        """ -        Store the decryption result in the sync db from where it will later be -        picked by process_decrypted. - -        :param result: A tuple containing the doc id, revision and encrypted -        content. -        :type result: tuple(str, str, str) -        """ -        doc_id, rev, content, gen, trans_id = result -        logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s" -                     % (doc_id, rev, gen, trans_id)) -        self.insert_received_doc(doc_id, rev, content, gen, trans_id) - -    def get_docs_by_generation(self, encrypted=None): -        """ -        Get all documents in the received table from the sync db, -        ordered by generation. - -        :param encrypted: If not None, only return documents with encrypted -                          field equal to given parameter. -        :type encrypted: bool or None - -        :return: list of doc_id, rev, generation, gen, trans_id -        :rtype: list -        """ -        sql = "SELECT doc_id, rev, content, gen, trans_id, encrypted FROM %s" \ -              % self.TABLE_NAME -        if encrypted is not None: -            sql += " WHERE encrypted = %d" % int(encrypted) -        sql += " ORDER BY gen ASC" -        return self._fetchall(sql) - -    def get_insertable_docs_by_gen(self): -        """ -        Return a list of non-encrypted documents ready to be inserted. -        """ -        # here, we compare the list of all available docs with the list of -        # decrypted docs and find the longest common prefix between these two -        # lists. 
Note that the order of lists fetch matters: if instead we -        # first fetch the list of decrypted docs and then the list of all -        # docs, then some document might have been decrypted between these two -        # calls, and if it is just the right doc then it might not be caught -        # by the next loop. -        all_docs = self.get_docs_by_generation() -        decrypted_docs = self.get_docs_by_generation(encrypted=False) -        insertable = [] -        for doc_id, rev, _, gen, trans_id, encrypted in all_docs: -            for next_doc_id, _, next_content, _, _, _ in decrypted_docs: -                if doc_id == next_doc_id: -                    content = next_content -                    insertable.append((doc_id, rev, content, gen, trans_id)) -                else: -                    break -        return insertable - -    def count_docs_in_sync_db(self, encrypted=None): -        """ -        Count how many documents we have in the table for received docs. - -        :param encrypted: If not None, return count of documents with -                          encrypted field equal to given parameter. -        :type encrypted: bool or None - -        :return: The count of documents. -        :rtype: int -        """ -        if self._sync_db is None: -            logger.warning("cannot return count with null sync_db") -            return -        sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) -        if encrypted is not None: -            sql += " WHERE encrypted = %d" % int(encrypted) -        res = self._fetchall(sql) -        if res: -            val = res.pop() -            return val[0] -        else: -            return 0 - -    def decrypt_received_docs(self): -        """ -        Get all the encrypted documents from the sync database and dispatch a -        decrypt worker to decrypt each one of them. -        """ -        docs_by_generation = self.get_docs_by_generation(encrypted=True) -        for doc_id, rev, content, gen, trans_id, _ \ -                in filter(None, docs_by_generation): -            self.decrypt_doc( -                doc_id, rev, content, gen, trans_id, self.source_replica_uid) - -    def process_decrypted(self): -        """ -        Process the already decrypted documents, and insert as many documents -        as can be taken from the expected order without finding a gap. - -        :return: Whether we have processed all the pending docs. -        :rtype: bool -        """ -        # Acquire the lock to avoid processing while we're still -        # getting data from the syncing stream, to avoid InvalidGeneration -        # problems. -        with self.write_encrypted_lock: -            for doc_fields in self.get_insertable_docs_by_gen(): -                self.insert_decrypted_local_doc(*doc_fields) -        remaining = self.count_docs_in_sync_db() -        return remaining == 0 - -    def insert_decrypted_local_doc(self, doc_id, doc_rev, content, -                                   gen, trans_id): -        """ -        Insert the decrypted document into the local sqlcipher database. -        Makes use of the passed callback `return_doc_cb` passed to the caller -        by u1db sync. - -        :param doc_id: The document id. -        :type doc_id: str -        :param doc_rev: The document revision. -        :type doc_rev: str -        :param content: The serialized content of the document. -        :type content: str -        :param gen: The generation corresponding to the modification of that -                    document. 
-        :type gen: int -        :param trans_id: The transaction id corresponding to the modification -                         of that document. -        :type trans_id: str -        """ -        # could pass source_replica in params for callback chain -        insert_fun = self._insert_doc_cb[self.source_replica_uid] -        logger.debug("Sync decrypter pool: inserting doc in local db: " -                     "%s:%s %s" % (doc_id, doc_rev, gen)) - -        # convert deleted documents to avoid error on document creation -        if content == 'null': -            content = None -        doc = SoledadDocument(doc_id, doc_rev, content) -        gen = int(gen) -        insert_fun(doc, gen, trans_id) - -        # If no errors found, remove it from the received database. -        self.delete_received_doc(doc_id, doc_rev) - -    def empty(self): -        """ -        Empty the received docs table of the sync database. -        """ -        sql = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) -        self._sync_db.execute(sql) - -    def _fetchall(self, *args, **kwargs): -        with self._sync_db: -            c = self._sync_db.cursor() -            c.execute(*args, **kwargs) -            return c.fetchall() - -    def raise_in_case_of_failed_async_calls(self): -        """ -        Re-raise any exception raised by an async call. - -        :raise Exception: Raised if an async call has raised an exception. -        """ -        for res in self._async_results: -            if res.ready(): -                if not res.successful(): -                    # re-raise the exception raised by the remote call -                    res.get() diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py new file mode 100644 index 00000000..c0a05d38 --- /dev/null +++ b/client/src/leap/soledad/client/encdecpool.py @@ -0,0 +1,745 @@ +# -*- coding: utf-8 -*- +# encdecpool.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +A pool of encryption/decryption concurrent and parallel workers for use +during synchronization. +""" + + +import multiprocessing +import json +import logging +import Queue + +from twisted.internet import reactor +from twisted.internet import defer +from twisted.internet.threads import deferToThread + +from leap.soledad.common.document import SoledadDocument +from leap.soledad.common import soledad_assert + +from leap.soledad.client.crypto import encrypt_docstr +from leap.soledad.client.crypto import decrypt_doc_dict + + +logger = logging.getLogger(__name__) + + +# +# Encrypt/decrypt pools of workers +# + +class SyncEncryptDecryptPool(object): +    """ +    Base class for encrypter/decrypter pools. +    """ + +    # TODO implement throttling to reduce cpu usage?? +    WORKERS = multiprocessing.cpu_count() + +    def __init__(self, crypto, sync_db): +        """ +        Initialize the pool of encryption-workers.
+ +        :param crypto: A SoledadCrypto instance to perform the encryption. +        :type crypto: leap.soledad.crypto.SoledadCrypto + +        :param sync_db: A database connection handle +        :type sync_db: pysqlcipher.dbapi2.Connection +        """ +        self._crypto = crypto +        self._sync_db = sync_db +        self._pool = multiprocessing.Pool(self.WORKERS) + +    def close(self): +        """ +        Cleanly close the pool of workers. +        """ +        logger.debug("Closing %s" % (self.__class__.__name__,)) +        self._pool.close() +        try: +            self._pool.join() +        except Exception: +            pass + +    def terminate(self): +        """ +        Terminate the pool of workers. +        """ +        logger.debug("Terminating %s" % (self.__class__.__name__,)) +        self._pool.terminate() + +    def _runOperation(self, query, *args): +        """ +        Run an operation on the sync db. + +        :param query: The query to be executed. +        :type query: str +        :param args: A list of query arguments. +        :type args: list + +        :return: A deferred that will fire when the operation in the database +                 has finished. +        :rtype: twisted.internet.defer.Deferred +        """ +        return self._sync_db.runOperation(query, *args) + +    def _runQuery(self, query, *args): +        """ +        Run a query on the sync db. + +        :param query: The query to be executed. +        :type query: str +        :param args: A list of query arguments. +        :type args: list + +        :return: A deferred that will fire with the results of the database +                 query. +        :rtype: twisted.internet.defer.Deferred +        """ +        return self._sync_db.runQuery(query, *args) + + +def encrypt_doc_task(doc_id, doc_rev, content, key, secret): +    """ +    Encrypt the content of the given document. + +    :param doc_id: The document id. +    :type doc_id: str +    :param doc_rev: The document revision. +    :type doc_rev: str +    :param content: The serialized content of the document. +    :type content: str +    :param key: The encryption key. +    :type key: str +    :param secret: The Soledad storage secret (used for MAC auth). +    :type secret: str + +    :return: A tuple containing the doc id, revision and encrypted content. +    :rtype: tuple(str, str, str) +    """ +    encrypted_content = encrypt_docstr( +        content, doc_id, doc_rev, key, secret) +    return doc_id, doc_rev, encrypted_content + + +class SyncEncrypterPool(SyncEncryptDecryptPool): +    """ +    Pool of workers that spawn subprocesses to execute the symmetric encryption +    of documents to be synced. +    """ +    TABLE_NAME = "docs_tosync" +    FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" + +    ENCRYPT_LOOP_PERIOD = 0.5 + +    def __init__(self, *args, **kwargs): +        """ +        Initialize the sync encrypter pool. +        """ +        SyncEncryptDecryptPool.__init__(self, *args, **kwargs) + +        self._stopped = False +        self._sync_queue = multiprocessing.Queue() + +        # start the encryption loop +        self._deferred_loop = deferToThread(self._encrypt_docs_loop) +        self._deferred_loop.addCallback( +            lambda _: logger.debug("Finished encrypter thread.")) + +    def enqueue_doc_for_encryption(self, doc): +        """ +        Enqueue a document for encryption. + +        :param doc: The document to be encrypted.
+    :type doc: SoledadDocument +        """ +        try: +            self._sync_queue.put_nowait(doc) +        except Queue.Full: +            # do not asynchronously encrypt this document if the queue is full +            pass + +    def _encrypt_docs_loop(self): +        """ +        Process the syncing queue and send the documents there to be encrypted +        in the sync db. They will be read by the SoledadSyncTarget during the +        sync_exchange. +        """ +        logger.debug("Starting encrypter thread.") +        while not self._stopped: +            try: +                doc = self._sync_queue.get(True, self.ENCRYPT_LOOP_PERIOD) +                self._encrypt_doc(doc) +            except Queue.Empty: +                pass + +    def _encrypt_doc(self, doc): +        """ +        Symmetrically encrypt a document. + +        :param doc: The document with contents to be encrypted. +        :type doc: SoledadDocument +        """ +        soledad_assert(self._crypto is not None, "need a crypto object") +        docstr = doc.get_json() +        key = self._crypto.doc_passphrase(doc.doc_id) +        secret = self._crypto.secret +        args = doc.doc_id, doc.rev, docstr, key, secret +        # encrypt asynchronously +        self._pool.apply_async( +            encrypt_doc_task, args, +            callback=self._encrypt_doc_cb) + +    def _encrypt_doc_cb(self, result): +        """ +        Insert results of encryption routine into the local sync database. + +        :param result: A tuple containing the doc id, revision and encrypted +                       content. +        :type result: tuple(str, str, str) +        """ +        doc_id, doc_rev, content = result +        return self._insert_encrypted_local_doc(doc_id, doc_rev, content) + +    def _insert_encrypted_local_doc(self, doc_id, doc_rev, content): +        """ +        Insert the contents of the encrypted doc into the local sync +        database. + +        :param doc_id: The document id. +        :type doc_id: str +        :param doc_rev: The document revision. +        :type doc_rev: str +        :param content: The serialized content of the document. +        :type content: str +        """ +        query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \ +                % (self.TABLE_NAME,) +        return self._runOperation(query, (doc_id, doc_rev, content)) + +    @defer.inlineCallbacks +    def get_encrypted_doc(self, doc_id, doc_rev): +        """ +        Get an encrypted document from the sync db. + +        :param doc_id: The id of the document. +        :type doc_id: str +        :param doc_rev: The revision of the document. +        :type doc_rev: str + +        :return: A deferred that will fire with the encrypted content of the +                 document or None if the document was not found in the sync +                 db. +        :rtype: twisted.internet.defer.Deferred +        """ +        logger.debug("Trying to get encrypted doc from sync db: %s" % doc_id) +        query = "SELECT content FROM %s WHERE doc_id=? and rev=?"
\ +                % self.TABLE_NAME +        result = yield self._runQuery(query, (doc_id, doc_rev)) +        if result: +            val = result.pop() +            defer.returnValue(val[0]) +        defer.returnValue(None) + +    def delete_encrypted_doc(self, doc_id, doc_rev): +        """ +        Delete an encrypted document from the sync db. + +        :param doc_id: The id of the document. +        :type doc_id: str +        :param doc_rev: The revision of the document. +        :type doc_rev: str + +        :return: A deferred that will fire when the operation in the database +                 has finished. +        :rtype: twisted.internet.defer.Deferred +        """ +        query = "DELETE FROM %s WHERE doc_id=? and rev=?" \ +                % self.TABLE_NAME +        return self._runOperation(query, (doc_id, doc_rev)) + +    def close(self): +        """ +        Close the encrypter pool. +        """ +        self._stopped = True +        self._sync_queue.close() +        q = self._sync_queue +        del q +        self._sync_queue = None + + +def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, +                     idx): +    """ +    Decrypt the content of the given document. + +    :param doc_id: The document id. +    :type doc_id: str +    :param doc_rev: The document revision. +    :type doc_rev: str +    :param content: The encrypted content of the document. +    :type content: str +    :param gen: The generation corresponding to the modification of that +                document. +    :type gen: int +    :param trans_id: The transaction id corresponding to the modification of +                     that document. +    :type trans_id: str +    :param key: The encryption key. +    :type key: str +    :param secret: The Soledad storage secret (used for MAC auth). +    :type secret: str +    :param idx: The index of this document in the current sync process. +    :type idx: int + +    :return: A tuple containing the doc id, revision, decrypted content, +             generation, transaction id and sync index. +    :rtype: tuple(str, str, str, int, str, int) +    """ +    decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret) +    return doc_id, doc_rev, decrypted_content, gen, trans_id, idx + + +class SyncDecrypterPool(SyncEncryptDecryptPool): +    """ +    Pool of workers that spawn subprocesses to execute the symmetric decryption +    of documents that were received. + +    The decryption of the received documents is done in several steps: + +        1. Encrypted documents are stored in the sync db by the actual soledad +           sync loop. +        2. The soledad sync loop tells us how many documents we should expect +           to process. +        3. We start a decrypt-and-process loop: + +            a. Encrypted documents are fetched. +            b. Encrypted documents are decrypted. +            c. The longest possible run of decrypted documents is inserted +               in the soledad db (this depends on which documents have already +               arrived and which documents have already been decrypted, because +               the order of insertion in the local soledad db matters). +            d. Processed documents are deleted from the database. + +        4. When we have processed as many documents as we should, the loop +           finishes. +    """ +    # TODO implement throttling to reduce cpu usage??
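The ordering constraint in step 3c of the docstring above boils down to taking the longest gap-free run of decrypted documents, keyed by each document's sync index. Sketched on its own (the function name and data shapes are illustrative, not Soledad's API):

    def take_insertable_run(decrypted, last_idx):
        # decrypted: iterable of (idx, doc) pairs; a doc may only be
        # inserted once every doc with a smaller idx has been inserted.
        run = []
        for idx, doc in sorted(decrypted):
            if idx <= last_idx:
                continue  # already processed earlier, skip it
            if idx != last_idx + 1:
                break  # gap: some earlier doc is still being decrypted
            run.append(doc)
            last_idx = idx
        return run, last_idx

The _get_insertable_docs() method further down performs this same walk over the rows of the docs_received table.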
+    TABLE_NAME = "docs_received" +    FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \ +                  "trans_id, encrypted, idx" + +    """ +    Period of recurrence of the periodic decrypting task, in seconds. +    """ +    DECRYPT_LOOP_PERIOD = 0.5 + +    def __init__(self, *args, **kwargs): +        """ +        Initialize the decrypter pool and set up the state needed for staging +        decrypted docs until they are picked, in order, by the insert routine. + +        :param insert_doc_cb: A callback for inserting received documents from +                              target. If not overridden, this will call u1db +                              insert_doc_from_target in synchronizer, which +                              implements the TAKE OTHER semantics. +        :type insert_doc_cb: function +        :param source_replica_uid: The source replica uid, used to find the +                                   correct callback for inserting documents. +        :type source_replica_uid: str +        """ +        self._insert_doc_cb = kwargs.pop("insert_doc_cb") +        self.source_replica_uid = kwargs.pop("source_replica_uid") +        SyncEncryptDecryptPool.__init__(self, *args, **kwargs) + +        self._last_inserted_idx = 0 +        self._docs_to_process = None +        self._processed_docs = 0 + +        self._async_results = [] +        self._failure = None +        self._finished = False + +        # XXX we want to empty the database before starting, but this is an +        #     asynchronous call, so we have to somehow make sure that it is +        #     executed before any other call to the database, without +        #     blocking. +        self._empty() + +    def _launch_decrypt_and_process(self): +        d = self._decrypt_and_process_docs() +        d.addErrback(lambda f: self._set_failure(f)) + +    def _schedule_decrypt_and_process(self): +        reactor.callLater( +            self.DECRYPT_LOOP_PERIOD, +            self._launch_decrypt_and_process) + +    @property +    def failure(self): +        return self._failure + +    def _set_failure(self, failure): +        self._failure = failure +        self._finished = True + +    def failed(self): +        return bool(self._failure) + +    def start(self, docs_to_process): +        """ +        Set the number of documents we expect to process. + +        This should be called during the sync exchange process as soon +        as we know how many documents are arriving from the server. + +        :param docs_to_process: The number of documents to process. +        :type docs_to_process: int +        """ +        self._docs_to_process = docs_to_process +        self._schedule_decrypt_and_process() + +    def insert_encrypted_received_doc( +            self, doc_id, doc_rev, content, gen, trans_id, idx): +        """ +        Insert a received message with encrypted content, to be decrypted later +        on. + +        :param doc_id: The Document ID. +        :type doc_id: str +        :param doc_rev: The Document Revision +        :type doc_rev: str +        :param content: the Content of the document +        :type content: str +        :param gen: the Document Generation +        :type gen: int +        :param trans_id: Transaction ID +        :type trans_id: str +        :param idx: The index of this document in the current sync process. +        :type idx: int + +        :return: A deferred that will fire when the operation in the database +                 has finished.
+    :rtype: twisted.internet.defer.Deferred +        """ +        docstr = json.dumps(content) +        query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ +                % self.TABLE_NAME +        return self._runOperation( +            query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx)) + +    def insert_received_doc( +            self, doc_id, doc_rev, content, gen, trans_id, idx): +        """ +        Insert a document that is not symmetrically encrypted. +        We store it in the staging area (the received-docs table of the sync +        db) to be picked up in order as the preceding documents are decrypted. + +        :param doc_id: The Document ID. +        :type doc_id: str +        :param doc_rev: The Document Revision +        :type doc_rev: str +        :param content: the Content of the document +        :type content: str +        :param gen: the Document Generation +        :type gen: int +        :param trans_id: Transaction ID +        :type trans_id: str +        :param idx: The index of this document in the current sync process. +        :type idx: int + +        :return: A deferred that will fire when the operation in the database +                 has finished. +        :rtype: twisted.internet.defer.Deferred +        """ +        if not isinstance(content, str): +            content = json.dumps(content) +        query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ +                % self.TABLE_NAME +        return self._runOperation( +            query, (doc_id, doc_rev, content, gen, trans_id, 0, idx)) + +    def _delete_received_doc(self, doc_id): +        """ +        Delete a received doc after it was inserted into the local db. + +        :param doc_id: Document ID. +        :type doc_id: str + +        :return: A deferred that will fire when the operation in the database +                 has finished. +        :rtype: twisted.internet.defer.Deferred +        """ +        query = "DELETE FROM '%s' WHERE doc_id=?" \ +                % self.TABLE_NAME +        return self._runOperation(query, (doc_id,)) + +    def _async_decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx): +        """ +        Dispatch an asynchronous document decrypting routine and save the +        result object. + +        :param doc_id: The ID for the document with contents to be decrypted. +        :type doc_id: str +        :param rev: The revision of the document. +        :type rev: str +        :param content: The serialized content of the document. +        :type content: str +        :param gen: The generation corresponding to the modification of that +                    document. +        :type gen: int +        :param trans_id: The transaction id corresponding to the modification +                         of that document. +        :type trans_id: str +        :param idx: The index of this document in the current sync process. +        :type idx: int + +        :return: None; the async result object is stored in +                 self._async_results and collected later by +                 _collect_async_decryption_results().
+        """ +        soledad_assert(self._crypto is not None, "need a crypto object") + +        content = json.loads(content) +        key = self._crypto.doc_passphrase(doc_id) +        secret = self._crypto.secret +        args = doc_id, rev, content, gen, trans_id, key, secret, idx +        # decrypt asynchronously +        self._async_results.append( +            self._pool.apply_async( +                decrypt_doc_task, args)) + +    def _decrypt_doc_cb(self, result): +        """ +        Store the decryption result in the sync db from where it will later be +        picked by _process_decrypted_docs. + +        :param result: A tuple containing the document's id, revision, +                       content, generation, transaction id and sync index. +        :type result: tuple(str, str, str, int, str, int) + +        :return: A deferred that will fire after the document has been +                 inserted in the sync db. +        :rtype: twisted.internet.defer.Deferred +        """ +        doc_id, rev, content, gen, trans_id, idx = result +        logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s" +                     % (doc_id, rev, gen, trans_id)) +        return self.insert_received_doc( +            doc_id, rev, content, gen, trans_id, idx) + +    def _get_docs(self, encrypted=None, order_by='idx', order='ASC'): +        """ +        Get documents from the received docs table in the sync db. + +        :param encrypted: If not None, only return documents with encrypted +                          field equal to given parameter. +        :type encrypted: bool or None +        :param order_by: The name of the field to order results. +        :type order_by: str +        :param order: Whether the order should be ASC or DESC. +        :type order: str + +        :return: A deferred that will fire with the results of the database +                 query. +        :rtype: twisted.internet.defer.Deferred +        """ +        query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \ +                "idx FROM %s" % self.TABLE_NAME +        if encrypted is not None: +            query += " WHERE encrypted = %d" % int(encrypted) +        query += " ORDER BY %s %s" % (order_by, order) +        return self._runQuery(query) + +    @defer.inlineCallbacks +    def _get_insertable_docs(self): +        """ +        Return a list of non-encrypted documents ready to be inserted. + +        :return: A deferred that will fire with the list of insertable +                 documents. +        :rtype: twisted.internet.defer.Deferred +        """ +        # here, we fetch the list of decrypted documents and compare with the +        # index of the last successfully processed document. +        decrypted_docs = yield self._get_docs(encrypted=False) +        insertable = [] +        last_idx = self._last_inserted_idx +        for doc_id, rev, content, gen, trans_id, encrypted, idx in \ +                decrypted_docs: +            # XXX for some reason, a document might not have been deleted from +            #     the database. This is a bug. At this point, already +            #     processed documents should have been removed from the sync +            #     database and we should not have to skip them here. We need +            #     to find out why this is happening, fix, and remove the +            #     skipping below.
+            if (idx < last_idx + 1): +                continue +            if (idx != last_idx + 1): +                break +            insertable.append((doc_id, rev, content, gen, trans_id, idx)) +            last_idx += 1 +        defer.returnValue(insertable) + +    @defer.inlineCallbacks +    def _async_decrypt_received_docs(self): +        """ +        Get all the encrypted documents from the sync database and dispatch a +        decrypt worker to decrypt each one of them. + +        :return: A deferred that will fire after all decryption tasks have +                 been dispatched to the pool of workers. +        :rtype: twisted.internet.defer.Deferred +        """ +        docs = yield self._get_docs(encrypted=True) +        for doc_id, rev, content, gen, trans_id, _, idx in docs: +            self._async_decrypt_doc( +                doc_id, rev, content, gen, trans_id, idx) + +    @defer.inlineCallbacks +    def _process_decrypted_docs(self): +        """ +        Fetch as many decrypted documents as can be taken from the expected +        order and insert them in the local replica. + +        :return: A deferred that will fire with the list of inserted +                 documents. +        :rtype: twisted.internet.defer.Deferred +        """ +        insertable = yield self._get_insertable_docs() +        for doc_fields in insertable: +            self._insert_decrypted_local_doc(*doc_fields) +        defer.returnValue(insertable) + +    def _delete_processed_docs(self, inserted): +        """ +        Delete from the sync db documents that have been processed. + +        :param inserted: List of documents inserted in the previous process +                         step. +        :type inserted: list + +        :return: A deferred that will fire when all the delete operations in +                 the database have finished. +        :rtype: twisted.internet.defer.Deferred +        """ +        deferreds = [] +        for doc_id, doc_rev, _, _, _, _ in inserted: +            deferreds.append( +                self._delete_received_doc(doc_id)) +        if not deferreds: +            return defer.succeed(None) +        return defer.gatherResults(deferreds) + +    def _insert_decrypted_local_doc(self, doc_id, doc_rev, content, +                                    gen, trans_id, idx): +        """ +        Insert the decrypted document into the local replica. + +        Makes use of the callback `insert_doc_cb` passed in by u1db sync. + +        :param doc_id: The document id. +        :type doc_id: str +        :param doc_rev: The document revision. +        :type doc_rev: str +        :param content: The serialized content of the document. +        :type content: str +        :param gen: The generation corresponding to the modification of that +                    document. +        :type gen: int +        :param trans_id: The transaction id corresponding to the modification +                         of that document.
+    :type trans_id: str +        """ +        # could pass source_replica in params for callback chain +        logger.debug("Sync decrypter pool: inserting doc in local db: " +                     "%s:%s %s" % (doc_id, doc_rev, gen)) + +        # convert deleted documents to avoid error on document creation +        if content == 'null': +            content = None +        doc = SoledadDocument(doc_id, doc_rev, content) +        gen = int(gen) +        self._insert_doc_cb(doc, gen, trans_id) + +        # store info about processed docs +        self._last_inserted_idx = idx +        self._processed_docs += 1 + +    def _empty(self): +        """ +        Empty the received docs table of the sync database. + +        :return: A deferred that will fire when the operation in the database +                 has finished. +        :rtype: twisted.internet.defer.Deferred +        """ +        query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) +        return self._runOperation(query) + +    def _collect_async_decryption_results(self): +        """ +        Collect the results of the asynchronous doc decryptions and re-raise +        any exception raised by a multiprocessing async decryption call. + +        :raise Exception: Raised if an async call has raised an exception. +        """ +        async_results = self._async_results[:] +        for res in async_results: +            if res.ready(): +                self._decrypt_doc_cb(res.get())  # might raise an exception! +                self._async_results.remove(res) + +    @defer.inlineCallbacks +    def _decrypt_and_process_docs(self): +        """ +        Decrypt the documents received from the remote replica and insert them +        into the local one. + +        This method implicitly returns a deferred (see the decorator +        above). It should only be called by _launch_decrypt_and_process(), +        because this way any exceptions raised here will be stored by the +        errback attached to the deferred returned. + +        :return: A deferred which will fire after all decrypt, process and +                 delete operations have been executed. +        :rtype: twisted.internet.defer.Deferred +        """ +        if not self.failed(): +            if self._processed_docs < self._docs_to_process: +                yield self._async_decrypt_received_docs() +                yield self._collect_async_decryption_results() +                docs = yield self._process_decrypted_docs() +                yield self._delete_processed_docs(docs) +                # recurse +                self._schedule_decrypt_and_process() +            else: +                self._finished = True + +    def has_finished(self): +        """ +        Return whether the decrypter has finished its work. +        """ +        return self._finished diff --git a/client/src/leap/soledad/client/http_client.py b/client/src/leap/soledad/client/http_client.py new file mode 100644 index 00000000..b08d199e --- /dev/null +++ b/client/src/leap/soledad/client/http_client.py @@ -0,0 +1,194 @@ +# -*- coding: utf-8 -*- +# http_client.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Twisted HTTP/HTTPS client.
+"""
+
+import os
+
+from zope.interface import implements
+
+from OpenSSL.crypto import load_certificate
+from OpenSSL.crypto import FILETYPE_PEM
+
+from twisted.internet import reactor
+from twisted.internet.ssl import ClientContextFactory
+from twisted.internet.ssl import CertificateOptions
+from twisted.internet.defer import succeed
+
+from twisted.web.client import Agent
+from twisted.web.client import HTTPConnectionPool
+from twisted.web.client import readBody
+from twisted.web.http_headers import Headers
+from twisted.web.error import Error
+from twisted.web.iweb import IBodyProducer
+
+
+from leap.soledad.common.errors import InvalidAuthTokenError
+
+
+#
+# Set up a pool of connections
+#
+
+_pool = HTTPConnectionPool(reactor, persistent=True)
+_pool.maxPersistentPerHost = 10
+_agent = None
+
+# if we ever want to trust the system's CAs, we should use an agent like this:
+# from twisted.web.client import BrowserLikePolicyForHTTPS
+# _agent = Agent(reactor, BrowserLikePolicyForHTTPS(), pool=_pool)
+
+
+#
+# SSL/TLS certificate configuration
+#
+
+def configure_certificate(cert_file):
+    """
+    Configure an agent that verifies server certificates against a CA cert
+    file.
+
+    :param cert_file: The path to the certificate file.
+    :type cert_file: str
+    """
+    global _agent
+    cert = _load_cert(cert_file)
+    _agent = Agent(
+        reactor,
+        SoledadClientContextFactory(cert),
+        pool=_pool)
+
+
+def _load_cert(cert_file):
+    """
+    Load an X509 certificate from a file.
+
+    :param cert_file: The path to the certificate file.
+    :type cert_file: str
+
+    :return: The X509 certificate, or None if the file does not exist.
+    :rtype: OpenSSL.crypto.X509
+    """
+    if os.path.exists(cert_file):
+        with open(cert_file) as f:
+            data = f.read()
+            return load_certificate(FILETYPE_PEM, data)
+
+
+class SoledadClientContextFactory(ClientContextFactory):
+    """
+    A context factory that will verify the server's certificate against a
+    given CA certificate.
+    """
+
+    def __init__(self, cacert):
+        """
+        Initialize the context factory.
+
+        :param cacert: The CA certificate.
+        :type cacert: OpenSSL.crypto.X509
+        """
+        self._cacert = cacert
+
+    def getContext(self, hostname, port):
+        opts = CertificateOptions(verify=True, caCerts=[self._cacert])
+        return opts.getContext()
+
+
+#
+# HTTP request facilities
+#
+
+def _unauth_to_invalid_token_error(failure):
+    """
+    An errback to translate unauthorized errors to our own invalid token
+    error class.
+
+    :param failure: The original failure.
+    :type failure: twisted.python.failure.Failure
+
+    :return: Either the original failure or an invalid auth token error.
+    :rtype: twisted.python.failure.Failure
+    """
+    failure.trap(Error)
+    if failure.getErrorMessage() == "401 Unauthorized":
+        raise InvalidAuthTokenError
+    return failure
+
+
+class StringBodyProducer(object):
+    """
+    A producer that writes the body of a request to a consumer.
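+
+    A minimal usage sketch (the url value here is illustrative):
+
+        body = StringBodyProducer('{"some": "content"}')
+        d = _agent.request('PUT', url, bodyProducer=body)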
+    """ + +    implements(IBodyProducer) + +    def __init__(self, body): +        """ +        Initialize the string produer. + +        :param body: The body of the request. +        :type body: str +        """ +        self.body = body +        self.length = len(body) + +    def startProducing(self, consumer): +        """ +        Write the body to the consumer. + +        :param consumer: Any IConsumer provider. +        :type consumer: twisted.internet.interfaces.IConsumer + +        :return: A successful deferred. +        :rtype: twisted.internet.defer.Deferred +        """ +        consumer.write(self.body) +        return succeed(None) + +    def pauseProducing(self): +        pass + +    def stopProducing(self): +        pass + + +def httpRequest(url, method='GET', body=None, headers={}): +    """ +    Perform an HTTP request. + +    :param url: The URL for the request. +    :type url: str +    :param method: The HTTP method of the request. +    :type method: str +    :param body: The body of the request, if any. +    :type body: str +    :param headers: The headers of the request. +    :type headers: dict + +    :return: A deferred that fires with the body of the request. +    :rtype: twisted.internet.defer.Deferred +    """ +    if body: +        body = StringBodyProducer(body) +    d = _agent.request( +        method, url, headers=Headers(headers), bodyProducer=body) +    d.addCallbacks(readBody, _unauth_to_invalid_token_error) +    return d diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py new file mode 100644 index 00000000..dc6c0e0a --- /dev/null +++ b/client/src/leap/soledad/client/http_target.py @@ -0,0 +1,598 @@ +# -*- coding: utf-8 -*- +# http_target.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +A U1DB backend for encrypting data before sending to server and decrypting +after receiving. 
+""" + + +import json +import base64 +import logging + +from uuid import uuid4 +from functools import partial + +from twisted.internet import defer +from twisted.internet import reactor + +from u1db import errors +from u1db import SyncTarget +from u1db.remote import utils + +from leap.soledad.common.document import SoledadDocument + +from leap.soledad.client.crypto import is_symmetrically_encrypted +from leap.soledad.client.crypto import encrypt_doc +from leap.soledad.client.crypto import decrypt_doc +from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS +from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS +from leap.soledad.client.events import signal +from leap.soledad.client.encdecpool import SyncDecrypterPool +from leap.soledad.client.http_client import httpRequest +from leap.soledad.client.http_client import configure_certificate + + +logger = logging.getLogger(__name__) + + +class SoledadHTTPSyncTarget(SyncTarget): +    """ +    A SyncTarget that encrypts data before sending and decrypts data after +    receiving. + +    Normally encryption will have been written to the sync database upon +    document modification. The sync database is also used to write temporarily +    the parsed documents that the remote send us, before being decrypted and +    written to the main database. +    """ + +    def __init__(self, url, source_replica_uid, creds, crypto, cert_file, +                 sync_db=None, sync_enc_pool=None): +        """ +        Initialize the sync target. + +        :param url: The server sync url. +        :type url: str +        :param source_replica_uid: The source replica uid which we use when +                                   deferring decryption. +        :type source_replica_uid: str +        :param url: The url of the target replica to sync with. +        :type url: str +        :param creds: A dictionary containing the uuid and token. +        :type creds: creds +        :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt +                        document contents when syncing. +        :type crypto: soledad.crypto.SoledadCrypto +        :param cert_file: Path to the certificate of the ca used to validate +                          the SSL certificate used by the remote soledad +                          server. +        :type cert_file: str +        :param sync_db: Optional. handler for the db with the symmetric +                        encryption of the syncing documents. If +                        None, encryption will be done in-place, +                        instead of retreiving it from the dedicated +                        database. +        :type sync_db: Sqlite handler +        :param verify_ssl: Whether we should perform SSL server certificate +                           verification. +        :type verify_ssl: bool +        """ +        if url.endswith("/"): +            url = url[:-1] +        self._url = str(url) + "/sync-from/" + source_replica_uid +        self.source_replica_uid = source_replica_uid +        self._auth_header = None +        self.set_creds(creds) +        self._crypto = crypto +        self._sync_db = sync_db +        self._sync_enc_pool = sync_enc_pool +        self._insert_doc_cb = None +        # asynchronous encryption/decryption attributes +        self._decryption_callback = None +        self._sync_decr_pool = None +        configure_certificate(cert_file) + +    def set_creds(self, creds): +        """ +        Update credentials. 
+
+        :param creds: A dictionary containing the uuid and token.
+        :type creds: dict
+        """
+        uuid = creds['token']['uuid']
+        token = creds['token']['token']
+        auth = '%s:%s' % (uuid, token)
+        b64_token = base64.b64encode(auth)
+        self._auth_header = {'Authorization': ['Token %s' % b64_token]}
+
+    @property
+    def _defer_encryption(self):
+        return self._sync_enc_pool is not None
+
+    #
+    # SyncTarget API
+    #
+
+    @defer.inlineCallbacks
+    def get_sync_info(self, source_replica_uid):
+        """
+        Return information about known state of remote database.
+
+        Return the replica_uid and the current database generation of the
+        remote database, and its last-seen database generation for the client
+        replica.
+
+        :param source_replica_uid: The client-side replica uid.
+        :type source_replica_uid: str
+
+        :return: A deferred which fires with (target_replica_uid,
+                 target_replica_generation, target_trans_id,
+                 source_replica_last_known_generation,
+                 source_replica_last_known_transaction_id)
+        :rtype: twisted.internet.defer.Deferred
+        """
+        raw = yield httpRequest(self._url, headers=self._auth_header)
+        res = json.loads(raw)
+        defer.returnValue([
+            res['target_replica_uid'],
+            res['target_replica_generation'],
+            res['target_replica_transaction_id'],
+            res['source_replica_generation'],
+            res['source_transaction_id']
+        ])
+
+    def record_sync_info(
+            self, source_replica_uid, source_replica_generation,
+            source_replica_transaction_id):
+        """
+        Record tip information for another replica.
+
+        After sync_exchange has been processed, the caller will have
+        received new content from this replica. This call allows the
+        source replica instigating the sync to inform us what their
+        generation became after applying the documents we returned.
+
+        This is used so that future sync operations do not need to repeat
+        data that we just talked about. It also means that if this is called
+        at the wrong time, there can be database records that will never be
+        synchronized.
+
+        :param source_replica_uid: The identifier for the source replica.
+        :type source_replica_uid: str
+        :param source_replica_generation: The database generation for the
+                                          source replica.
+        :type source_replica_generation: int
+        :param source_replica_transaction_id: The transaction id associated
+                                              with the source replica
+                                              generation.
+        :type source_replica_transaction_id: str
+
+        :return: A deferred which fires with the result of the query.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        data = json.dumps({
+            'generation': source_replica_generation,
+            'transaction_id': source_replica_transaction_id
+        })
+        headers = self._auth_header.copy()
+        headers.update({'content-type': ['application/json']})
+        return httpRequest(
+            self._url,
+            method='PUT',
+            headers=headers,
+            body=data)
+
+    @defer.inlineCallbacks
+    def sync_exchange(self, docs_by_generation, source_replica_uid,
+                      last_known_generation, last_known_trans_id,
+                      insert_doc_cb, ensure_callback=None,
+                      defer_decryption=True, sync_id=None):
+        """
+        Find out which documents the remote database does not know about,
+        encrypt and send them. After that, receive documents from the remote
+        database.
+
+        :param docs_by_generation: A list of (doc_id, generation, trans_id)
+                                   of local documents that were changed since
+                                   the last local generation the remote
+                                   replica knows about.
+        :type docs_by_generation: list of tuples
+
+        :param source_replica_uid: The uid of the source replica.
+        :type source_replica_uid: str
+
+        :param last_known_generation: Target's last known generation.
+        :type last_known_generation: int
+
+        :param last_known_trans_id: Target's last known transaction id.
+        :type last_known_trans_id: str
+
+        :param insert_doc_cb: A callback for inserting received documents from
+                              target. If not overridden, this will call u1db
+                              insert_doc_from_target in synchronizer, which
+                              implements the TAKE OTHER semantics.
+        :type insert_doc_cb: function
+
+        :param ensure_callback: A callback that ensures we know the target
+                                replica uid if the target replica was just
+                                created.
+        :type ensure_callback: function
+
+        :param defer_decryption: Whether to defer the decryption process using
+                                 the intermediate database. If False,
+                                 decryption will be done inline.
+        :type defer_decryption: bool
+
+        :return: A deferred which fires with the new generation and
+                 transaction id of the target replica.
+        :rtype: twisted.internet.defer.Deferred
+        """
+
+        self._ensure_callback = ensure_callback
+
+        if sync_id is None:
+            sync_id = str(uuid4())
+        self.source_replica_uid = source_replica_uid
+
+        # save a reference to the callback so we can use it after decrypting
+        self._insert_doc_cb = insert_doc_cb
+
+        gen_after_send, trans_id_after_send = yield self._send_docs(
+            docs_by_generation,
+            last_known_generation,
+            last_known_trans_id,
+            sync_id)
+
+        cur_target_gen, cur_target_trans_id = yield self._receive_docs(
+            last_known_generation, last_known_trans_id,
+            ensure_callback, sync_id,
+            defer_decryption=defer_decryption)
+
+        # update gen and trans id info in case we just sent and did not
+        # receive docs.
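+        # e.g. if we pushed docs up to generation 42 but received none, the
+        # send phase knows a newer target generation than the receive phase,
+        # so we report 42 and its transaction id back to the synchronizer.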
+        if gen_after_send is not None and gen_after_send > cur_target_gen: +            cur_target_gen = gen_after_send +            cur_target_trans_id = trans_id_after_send + +        defer.returnValue([cur_target_gen, cur_target_trans_id]) + +    # +    # methods to send docs +    # + +    def _prepare(self, comma, entries, **dic): +        entry = comma + '\r\n' + json.dumps(dic) +        entries.append(entry) +        return len(entry) + +    @defer.inlineCallbacks +    def _send_docs(self, docs_by_generation, last_known_generation, +                   last_known_trans_id, sync_id): + +        if not docs_by_generation: +            defer.returnValue([None, None]) + +        headers = self._auth_header.copy() +        headers.update({'content-type': ['application/x-soledad-sync-put']}) +        # add remote replica metadata to the request +        first_entries = ['['] +        self._prepare( +            '', first_entries, +            last_known_generation=last_known_generation, +            last_known_trans_id=last_known_trans_id, +            sync_id=sync_id, +            ensure=self._ensure_callback is not None) +        idx = 0 +        total = len(docs_by_generation) +        for doc, gen, trans_id in docs_by_generation: +            idx += 1 +            result = yield self._send_one_doc( +                headers, first_entries, doc, +                gen, trans_id, total, idx) +            if self._defer_encryption: +                self._sync_enc_pool.delete_encrypted_doc( +                    doc.doc_id, doc.rev) +            signal(SOLEDAD_SYNC_SEND_STATUS, +                   "Soledad sync send status: %d/%d" +                   % (idx, total)) +        response_dict = json.loads(result)[0] +        gen_after_send = response_dict['new_generation'] +        trans_id_after_send = response_dict['new_transaction_id'] +        defer.returnValue([gen_after_send, trans_id_after_send]) + +    @defer.inlineCallbacks +    def _send_one_doc(self, headers, first_entries, doc, gen, trans_id, +                      number_of_docs, doc_idx): +        entries = first_entries[:] +        # add the document to the request +        content = yield self._encrypt_doc(doc) +        self._prepare( +            ',', entries, +            id=doc.doc_id, rev=doc.rev, content=content, gen=gen, +            trans_id=trans_id, number_of_docs=number_of_docs, +            doc_idx=doc_idx) +        entries.append('\r\n]') +        data = ''.join(entries) +        result = yield httpRequest( +            self._url, +            method='POST', +            headers=headers, +            body=data) +        defer.returnValue(result) + +    def _encrypt_doc(self, doc): +        d = None +        if doc.is_tombstone(): +            d = defer.succeed(None) +        elif not self._defer_encryption: +            # fallback case, for tests +            d = defer.succeed(encrypt_doc(self._crypto, doc)) +        else: + +            def _maybe_encrypt_doc_inline(doc_json): +                if doc_json is None: +                    # the document is not marked as tombstone, but we got +                    # nothing from the sync db. As it is not encrypted +                    # yet, we force inline encryption. 
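+                    # (this can happen when the sync encrypter pool has not
+                    # yet caught up with a recently modified document)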
+                    return encrypt_doc(self._crypto, doc) +                return doc_json + +            d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev) +            d.addCallback(_maybe_encrypt_doc_inline) +        return d + +    # +    # methods to receive doc +    # + +    @defer.inlineCallbacks +    def _receive_docs(self, last_known_generation, last_known_trans_id, +                      ensure_callback, sync_id, defer_decryption): + +        self._queue_for_decrypt = defer_decryption \ +            and self._sync_db is not None + +        new_generation = last_known_generation +        new_transaction_id = last_known_trans_id + +        if self._queue_for_decrypt: +            logger.debug( +                "Soledad sync: will queue received docs for decrypting.") + +        if defer_decryption: +            self._setup_sync_decr_pool() + +        headers = self._auth_header.copy() +        headers.update({'content-type': ['application/x-soledad-sync-get']}) + +        #--------------------------------------------------------------------- +        # maybe receive the first document +        #--------------------------------------------------------------------- + +        # we fetch the first document before fetching the rest because we need +        # to know the total number of documents to be received, and this +        # information comes as metadata to each request. + +        d = self._receive_one_doc( +            headers, last_known_generation, last_known_trans_id, +            sync_id, 0) +        d.addCallback(partial(self._insert_received_doc, 1, 1)) +        number_of_changes, ngen, ntrans = yield d + +        if defer_decryption: +            self._sync_decr_pool.start(number_of_changes) + +        #--------------------------------------------------------------------- +        # maybe receive the rest of the documents +        #--------------------------------------------------------------------- + +        # launch many asynchronous fetches and inserts of received documents +        # in the temporary sync db. Will wait for all results before +        # continuing. + +        received = 1 +        deferreds = [] +        while received < number_of_changes: +            d = self._receive_one_doc( +                headers, last_known_generation, +                last_known_trans_id, sync_id, received) +            d.addCallback( +                partial( +                    self._insert_received_doc, +                    received + 1,  # the index of the current received doc +                    number_of_changes)) +            deferreds.append(d) +            received += 1 +        results = yield defer.gatherResults(deferreds) + +        # get generation and transaction id of target after insertions +        if deferreds: +            _, new_generation, new_transaction_id = results.pop() + +        #--------------------------------------------------------------------- +        # wait for async decryption to finish +        #--------------------------------------------------------------------- + +        # below we do a trick so we can wait for the SyncDecrypterPool to +        # finish its work before finally returning the new generation and +        # transaction id of the remote replica. To achieve that, we create a +        # Deferred that will return the results of the sync and, if we are +        # decrypting asynchronously, we use reactor.callLater() to +        # periodically poll the decrypter and check if it has finished its +        # work. 
When it has finished, we either call the callback or errback
+        # of that deferred. In case we are not asynchronously decrypting, we
+        # just fire the deferred.
+
+        def _shutdown_and_finish(res):
+            # the decrypter pool is only set up when deferring decryption
+            if self._sync_decr_pool is not None:
+                self._sync_decr_pool.close()
+            return new_generation, new_transaction_id
+
+        d = defer.Deferred()
+        d.addCallback(_shutdown_and_finish)
+
+        def _wait_or_finish():
+            if not self._sync_decr_pool.has_finished():
+                reactor.callLater(
+                    SyncDecrypterPool.DECRYPT_LOOP_PERIOD,
+                    _wait_or_finish)
+            else:
+                if not self._sync_decr_pool.failed():
+                    d.callback(None)
+                else:
+                    d.errback(self._sync_decr_pool.failure)
+
+        if defer_decryption:
+            _wait_or_finish()
+        else:
+            d.callback(None)
+
+        new_generation, new_transaction_id = yield d
+        defer.returnValue([new_generation, new_transaction_id])
+
+    def _receive_one_doc(self, headers, last_known_generation,
+                         last_known_trans_id, sync_id, received):
+        entries = ['[']
+        # add remote replica metadata to the request
+        self._prepare(
+            '', entries,
+            last_known_generation=last_known_generation,
+            last_known_trans_id=last_known_trans_id,
+            sync_id=sync_id,
+            ensure=self._ensure_callback is not None)
+        # inform server of how many documents have already been received
+        self._prepare(
+            ',', entries, received=received)
+        entries.append('\r\n]')
+        # send the request
+        return httpRequest(
+            self._url,
+            method='POST',
+            headers=headers,
+            body=''.join(entries))
+
+    def _insert_received_doc(self, idx, total, response):
+        """
+        Insert a received document into the local replica.
+
+        :param idx: The index count of the current operation.
+        :type idx: int
+        :param total: The total number of operations.
+        :type total: int
+        :param response: The body of the response.
+        :type response: str
+
+        :return: A tuple of (number_of_changes, new_generation,
+                 new_transaction_id).
+        :rtype: tuple
+        """
+        new_generation, new_transaction_id, number_of_changes, doc_id, \
+            rev, content, gen, trans_id = \
+            self._parse_received_doc_response(response)
+        if doc_id is not None:
+            # decrypt incoming document and insert into local database
+            # -------------------------------------------------------------
+            # symmetric decryption of document's contents
+            # -------------------------------------------------------------
+            # If arriving content was symmetrically encrypted, we decrypt it.
+            # We do it inline if the defer_decryption flag is False or if no
+            # sync_db was defined; otherwise we defer it, writing it to the
+            # received docs table.
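+            # the four cases handled below are, roughly:
+            #   encrypted doc, deferring  -> insert_encrypted_received_doc()
+            #   encrypted doc, inline     -> decrypt_doc() + insert callback
+            #   plaintext doc, deferring  -> insert_received_doc()
+            #   plaintext doc, inline     -> insert callback directly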
+            doc = SoledadDocument(doc_id, rev, content)
+            if is_symmetrically_encrypted(doc):
+                if self._queue_for_decrypt:
+                    self._sync_decr_pool.insert_encrypted_received_doc(
+                        doc.doc_id, doc.rev, doc.content, gen, trans_id,
+                        idx)
+                else:
+                    # defer_decryption is False or no-sync-db fallback
+                    doc.set_json(decrypt_doc(self._crypto, doc))
+                    self._insert_doc_cb(doc, gen, trans_id)
+            else:
+                # not a symmetrically encrypted doc: insert it directly or
+                # stage it in the sync db as already decrypted.
+                if self._queue_for_decrypt:
+                    self._sync_decr_pool.insert_received_doc(
+                        doc.doc_id, doc.rev, doc.content, gen, trans_id,
+                        idx)
+                else:
+                    self._insert_doc_cb(doc, gen, trans_id)
+            # -------------------------------------------------------------
+            # end of symmetric decryption
+            # -------------------------------------------------------------
+        msg = "%d/%d" % (idx, total)
+        signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg)
+        logger.debug("Soledad sync receive status: %s" % msg)
+        return number_of_changes, new_generation, new_transaction_id
+
+    def _parse_received_doc_response(self, response):
+        """
+        Parse the response from the server containing the received document.
+
+        :param response: The body of the response.
+        :type response: str
+
+        :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev,
+                 content, gen, trans_id)
+        :rtype: tuple
+        """
+        # decode incoming stream
+        parts = response.splitlines()
+        if not parts or parts[0] != '[' or parts[-1] != ']':
+            raise errors.BrokenSyncStream
+        data = parts[1:-1]
+        # decode metadata
+        line, comma = utils.check_and_strip_comma(data[0])
+        metadata = None
+        try:
+            metadata = json.loads(line)
+            new_generation = metadata['new_generation']
+            new_transaction_id = metadata['new_transaction_id']
+            number_of_changes = metadata['number_of_changes']
+        except (ValueError, KeyError):
+            # the stdlib json module signals decoding errors by raising
+            # ValueError
+            raise errors.BrokenSyncStream
+        # make sure we have replica_uid from fresh new dbs
+        if self._ensure_callback and 'replica_uid' in metadata:
+            self._ensure_callback(metadata['replica_uid'])
+        # parse incoming document info
+        doc_id = None
+        rev = None
+        content = None
+        gen = None
+        trans_id = None
+        if number_of_changes > 0:
+            try:
+                entry = json.loads(data[1])
+                doc_id = entry['id']
+                rev = entry['rev']
+                content = entry['content']
+                gen = entry['gen']
+                trans_id = entry['trans_id']
+            except (IndexError, ValueError, KeyError):
+                raise errors.BrokenSyncStream
+        return new_generation, new_transaction_id, number_of_changes, \
+            doc_id, rev, content, gen, trans_id
+
+    def _setup_sync_decr_pool(self):
+        """
+        Set up the SyncDecrypterPool for deferred decryption.
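+
+        This is a no-op if the pool has already been set up or if there is
+        no sync database to back it.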
+        """ +        if self._sync_decr_pool is None and self._sync_db is not None: +            # initialize syncing queue decryption pool +            self._sync_decr_pool = SyncDecrypterPool( +                self._crypto, +                self._sync_db, +                insert_doc_cb=self._insert_doc_cb, +                source_replica_uid=self.source_replica_uid) diff --git a/client/src/leap/soledad/client/pragmas.py b/client/src/leap/soledad/client/pragmas.py index 2e9c53a3..55397d10 100644 --- a/client/src/leap/soledad/client/pragmas.py +++ b/client/src/leap/soledad/client/pragmas.py @@ -19,10 +19,53 @@ Different pragmas used in the initialization of the SQLCipher database.  """  import logging  import string +import threading +import os + +from leap.soledad.common import soledad_assert +  logger = logging.getLogger(__name__) +_db_init_lock = threading.Lock() + + +def set_init_pragmas(conn, opts=None, extra_queries=None): +    """ +    Set the initialization pragmas. + +    This includes the crypto pragmas, and any other options that must +    be passed early to sqlcipher db. +    """ +    soledad_assert(opts is not None) +    extra_queries = [] if extra_queries is None else extra_queries +    with _db_init_lock: +        # only one execution path should initialize the db +        _set_init_pragmas(conn, opts, extra_queries) + + +def _set_init_pragmas(conn, opts, extra_queries): + +    sync_off = os.environ.get('LEAP_SQLITE_NOSYNC') +    memstore = os.environ.get('LEAP_SQLITE_MEMSTORE') +    nowal = os.environ.get('LEAP_SQLITE_NOWAL') + +    set_crypto_pragmas(conn, opts) + +    if not nowal: +        set_write_ahead_logging(conn) +    if sync_off: +        set_synchronous_off(conn) +    else: +        set_synchronous_normal(conn) +    if memstore: +        set_mem_temp_store(conn) + +    for query in extra_queries: +        conn.cursor().execute(query) + +  def set_crypto_pragmas(db_handle, sqlcipher_opts):      """      Set cryptographic params (key, cipher, KDF number of iterations and diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py index af781a26..96f7e906 100644 --- a/client/src/leap/soledad/client/secrets.py +++ b/client/src/leap/soledad/client/secrets.py @@ -246,22 +246,26 @@ class SoledadSecrets(object):          :return: Whether there's a storage secret for symmetric encryption.          :rtype: bool          """ -        if self._secret_id is None or self._secret_id not in self._secrets: +        logger.info("Checking if there's a secret in local storage...") +        if (self._secret_id is None or self._secret_id not in self._secrets) \ +                and os.path.isfile(self._secrets_path):              try:                  self._load_secrets()  # try to load from disk              except IOError as e:                  logger.warning(                      'IOError while loading secrets from disk: %s' % str(e)) -                return False -        return self.storage_secret is not None + +        if self.storage_secret is not None: +            logger.info("Found a secret in local storage.") +            return True + +        logger.info("Could not find a secret in local storage.") +        return False      def _load_secrets(self):          """          Load storage secrets from local file.          """ -        # does the file exist in disk? 
-        if not os.path.isfile(self._secrets_path): -            raise IOError('File does not exist: %s' % self._secrets_path)          # read storage secrets from file          content = None          with open(self._secrets_path, 'r') as f: diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index db3cb5cb..8e7d39c2 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -42,7 +42,6 @@ SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases  handled by Soledad should be created by SQLCipher >= 2.0.  """  import logging -import multiprocessing  import os  import threading  import json @@ -54,19 +53,17 @@ from u1db.backends import sqlite_backend  from hashlib import sha256  from contextlib import contextmanager  from collections import defaultdict -from httplib import CannotSendRequest +from functools import partial  from pysqlcipher import dbapi2 as sqlcipher_dbapi2  from twisted.internet import reactor -from twisted.internet.task import LoopingCall  from twisted.internet.threads import deferToThreadPool  from twisted.python.threadpool import ThreadPool -from twisted.python import log +from twisted.enterprise import adbapi -from leap.soledad.client import crypto -from leap.soledad.client.target import SoledadSyncTarget -from leap.soledad.client.target import PendingReceivedDocsSyncError +from leap.soledad.client import encdecpool +from leap.soledad.client.http_target import SoledadHTTPSyncTarget  from leap.soledad.client.sync import SoledadSynchronizer  from leap.soledad.client import pragmas @@ -102,46 +99,14 @@ def initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True):      conn = sqlcipher_dbapi2.connect(          opts.path, check_same_thread=check_same_thread) -    set_init_pragmas(conn, opts, extra_queries=on_init) +    pragmas.set_init_pragmas(conn, opts, extra_queries=on_init)      return conn -_db_init_lock = threading.Lock() - - -def set_init_pragmas(conn, opts=None, extra_queries=None): -    """ -    Set the initialization pragmas. - -    This includes the crypto pragmas, and any other options that must -    be passed early to sqlcipher db. 
-    """ -    soledad_assert(opts is not None) -    extra_queries = [] if extra_queries is None else extra_queries -    with _db_init_lock: -        # only one execution path should initialize the db -        _set_init_pragmas(conn, opts, extra_queries) - - -def _set_init_pragmas(conn, opts, extra_queries): - -    sync_off = os.environ.get('LEAP_SQLITE_NOSYNC') -    memstore = os.environ.get('LEAP_SQLITE_MEMSTORE') -    nowal = os.environ.get('LEAP_SQLITE_NOWAL') - -    pragmas.set_crypto_pragmas(conn, opts) - -    if not nowal: -        pragmas.set_write_ahead_logging(conn) -    if sync_off: -        pragmas.set_synchronous_off(conn) -    else: -        pragmas.set_synchronous_normal(conn) -    if memstore: -        pragmas.set_mem_temp_store(conn) - -    for query in extra_queries: -        conn.cursor().execute(query) +def initialize_sqlcipher_adbapi_db(opts, extra_queries=None): +    from leap.soledad.client import sqlcipher_adbapi +    return sqlcipher_adbapi.getConnectionPool( +        opts, extra_queries=extra_queries)  class SQLCipherOptions(object): @@ -151,22 +116,32 @@ class SQLCipherOptions(object):      @classmethod      def copy(cls, source, path=None, key=None, create=None, -            is_raw_key=None, cipher=None, kdf_iter=None, cipher_page_size=None, -            defer_encryption=None, sync_db_key=None): +             is_raw_key=None, cipher=None, kdf_iter=None, +             cipher_page_size=None, defer_encryption=None, sync_db_key=None):          """          Return a copy of C{source} with parameters different than None          replaced by new values.          """ -        return SQLCipherOptions( -            path if path else source.path, -            key if key else source.key, -            create=create if create else source.create, -            is_raw_key=is_raw_key if is_raw_key else source.is_raw_key, -            cipher=cipher if cipher else source.cipher, -            kdf_iter=kdf_iter if kdf_iter else source.kdf_iter, -            cipher_page_size=cipher_page_size if cipher_page_size else source.cipher_page_size, -            defer_encryption=defer_encryption if defer_encryption else source.defer_encryption, -            sync_db_key=sync_db_key if sync_db_key else source.sync_db_key) +        local_vars = locals() +        args = [] +        kwargs = {} + +        for name in ["path", "key"]: +            val = local_vars[name] +            if val is not None: +                args.append(val) +            else: +                args.append(getattr(source, name)) + +        for name in ["create", "is_raw_key", "cipher", "kdf_iter", +                     "cipher_page_size", "defer_encryption", "sync_db_key"]: +            val = local_vars[name] +            if val is not None: +                kwargs[name] = val +            else: +                kwargs[name] = getattr(source, name) + +        return SQLCipherOptions(*args, **kwargs)      def __init__(self, path, key, create=True, is_raw_key=False,                   cipher='aes-256-cbc', kdf_iter=4000, cipher_page_size=1024, @@ -307,10 +282,9 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):          :rtype: str          """          doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc) - -        # TODO XXX move to API XXX          if self.defer_encryption: -            self.sync_queue.put_nowait(doc) +            # TODO move to api? 
+            self._sync_enc_pool.enqueue_doc_for_encryption(doc)          return doc_rev      # @@ -440,7 +414,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):      Soledad syncer implementation.      """ -    _sync_loop = None      _sync_enc_pool = None      """ @@ -450,13 +423,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):      LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db'      """ -    A dictionary that hold locks which avoid multiple sync attempts from the -    same database replica. -    """ -    # XXX We do not need the lock here now. Remove. -    encrypting_lock = threading.Lock() - -    """      Period or recurrence of the Looping Call that will do the encryption to the      syncdb (in seconds).      """ @@ -468,19 +434,18 @@ class SQLCipherU1DBSync(SQLCipherDatabase):      """      syncing_lock = defaultdict(threading.Lock) -    def __init__(self, opts, soledad_crypto, replica_uid, +    def __init__(self, opts, soledad_crypto, replica_uid, cert_file,                   defer_encryption=False):          self._opts = opts          self._path = opts.path          self._crypto = soledad_crypto          self.__replica_uid = replica_uid +        self._cert_file = cert_file          self._sync_db_key = opts.sync_db_key          self._sync_db = None -        self._sync_db_write_lock = None          self._sync_enc_pool = None -        self.sync_queue = None          # we store syncers in a dictionary indexed by the target URL. We also          # store a hash of the auth info in case auth info expires and we need @@ -490,8 +455,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          #  self._syncers = {'<url>': ('<auth_hash>', syncer), ...}          self._syncers = {} -        self._sync_db_write_lock = threading.Lock() -        self.sync_queue = multiprocessing.Queue()          self.running = False          self._sync_threadpool = None @@ -503,25 +466,15 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          self._db_handle = None          self._initialize_main_db() -        if defer_encryption: -            self._initialize_sync_db(opts) +        # the sync_db is used both for deferred encryption and decryption, so +        # we want to initialize it anyway to allow for all combinations of +        # deferred encryption and decryption configurations. +        self._initialize_sync_db(opts) +        if defer_encryption:              # initialize syncing queue encryption pool -            self._sync_enc_pool = crypto.SyncEncrypterPool( -                self._crypto, self._sync_db, self._sync_db_write_lock) - -            # ----------------------------------------------------------------- -            # From the documentation: If f returns a deferred, rescheduling -            # will not take place until the deferred has fired. The result -            # value is ignored. - -            # TODO use this to avoid multiple sync attempts if the sync has not -            # finished! 
-            # ----------------------------------------------------------------- - -            # XXX this was called sync_watcher --- trace any remnants -            self._sync_loop = LoopingCall(self._encrypt_syncing_docs) -            self._sync_loop.start(self.ENCRYPT_LOOP_PERIOD) +            self._sync_enc_pool = encdecpool.SyncEncrypterPool( +                self._crypto, self._sync_db)          self.shutdownID = None @@ -584,11 +537,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          # somewhere else          sync_opts = SQLCipherOptions.copy(              opts, path=sync_db_path, create=True) -        self._sync_db = initialize_sqlcipher_db( -            sync_opts, on_init=self._sync_db_extra_init, -            check_same_thread=False) -        pragmas.set_crypto_pragmas(self._sync_db, opts) -        # --------------------------------------------------------- +        self._sync_db = getConnectionPool( +            sync_opts, extra_queries=self._sync_db_extra_init)      @property      def _sync_db_extra_init(self): @@ -599,15 +549,15 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          :rtype: tuple of strings          """          maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)" -        encr = crypto.SyncEncrypterPool -        decr = crypto.SyncDecrypterPool +        encr = encdecpool.SyncEncrypterPool +        decr = encdecpool.SyncDecrypterPool          sql_encr_table_query = (maybe_create % (              encr.TABLE_NAME, encr.FIELD_NAMES))          sql_decr_table_query = (maybe_create % (              decr.TABLE_NAME, decr.FIELD_NAMES))          return (sql_encr_table_query, sql_decr_table_query) -    def sync(self, url, creds=None, autocreate=True, defer_decryption=True): +    def sync(self, url, creds=None, defer_decryption=True):          """          Synchronize documents with remote replica exposed at url. @@ -621,12 +571,9 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          :param url: The url of the target replica to sync with.          :type url: str -        :param creds: -            optional dictionary giving credentials. -            to authorize the operation with the server. +        :param creds: optional dictionary giving credentials to authorize the +                      operation with the server.          :type creds: dict -        :param autocreate: Ask the target to create the db if non-existent. -        :type autocreate: bool          :param defer_decryption:              Whether to defer the decryption process using the intermediate              database. If False, decryption will be done inline. @@ -637,49 +584,11 @@ class SQLCipherU1DBSync(SQLCipherDatabase):              before the synchronisation was performed.          :rtype: Deferred          """ -        kwargs = {'creds': creds, 'autocreate': autocreate, -                  'defer_decryption': defer_decryption} -        return self._defer_to_sync_threadpool(self._sync, url, **kwargs) - -    def _sync(self, url, creds=None, autocreate=True, defer_decryption=True): -        res = None -          # the following context manager blocks until the syncing lock can be          # acquired. -        # TODO review, I think this is no longer needed with a 1-thread -        # threadpool. - -        log.msg("in _sync") -        self.__url = url          with self._syncer(url, creds=creds) as syncer:              # XXX could mark the critical section here... 
-            try: -                log.msg('syncer sync...') -                res = syncer.sync(autocreate=autocreate, -                                  defer_decryption=defer_decryption) - -            except PendingReceivedDocsSyncError: -                logger.warning("Local sync db is not clear, skipping sync...") -                return -            except CannotSendRequest: -                logger.warning("Connection with sync target couldn't be " -                               "established. Resetting connection...") -                # closing the connection it will be recreated in the next try -                syncer.sync_target.close() -                return - -        return res - -    def stop_sync(self): -        """ -        Interrupt all ongoing syncs. -        """ -        self._stop_sync() - -    def _stop_sync(self): -        for url in self._syncers: -            _, syncer = self._syncers[url] -            syncer.stop() +            return syncer.sync(defer_decryption=defer_decryption)      @contextmanager      def _syncer(self, url, creds=None): @@ -690,6 +599,10 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          one instance synchronizing the same database replica at the same time.          Because of that, this method blocks until the syncing lock can be          acquired. + +        :param creds: optional dictionary giving credentials to authorize the +                      operation with the server. +        :type creds: dict          """          with self.syncing_lock[self._path]:              syncer = self._get_syncer(url, creds=creds) @@ -723,16 +636,17 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          h = sha256(json.dumps([url, creds])).hexdigest()          cur_h, syncer = self._syncers.get(url, (None, None))          if syncer is None or h != cur_h: -            wlock = self._sync_db_write_lock              syncer = SoledadSynchronizer(                  self, -                SoledadSyncTarget(url, -                                  # XXX is the replica_uid ready? -                                  self._replica_uid, -                                  creds=creds, -                                  crypto=self._crypto, -                                  sync_db=self._sync_db, -                                  sync_db_write_lock=wlock)) +                SoledadHTTPSyncTarget( +                    url, +                    # XXX is the replica_uid ready? +                    self._replica_uid, +                    creds=creds, +                    crypto=self._crypto, +                    cert_file=self._cert_file, +                    sync_db=self._sync_db, +                    sync_enc_pool=self._sync_enc_pool))              self._syncers[url] = (h, syncer)          # in order to reuse the same synchronizer multiple times we have to          # reset its state (i.e. the number of documents received from target @@ -744,34 +658,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):      # Symmetric encryption of syncing docs      # -    def _encrypt_syncing_docs(self): -        """ -        Process the syncing queue and send the documents there -        to be encrypted in the sync db. They will be read by the -        SoledadSyncTarget during the sync_exchange. - -        Called periodically from the LoopingCall self._sync_loop. -        """ -        # TODO should return a deferred that would firewhen the encryption is -        # done. 
See note on __init__
-
-        lock = self.encrypting_lock
-        # optional wait flag used to avoid blocking
-        if not lock.acquire(False):
-            return
-        else:
-            queue = self.sync_queue
-            try:
-                while not queue.empty():
-                    doc = queue.get_nowait()
-                    self._sync_enc_pool.encrypt_doc(doc)
-
-            except Exception as exc:
-                logger.error("Error while  encrypting docs to sync")
-                logger.exception(exc)
-            finally:
-                lock.release()
-
     def get_generation(self):
         # FIXME
         # XXX this SHOULD BE a callback
@@ -789,16 +675,10 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
         """
         Close the syncer and syncdb orderly
         """
-        # stop the sync loop for deferred encryption
-        if self._sync_loop is not None:
-            self._sync_loop.reset()
-            self._sync_loop.stop()
-            self._sync_loop = None
         # close all open syncers
-        for url in self._syncers:
-            _, syncer = self._syncers[url]
-            syncer.close()
-        self._syncers = []
+        # (do not delete entries from the dict while iterating over it; just
+        # drop all references at once)
+        self._syncers = {}
+
         # stop the encryption pool
         if self._sync_enc_pool is not None:
             self._sync_enc_pool.close()
@@ -808,11 +688,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
         if self._sync_db is not None:
             self._sync_db.close()
             self._sync_db = None
-        # close the sync queue
-        if self.sync_queue is not None:
-            self.sync_queue.close()
-            del self.sync_queue
-            self.sync_queue = None
 
 
 class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase):
@@ -903,3 +778,40 @@ def soledad_doc_factory(doc_id=None, rev=None, json='{}', has_conflicts=False,
                            has_conflicts=has_conflicts, syncable=syncable)
 
 sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase)
+
+
+#
+# twisted.enterprise.adbapi SQLCipher implementation
+#
+
+SQLCIPHER_CONNECTION_TIMEOUT = 10
+
+
+def getConnectionPool(opts, extra_queries=None):
+    openfun = partial(
+        pragmas.set_init_pragmas,
+        opts=opts,
+        extra_queries=extra_queries)
+    return SQLCipherConnectionPool(
+        database=opts.path,
+        check_same_thread=False,
+        cp_openfun=openfun,
+        timeout=SQLCIPHER_CONNECTION_TIMEOUT)
+
+
+class SQLCipherConnection(adbapi.Connection):
+    pass
+
+
+class SQLCipherTransaction(adbapi.Transaction):
+    pass
+
+
+class SQLCipherConnectionPool(adbapi.ConnectionPool):
+
+    connectionFactory = SQLCipherConnection
+    transactionFactory = SQLCipherTransaction
+
+    def __init__(self, *args, **kwargs):
+        adbapi.ConnectionPool.__init__(
+            self, "pysqlcipher.dbapi2", *args, **kwargs)
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index d3f106da..53172f31 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -16,17 +16,10 @@
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 """
 Soledad synchronization utilities.
-
-Extend u1db Synchronizer with the ability to:
-
-    * Postpone the update of the known replica uid until all the decryption of
-      the incoming messages has been processed.
-
-    * Be interrupted and recovered.
"""  import logging -import traceback -from threading import Lock + +from twisted.internet import defer  from u1db import errors  from u1db.sync import Synchronizer @@ -47,17 +40,8 @@ class SoledadSynchronizer(Synchronizer):      Also modified to allow for interrupting the synchronization process.      """ -    # TODO can delegate the syncing to the api object, living in the reactor -    # thread, and use a simple flag. -    syncing_lock = Lock() - -    def stop(self): -        """ -        Stop the current sync in progress. -        """ -        self.sync_target.stop() - -    def sync(self, autocreate=False, defer_decryption=True): +    @defer.inlineCallbacks +    def sync(self, defer_decryption=True):          """          Synchronize documents between source and target. @@ -69,48 +53,22 @@ class SoledadSynchronizer(Synchronizer):          This is done to allow the ongoing parallel decryption of the incoming          docs to proceed without `InvalidGeneration` conflicts. -        :param autocreate: Whether the target replica should be created or not. -        :type autocreate: bool          :param defer_decryption: Whether to defer the decryption process using                                   the intermediate database. If False,                                   decryption will be done inline.          :type defer_decryption: bool -        """ -        self.syncing_lock.acquire() -        try: -            return self._sync(autocreate=autocreate, -                              defer_decryption=defer_decryption) -        except Exception: -            # we want this exception to reach either SQLCipherU1DBSync.sync or -            # the Solead api object itself, so it is poperly handled and/or -            # logged... -            raise -        finally: -            # ... but we also want to release the syncing lock so this -            # Synchronizer may be reused later. -            self.release_syncing_lock() - -    def _sync(self, autocreate=False, defer_decryption=True): -        """ -        Helper function, called from the main `sync` method. -        See `sync` docstring. + +        :return: A deferred which will fire after the sync has finished. 
+        :rtype: twisted.internet.defer.Deferred          """          sync_target = self.sync_target          # get target identifier, its current generation,          # and its last-seen database generation for this source          ensure_callback = None -        try: -            (self.target_replica_uid, target_gen, target_trans_id, -             target_my_gen, target_my_trans_id) = \ -                sync_target.get_sync_info(self.source._replica_uid) -        except errors.DatabaseDoesNotExist: -            if not autocreate: -                raise -            # will try to ask sync_exchange() to create the db -            self.target_replica_uid = None -            target_gen, target_trans_id = (0, '') -            target_my_gen, target_my_trans_id = (0, '') +        (self.target_replica_uid, target_gen, target_trans_id, +         target_my_gen, target_my_trans_id) = yield \ +            sync_target.get_sync_info(self.source._replica_uid)          logger.debug(              "Soledad target sync info:\n" @@ -151,15 +109,15 @@ class SoledadSynchronizer(Synchronizer):                      self.target_replica_uid)          logger.debug(              "Soledad source sync info:\n" -            "  source target gen: %d\n" -            "  source target trans_id: %s" +            "  last target gen known to source: %d\n" +            "  last target trans_id known to source: %s"              % (target_last_known_gen, target_last_known_trans_id))          # validate transaction ids          if not changes and target_last_known_gen == target_gen:              if target_trans_id != target_last_known_trans_id:                  raise errors.InvalidTransactionId -            return my_gen +            defer.returnValue(my_gen)          # prepare to send all the changed docs          changed_doc_ids = [doc_id for doc_id, _, _ in changes] @@ -174,40 +132,26 @@ class SoledadSynchronizer(Synchronizer):          # exchange documents and try to insert the returned ones with          # the target, return target synced-up-to gen. -        # -        # The sync_exchange method may be interrupted, in which case it will -        # return a tuple of Nones. 
-        try: -            new_gen, new_trans_id = sync_target.sync_exchange( -                docs_by_generation, self.source._replica_uid, -                target_last_known_gen, target_last_known_trans_id, -                self._insert_doc_from_target, ensure_callback=ensure_callback, -                defer_decryption=defer_decryption) -            logger.debug( -                "Soledad source sync info after sync exchange:\n" -                "  source target gen: %d\n" -                "  source target trans_id: %s" -                % (new_gen, new_trans_id)) -            info = { -                "target_replica_uid": self.target_replica_uid, -                "new_gen": new_gen, -                "new_trans_id": new_trans_id, -                "my_gen": my_gen -            } -            self._syncing_info = info -            if defer_decryption and not sync_target.has_syncdb(): -                logger.debug("Sync target has no valid sync db, " -                             "aborting defer_decryption") -                defer_decryption = False -            self.complete_sync() -        except Exception as e: -            logger.error("Soledad sync error: %s" % str(e)) -            logger.error(traceback.format_exc()) -            sync_target.stop() -        finally: -            sync_target.close() - -        return my_gen +        new_gen, new_trans_id = yield sync_target.sync_exchange( +            docs_by_generation, self.source._replica_uid, +            target_last_known_gen, target_last_known_trans_id, +            self._insert_doc_from_target, ensure_callback=ensure_callback, +            defer_decryption=defer_decryption) +        logger.debug( +            "Soledad source sync info after sync exchange:\n" +            "  source known target gen: %d\n" +            "  source known target trans_id: %s" +            % (new_gen, new_trans_id)) +        info = { +            "target_replica_uid": self.target_replica_uid, +            "new_gen": new_gen, +            "new_trans_id": new_trans_id, +            "my_gen": my_gen +        } +        self._syncing_info = info +        yield self.complete_sync() + +        defer.returnValue(my_gen)      def complete_sync(self):          """ @@ -215,6 +159,9 @@ class SoledadSynchronizer(Synchronizer):              (a) record last known generation and transaction uid for the remote              replica, and              (b) make target aware of our current reached generation. + +        :return: A deferred which will fire when the sync has been completed. +        :rtype: twisted.internet.defer.Deferred          """          logger.debug("Completing deferred last step in SYNC...") @@ -225,39 +172,23 @@ class SoledadSynchronizer(Synchronizer):              info["target_replica_uid"], info["new_gen"], info["new_trans_id"])          # if gapless record current reached generation with target -        self._record_sync_info_with_the_target(info["my_gen"]) - -    @property -    def syncing(self): -        """ -        Return True if a sync is ongoing, False otherwise. -        :rtype: bool -        """ -        # XXX FIXME  we need some mechanism for timeout: should cleanup and -        # release if something in the syncdb-decrypt goes wrong. we could keep -        # track of the release date and cleanup unrealistic sync entries after -        # some time. 
+        return self._record_sync_info_with_the_target(info["my_gen"]) -        # TODO use cancellable deferreds instead -        locked = self.syncing_lock.locked() -        return locked - -    def release_syncing_lock(self): -        """ -        Release syncing lock if it's locked. +    def _record_sync_info_with_the_target(self, start_generation):          """ -        if self.syncing_lock.locked(): -            self.syncing_lock.release() +        Store local replica metadata on the server. -    def close(self): -        """ -        Close sync target pool of workers. -        """ -        self.release_syncing_lock() -        self.sync_target.close() +        :param start_generation: The local generation when the sync was +                                 started. +        :type start_generation: int -    def __del__(self): -        """ -        Cleanup: release lock. +        :return: A deferred which will fire when the operation has been +                 completed. +        :rtype: twisted.internet.defer.Deferred          """ -        self.release_syncing_lock() +        cur_gen, trans_id = self.source._get_generation_info() +        if (cur_gen == start_generation + self.num_inserted +                and self.num_inserted > 0): +            return self.sync_target.record_sync_info( +                self.source._replica_uid, cur_gen, trans_id) +        return defer.succeed(None) diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py deleted file mode 100644 index 986bd991..00000000 --- a/client/src/leap/soledad/client/target.py +++ /dev/null @@ -1,1517 +0,0 @@ -# -*- coding: utf-8 -*- -# target.py -# Copyright (C) 2013, 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -A U1DB backend for encrypting data before sending to server and decrypting -after receiving. 
-""" -import cStringIO -import gzip -import logging -import re -import urllib -import threading - -from collections import defaultdict -from time import sleep -from uuid import uuid4 - -import simplejson as json - -from u1db import errors -from u1db.remote import utils, http_errors -from u1db.remote.http_target import HTTPSyncTarget -from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase -from zope.proxy import ProxyBase -from zope.proxy import sameProxiedObjects, setProxiedObject - -from twisted.internet.task import LoopingCall - -from leap.soledad.common.document import SoledadDocument -from leap.soledad.client.auth import TokenBasedAuth -from leap.soledad.client.crypto import is_symmetrically_encrypted -from leap.soledad.client.crypto import encrypt_doc, decrypt_doc -from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool -from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS -from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS -from leap.soledad.client.events import signal - - -logger = logging.getLogger(__name__) - - -def _gunzip(data): -    """ -    Uncompress data that is gzipped. - -    :param data: gzipped data -    :type data: basestring -    """ -    buffer = cStringIO.StringIO() -    buffer.write(data) -    buffer.seek(0) -    try: -        data = gzip.GzipFile(mode='r', fileobj=buffer).read() -    except Exception: -        logger.warning("Error while decrypting gzipped data") -    buffer.close() -    return data - - -class PendingReceivedDocsSyncError(Exception): -    pass - - -class DocumentSyncerThread(threading.Thread): -    """ -    A thread that knowns how to either send or receive a document during the -    sync process. -    """ - -    def __init__(self, doc_syncer, release_method, failed_method, -                 idx, total, last_request_lock=None, last_callback_lock=None): -        """ -        Initialize a new syncer thread. - -        :param doc_syncer: A document syncer. -        :type doc_syncer: HTTPDocumentSyncer -        :param release_method: A method to be called when finished running. -        :type release_method: callable(DocumentSyncerThread) -        :param failed_method: A method to be called when we failed. -        :type failed_method: callable(DocumentSyncerThread) -        :param idx: The index count of the current operation. -        :type idx: int -        :param total: The total number of operations. -        :type total: int -        :param last_request_lock: A lock to wait for before actually performing -                                  the request. -        :type last_request_lock: threading.Lock -        :param last_callback_lock: A lock to wait for before actually running -                                  the success callback. 
-        :type last_callback_lock: threading.Lock -        """ -        threading.Thread.__init__(self) -        self._doc_syncer = doc_syncer -        self._release_method = release_method -        self._failed_method = failed_method -        self._idx = idx -        self._total = total -        self._last_request_lock = last_request_lock -        self._last_callback_lock = last_callback_lock -        self._response = None -        self._exception = None -        self._result = None -        self._success = False -        # a lock so we can signal when we're finished -        self._request_lock = threading.Lock() -        self._request_lock.acquire() -        self._callback_lock = threading.Lock() -        self._callback_lock.acquire() -        # make thread interruptable -        self._stopped = None -        self._stop_lock = threading.Lock() - -    def run(self): -        """ -        Run the HTTP request and store results. - -        This method will block and wait for an eventual previous operation to -        finish before actually performing the request. It also traps any -        exception and register any failure with the request. -        """ -        with self._stop_lock: -            if self._stopped is None: -                self._stopped = False -            else: -                return - -        # eventually wait for the previous thread to finish -        if self._last_request_lock is not None: -            self._last_request_lock.acquire() - -        # bail out in case we've been interrupted -        if self.stopped is True: -            return - -        try: -            self._response = self._doc_syncer.do_request() -            self._request_lock.release() - -            # run success callback -            if self._doc_syncer.success_callback is not None: - -                # eventually wait for callback lock release -                if self._last_callback_lock is not None: -                    self._last_callback_lock.acquire() - -                # bail out in case we've been interrupted -                if self._stopped is True: -                    return - -                self._result = self._doc_syncer.success_callback( -                    self._idx, self._total, self._response) -                self._success = True -                doc_syncer = self._doc_syncer -                self._release_method(self, doc_syncer) -                self._doc_syncer = None -                # let next thread executed its callback -                self._callback_lock.release() - -        # trap any exception and signal failure -        except Exception as e: -            self._exception = e -            self._success = False -            # run failure callback -            if self._doc_syncer.failure_callback is not None: - -                # eventually wait for callback lock release -                if self._last_callback_lock is not None: -                    self._last_callback_lock.acquire() - -                # bail out in case we've been interrupted -                if self.stopped is True: -                    return - -                self._doc_syncer.failure_callback( -                    self._idx, self._total, self._exception) - -                self._failed_method() -                # we do not release the callback lock here because we -                # failed and so we don't want other threads to succeed. 
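The run() method deleted above keeps concurrent requests and their callbacks ordered through a chain of locks: each thread acquires its own request and callback locks on creation and releases them when done, after first waiting on the locks of the previous thread. The handoff reduces to this toy pattern (make_worker is a hypothetical helper, not the Soledad API):

import threading

def make_worker(work, previous_lock=None):
    my_lock = threading.Lock()
    my_lock.acquire()  # stays held until this worker has finished

    def run():
        if previous_lock is not None:
            previous_lock.acquire()  # wait for the previous worker
        work()
        my_lock.release()  # unblock the next worker in the chain

    return threading.Thread(target=run), my_lock

Passing each returned lock as the next worker's previous_lock serializes completion order even though the threads themselves run concurrently.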
- -    @property -    def doc_syncer(self): -        return self._doc_syncer - -    @property -    def response(self): -        return self._response - -    @property -    def exception(self): -        return self._exception - -    @property -    def callback_lock(self): -        return self._callback_lock - -    @property -    def request_lock(self): -        return self._request_lock - -    @property -    def success(self): -        return self._success - -    def stop(self): -        with self._stop_lock: -            self._stopped = True - -    @property -    def stopped(self): -        with self._stop_lock: -            return self._stopped - -    @property -    def result(self): -        return self._result - - -class DocumentSyncerPool(object): -    """ -    A pool of reusable document syncers. -    """ - -    POOL_SIZE = 10 -    """ -    The maximum amount of syncer threads running at the same time. -    """ - -    def __init__(self, raw_url, raw_creds, query_string, headers, -                 ensure_callback, stop_method): -        """ -        Initialize the document syncer pool. - -        :param raw_url: The complete raw URL for the HTTP request. -        :type raw_url: str -        :param raw_creds: The credentials for the HTTP request. -        :type raw_creds: dict -        :param query_string: The query string for the HTTP request. -        :type query_string: str -        :param headers: The headers for the HTTP request. -        :type headers: dict -        :param ensure_callback: A callback to ensure we have the correct -                                target_replica_uid, if it was just created. -        :type ensure_callback: callable - -        """ -        # save syncer params -        self._raw_url = raw_url -        self._raw_creds = raw_creds -        self._query_string = query_string -        self._headers = headers -        self._ensure_callback = ensure_callback -        self._stop_method = stop_method -        # pool attributes -        self._failures = False -        self._semaphore_pool = threading.BoundedSemaphore( -            DocumentSyncerPool.POOL_SIZE) -        self._pool_access_lock = threading.Lock() -        self._doc_syncers = [] -        self._threads = [] - -    def new_syncer_thread(self, idx, total, last_request_lock=None, -                          last_callback_lock=None): -        """ -        Yield a new document syncer thread. - -        :param idx: The index count of the current operation. -        :type idx: int -        :param total: The total number of operations. -        :type total: int -        :param last_request_lock: A lock to wait for before actually performing -                                  the request. -        :type last_request_lock: threading.Lock -        :param last_callback_lock: A lock to wait for before actually running -                                   the success callback. -        :type last_callback_lock: threading.Lock -        """ -        t = None -        # wait for available threads -        self._semaphore_pool.acquire() -        with self._pool_access_lock: -            if self._failures is True: -                return None -            # get a syncer -            doc_syncer = self._get_syncer() -            # we rely on DocumentSyncerThread.run() to release the lock using -            # self.release_syncer so we can launch a new thread. 
-            t = DocumentSyncerThread( -                doc_syncer, self.release_syncer, self.cancel_threads, -                idx, total, -                last_request_lock=last_request_lock, -                last_callback_lock=last_callback_lock) -            self._threads.append(t) -            return t - -    def _failed(self): -        with self._pool_access_lock: -            self._failures = True - -    @property -    def failures(self): -        return self._failures - -    def _get_syncer(self): -        """ -        Get a document syncer from the pool. - -        This method will create a new syncer whenever there is no syncer -        available in the pool. - -        :return: A syncer. -        :rtype: HTTPDocumentSyncer -        """ -        syncer = None -        # get an available syncer or create a new one -        try: -            syncer = self._doc_syncers.pop() -        except IndexError: -            syncer = HTTPDocumentSyncer( -                self._raw_url, self._raw_creds, self._query_string, -                self._headers, self._ensure_callback) -        return syncer - -    def release_syncer(self, syncer_thread, doc_syncer): -        """ -        Return a syncer to the pool after use and check for any failures. - -        :param syncer: The syncer to be returned to the pool. -        :type syncer: HTTPDocumentSyncer -        """ -        with self._pool_access_lock: -            self._doc_syncers.append(doc_syncer) -            if syncer_thread.success is True: -                self._threads.remove(syncer_thread) -            self._semaphore_pool.release() - -    def cancel_threads(self): -        """ -        Stop all threads in the pool. -        """ -        # stop sync -        self._stop_method() -        stopped = [] -        # stop all threads -        logger.warning("Soledad sync: cancelling sync threads...") -        with self._pool_access_lock: -            self._failures = True -            while self._threads: -                t = self._threads.pop(0) -                t.stop() -                self._doc_syncers.append(t.doc_syncer) -                stopped.append(t) -        # release locks and join -        while stopped: -            t = stopped.pop(0) -            t.request_lock.acquire(False)   # just in case -            t.request_lock.release() -            t.callback_lock.acquire(False)  # just in case -            t.callback_lock.release() -        # release any blocking semaphores -        for i in xrange(DocumentSyncerPool.POOL_SIZE): -            try: -                self._semaphore_pool.release() -            except ValueError: -                break -        logger.warning("Soledad sync: cancelled sync threads.") - -    def cleanup(self): -        """ -        Close and remove any syncers from the pool. -        """ -        with self._pool_access_lock: -            while self._doc_syncers: -                syncer = self._doc_syncers.pop() -                syncer.close() -                del syncer - - -class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth): - -    def __init__(self, raw_url, creds, query_string, headers, ensure_callback): -        """ -        Initialize the client. - -        :param raw_url: The raw URL of the target HTTP server. -        :type raw_url: str -        :param creds: Authentication credentials. -        :type creds: dict -        :param query_string: The query string for the HTTP request. -        :type query_string: str -        :param headers: The headers for the HTTP request. 
-        :type headers: dict -        :param ensure_callback: A callback to ensure we have the correct -                                target_replica_uid, if it was just created. -        :type ensure_callback: callable -        """ -        HTTPClientBase.__init__(self, raw_url, creds=creds) -        # info needed to perform the request -        self._query_string = query_string -        self._headers = headers -        self._ensure_callback = ensure_callback -        # the actual request method -        self._request_method = None -        self._success_callback = None -        self._failure_callback = None - -    def _reset(self): -        """ -        Reset this document syncer so we can reuse it. -        """ -        self._request_method = None -        self._success_callback = None -        self._failure_callback = None -        self._request_method = None - -    def set_request_method(self, method, *args, **kwargs): -        """ -        Set the actual method to perform the request. - -        :param method: Either 'get' or 'put'. -        :type method: str -        :param args: Arguments for the request method. -        :type args: list -        :param kwargs: Keyworded arguments for the request method. -        :type kwargs: dict -        """ -        self._reset() -        # resolve request method -        if method is 'get': -            self._request_method = self._get_doc -        elif method is 'put': -            self._request_method = self._put_doc -        else: -            raise Exception -        # store request method args -        self._args = args -        self._kwargs = kwargs - -    def set_success_callback(self, callback): -        self._success_callback = callback - -    def set_failure_callback(self, callback): -        self._failure_callback = callback - -    @property -    def success_callback(self): -        return self._success_callback - -    @property -    def failure_callback(self): -        return self._failure_callback - -    def do_request(self): -        """ -        Actually perform the request. - -        :return: The body and headers of the response. -        :rtype: tuple -        """ -        self._ensure_connection() -        args = self._args -        kwargs = self._kwargs -        return self._request_method(*args, **kwargs) - -    def _request(self, method, url_parts, params=None, body=None, -                 content_type=None): -        """ -        Perform an HTTP request. - -        :param method: The HTTP request method. -        :type method: str -        :param url_parts: A list representing the request path. -        :type url_parts: list -        :param params: Parameters for the URL query string. -        :type params: dict -        :param body: The body of the request. -        :type body: str -        :param content-type: The content-type of the request. -        :type content-type: str - -        :return: The body and headers of the response. -        :rtype: tuple - -        :raise errors.Unavailable: Raised after a number of unsuccesful -                                   request attempts. -        :raise Exception: Raised for any other exception ocurring during the -                          request. 
-        """ - -        self._ensure_connection() -        unquoted_url = url_query = self._url.path -        if url_parts: -            if not url_query.endswith('/'): -                url_query += '/' -                unquoted_url = url_query -            url_query += '/'.join(urllib.quote(part, safe='') -                                  for part in url_parts) -            # oauth performs its own quoting -            unquoted_url += '/'.join(url_parts) -        encoded_params = {} -        if params: -            for key, value in params.items(): -                key = unicode(key).encode('utf-8') -                encoded_params[key] = _encode_query_parameter(value) -            url_query += ('?' + urllib.urlencode(encoded_params)) -        if body is not None and not isinstance(body, basestring): -            body = json.dumps(body) -            content_type = 'application/json' -        headers = {} -        if content_type: -            headers['content-type'] = content_type - -        # Patched: We would like to receive gzip pretty please -        # ---------------------------------------------------- -        headers['accept-encoding'] = "gzip" -        # ---------------------------------------------------- - -        headers.update( -            self._sign_request(method, unquoted_url, encoded_params)) - -        for delay in self._delays: -            try: -                self._conn.request(method, url_query, body, headers) -                return self._response() -            except errors.Unavailable, e: -                sleep(delay) -        raise e - -    def _response(self): -        """ -        Return the response of the (possibly gzipped) HTTP request. - -        :return: The body and headers of the response. -        :rtype: tuple -        """ -        resp = self._conn.getresponse() -        body = resp.read() -        headers = dict(resp.getheaders()) - -        # Patched: We would like to decode gzip -        # ---------------------------------------------------- -        encoding = headers.get('content-encoding', '') -        if "gzip" in encoding: -            body = _gunzip(body) -        # ---------------------------------------------------- - -        if resp.status in (200, 201): -            return body, headers -        elif resp.status in http_errors.ERROR_STATUSES: -            try: -                respdic = json.loads(body) -            except ValueError: -                pass -            else: -                self._error(respdic) -        # special case -        if resp.status == 503: -            raise errors.Unavailable(body, headers) -        raise errors.HTTPError(resp.status, body, headers) - -    def _prepare(self, comma, entries, **dic): -        """ -        Prepare an entry to be sent through a syncing POST request. - -        :param comma: A string to be prepended to the current entry. -        :type comma: str -        :param entries: A list of entries accumulated to be sent on the -                        request. -        :type entries: list -        :param dic: The data to be included in this entry. -        :type dic: dict - -        :return: The size of the prepared entry. -        :rtype: int -        """ -        entry = comma + '\r\n' + json.dumps(dic) -        entries.append(entry) -        return len(entry) - -    def _init_post_request(self, action, content_length): -        """ -        Initiate a syncing POST request. - -        :param url: The syncing URL. 
-        :type url: str -        :param action: The syncing action, either 'get' or 'receive'. -        :type action: str -        :param headers: The initial headers to be sent on this request. -        :type headers: dict -        :param content_length: The content-length of the request. -        :type content_length: int -        """ -        self._conn.putrequest('POST', self._query_string) -        self._conn.putheader( -            'content-type', 'application/x-soledad-sync-%s' % action) -        for header_name, header_value in self._headers: -            self._conn.putheader(header_name, header_value) -        self._conn.putheader('accept-encoding', 'gzip') -        self._conn.putheader('content-length', str(content_length)) -        self._conn.endheaders() - -    def _get_doc(self, received, sync_id, last_known_generation, -                 last_known_trans_id): -        """ -        Get a sync document from server by means of a POST request. - -        :param received: The number of documents already received in the -                         current sync session. -        :type received: int -        :param sync_id: The id for the current sync session. -        :type sync_id: str -        :param last_known_generation: Target's last known generation. -        :type last_known_generation: int -        :param last_known_trans_id: Target's last known transaction id. -        :type last_known_trans_id: str - -        :return: The body and headers of the response. -        :rtype: tuple -        """ -        entries = ['['] -        size = 1 -        # add remote replica metadata to the request -        size += self._prepare( -            '', entries, -            last_known_generation=last_known_generation, -            last_known_trans_id=last_known_trans_id, -            sync_id=sync_id, -            ensure=self._ensure_callback is not None) -        # inform server of how many documents have already been received -        size += self._prepare( -            ',', entries, received=received) -        entries.append('\r\n]') -        size += len(entries[-1]) -        # send headers -        self._init_post_request('get', size) -        # get document -        for entry in entries: -            self._conn.send(entry) -        return self._response() - -    def _put_doc(self, sync_id, last_known_generation, last_known_trans_id, -                 id, rev, content, gen, trans_id, number_of_docs, doc_idx): -        """ -        Put a sync document on server by means of a POST request. - -        :param sync_id: The id for the current sync session. -        :type sync_id: str -        :param last_known_generation: Target's last known generation. -        :type last_known_generation: int -        :param last_known_trans_id: Target's last known transaction id. -        :type last_known_trans_id: str -        :param id: The document id. -        :type id: str -        :param rev: The document revision. -        :type rev: str -        :param content: The serialized document content. -        :type content: str -        :param gen: The generation of the modification of the document. -        :type gen: int -        :param trans_id: The transaction id of the modification of the -                         document. -        :type trans_id: str -        :param number_of_docs: The total amount of documents sent on this sync -                               session. -        :type number_of_docs: int -        :param doc_idx: The index of the current document being sent. 
-        :type doc_idx: int - -        :return: The body and headers of the response. -        :rtype: tuple -        """ -        # prepare to send the document -        entries = ['['] -        size = 1 -        # add remote replica metadata to the request -        size += self._prepare( -            '', entries, -            last_known_generation=last_known_generation, -            last_known_trans_id=last_known_trans_id, -            sync_id=sync_id, -            ensure=self._ensure_callback is not None) -        # add the document to the request -        size += self._prepare( -            ',', entries, -            id=id, rev=rev, content=content, gen=gen, trans_id=trans_id, -            number_of_docs=number_of_docs, doc_idx=doc_idx) -        entries.append('\r\n]') -        size += len(entries[-1]) -        # send headers -        self._init_post_request('put', size) -        # send document -        for entry in entries: -            self._conn.send(entry) -        return self._response() - -    def _sign_request(self, method, url_query, params): -        """ -        Return an authorization header to be included in the HTTP request. - -        :param method: The HTTP method. -        :type method: str -        :param url_query: The URL query string. -        :type url_query: str -        :param params: A list with encoded query parameters. -        :type param: list - -        :return: The Authorization header. -        :rtype: list of tuple -        """ -        return TokenBasedAuth._sign_request(self, method, url_query, params) - -    def set_token_credentials(self, uuid, token): -        """ -        Store given credentials so we can sign the request later. - -        :param uuid: The user's uuid. -        :type uuid: str -        :param token: The authentication token. -        :type token: str -        """ -        TokenBasedAuth.set_token_credentials(self, uuid, token) - - -class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): -    """ -    A SyncTarget that encrypts data before sending and decrypts data after -    receiving. - -    Normally encryption will have been written to the sync database upon -    document modification. The sync database is also used to write temporarily -    the parsed documents that the remote send us, before being decrypted and -    written to the main database. -    """ - -    # will later keep a reference to the insert-doc callback -    # passed to sync_exchange -    _insert_doc_cb = defaultdict(lambda: ProxyBase(None)) - -    """ -    Period of recurrence of the periodic decrypting task, in seconds. -    """ -    DECRYPT_LOOP_PERIOD = 0.5 - -    # -    # Modified HTTPSyncTarget methods. -    # - -    def __init__(self, url, source_replica_uid=None, creds=None, crypto=None, -                 sync_db=None, sync_db_write_lock=None): -        """ -        Initialize the SoledadSyncTarget. - -        :param source_replica_uid: The source replica uid which we use when -                                   deferring decryption. -        :type source_replica_uid: str -        :param url: The url of the target replica to sync with. -        :type url: str -        :param creds: Optional dictionary giving credentials. -                      to authorize the operation with the server. -        :type creds: dict -        :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt -                        document contents when syncing. -        :type crypto: soledad.crypto.SoledadCrypto -        :param sync_db: Optional. 
handler for the db with the symmetric -                        encryption of the syncing documents. If -                        None, encryption will be done in-place, -                        instead of retreiving it from the dedicated -                        database. -        :type sync_db: Sqlite handler -        :param sync_db_write_lock: a write lock for controlling concurrent -                                   access to the sync_db -        :type sync_db_write_lock: threading.Lock -        """ -        HTTPSyncTarget.__init__(self, url, creds) -        self._raw_url = url -        self._raw_creds = creds -        self._crypto = crypto -        self._stopped = True -        self._stop_lock = threading.Lock() -        self._sync_exchange_lock = threading.Lock() -        self.source_replica_uid = source_replica_uid -        self._defer_decryption = False -        self._syncer_pool = None - -        # deferred decryption attributes -        self._sync_db = None -        self._sync_db_write_lock = None -        self._decryption_callback = None -        self._sync_decr_pool = None -        self._sync_loop = None -        if sync_db and sync_db_write_lock is not None: -            self._sync_db = sync_db -            self._sync_db_write_lock = sync_db_write_lock - -    def _setup_sync_decr_pool(self): -        """ -        Set up the SyncDecrypterPool for deferred decryption. -        """ -        if self._sync_decr_pool is None: -            # initialize syncing queue decryption pool -            self._sync_decr_pool = SyncDecrypterPool( -                self._crypto, self._sync_db, -                self._sync_db_write_lock, -                insert_doc_cb=self._insert_doc_cb) -            self._sync_decr_pool.set_source_replica_uid( -                self.source_replica_uid) - -    def _teardown_sync_decr_pool(self): -        """ -        Tear down the SyncDecrypterPool. -        """ -        if self._sync_decr_pool is not None: -            self._sync_decr_pool.close() -            self._sync_decr_pool = None - -    def _setup_sync_loop(self): -        """ -        Set up the sync loop for deferred decryption. -        """ -        if self._sync_loop is None: -            self._sync_loop = LoopingCall( -                self._decrypt_syncing_received_docs) -            self._sync_loop.start(self.DECRYPT_LOOP_PERIOD) - -    def _teardown_sync_loop(self): -        """ -        Tear down the sync loop. -        """ -        if self._sync_loop is not None: -            self._sync_loop.stop() -            self._sync_loop = None - -    def _get_replica_uid(self, url): -        """ -        Return replica uid from the url, or None. - -        :param url: the replica url -        :type url: str -        """ -        replica_uid_match = re.findall("user-([0-9a-fA-F]+)", url) -        return replica_uid_match[0] if len(replica_uid_match) > 0 else None - -    @staticmethod -    def connect(url, source_replica_uid=None, crypto=None): -        return SoledadSyncTarget( -            url, source_replica_uid=source_replica_uid, crypto=crypto) - -    def _parse_received_doc_response(self, response): -        """ -        Parse the response from the server containing the received document. - -        :param response: The body and headers of the response. 
-        :type response: tuple(str, dict) -        """ -        data, _ = response -        # decode incoming stream -        parts = data.splitlines() -        if not parts or parts[0] != '[' or parts[-1] != ']': -            raise errors.BrokenSyncStream -        data = parts[1:-1] -        # decode metadata -        line, comma = utils.check_and_strip_comma(data[0]) -        metadata = None -        try: -            metadata = json.loads(line) -            new_generation = metadata['new_generation'] -            new_transaction_id = metadata['new_transaction_id'] -            number_of_changes = metadata['number_of_changes'] -        except (json.JSONDecodeError, KeyError): -            raise errors.BrokenSyncStream -        # make sure we have replica_uid from fresh new dbs -        if self._ensure_callback and 'replica_uid' in metadata: -            self._ensure_callback(metadata['replica_uid']) -        # parse incoming document info -        doc_id = None -        rev = None -        content = None -        gen = None -        trans_id = None -        if number_of_changes > 0: -            try: -                entry = json.loads(data[1]) -                doc_id = entry['id'] -                rev = entry['rev'] -                content = entry['content'] -                gen = entry['gen'] -                trans_id = entry['trans_id'] -            except (IndexError, KeyError): -                raise errors.BrokenSyncStream -        return new_generation, new_transaction_id, number_of_changes, \ -            doc_id, rev, content, gen, trans_id - -    def _insert_received_doc(self, idx, total, response): -        """ -        Insert a received document into the local replica. - -        :param idx: The index count of the current operation. -        :type idx: int -        :param total: The total number of operations. -        :type total: int -        :param response: The body and headers of the response. -        :type response: tuple(str, dict) -        """ -        new_generation, new_transaction_id, number_of_changes, doc_id, \ -            rev, content, gen, trans_id = \ -            self._parse_received_doc_response(response) -        if doc_id is not None: -            # decrypt incoming document and insert into local database -            # ------------------------------------------------------------- -            # symmetric decryption of document's contents -            # ------------------------------------------------------------- -            # If arriving content was symmetrically encrypted, we decrypt it. -            # We do it inline if defer_decryption flag is False or no sync_db -            # was defined, otherwise we defer it writing it to the received -            # docs table. -            doc = SoledadDocument(doc_id, rev, content) -            if is_symmetrically_encrypted(doc): -                if self._queue_for_decrypt: -                    self._save_encrypted_received_doc( -                        doc, gen, trans_id, idx, total) -                else: -                    # defer_decryption is False or no-sync-db fallback -                    doc.set_json(decrypt_doc(self._crypto, doc)) -                    self._return_doc_cb(doc, gen, trans_id) -            else: -                # not symmetrically encrypted doc, insert it directly -                # or save it in the decrypted stage. 
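The branch above decides, per received document, between inline decryption and the deferred path through the sync db. Stripped of the surrounding class, the routing amounts to the toy restatement below; every name in it is a stand-in, not a Soledad API:

def is_symmetrically_encrypted(doc):
    # stand-in check; the real helper inspects the document content
    return doc.get('enc_scheme') == 'symkey'

def handle_incoming(doc, queue_for_decrypt, sync_db, local_db, decrypt):
    if queue_for_decrypt:
        # encrypted and plaintext docs alike go through the sync db, so
        # the whole batch can later be inserted in the right order
        sync_db.append(doc)
    elif is_symmetrically_encrypted(doc):
        local_db.append(decrypt(doc))
    else:
        local_db.append(doc)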
-                if self._queue_for_decrypt: -                    self._save_received_doc(doc, gen, trans_id, idx, total) -                else: -                    self._return_doc_cb(doc, gen, trans_id) -            # ------------------------------------------------------------- -            # end of symmetric decryption -            # ------------------------------------------------------------- -        msg = "%d/%d" % (idx + 1, total) -        signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg) -        logger.debug("Soledad sync receive status: %s" % msg) -        return number_of_changes, new_generation, new_transaction_id - -    def _get_remote_docs(self, url, last_known_generation, last_known_trans_id, -                         headers, return_doc_cb, ensure_callback, sync_id, -                         defer_decryption=False): -        """ -        Fetch sync documents from the remote database and insert them in the -        local database. - -        If an incoming document's encryption scheme is equal to -        EncryptionSchemes.SYMKEY, then this method will decrypt it with -        Soledad's symmetric key. - -        :param url: The syncing URL. -        :type url: str -        :param last_known_generation: Target's last known generation. -        :type last_known_generation: int -        :param last_known_trans_id: Target's last known transaction id. -        :type last_known_trans_id: str -        :param headers: The headers of the HTTP request. -        :type headers: dict -        :param return_doc_cb: A callback to insert docs from target. -        :type return_doc_cb: callable -        :param ensure_callback: A callback to ensure we have the correct -                                target_replica_uid, if it was just created. -        :type ensure_callback: callable -        :param sync_id: The id for the current sync session. -        :type sync_id: str -        :param defer_decryption: Whether to defer the decryption process using -                                 the intermediate database. If False, -                                 decryption will be done inline. -        :type defer_decryption: bool - -        :raise BrokenSyncStream: If `data` is malformed. - -        :return: A dictionary representing the first line of the response got -                 from remote replica. 
-        :rtype: dict -        """ -        # we keep a reference to the callback in case we defer the decryption -        self._return_doc_cb = return_doc_cb -        self._queue_for_decrypt = defer_decryption \ -            and self._sync_db is not None - -        new_generation = last_known_generation -        new_transaction_id = last_known_trans_id - -        if self._queue_for_decrypt: -            logger.debug( -                "Soledad sync: will queue received docs for decrypting.") - -        idx = 0 -        number_of_changes = 1 - -        first_request = True -        last_callback_lock = None -        threads = [] - -        # get incoming documents -        while idx < number_of_changes: -            # bail out if sync process was interrupted -            if self.stopped is True: -                break - -            # launch a thread to fetch one document from target -            t = self._syncer_pool.new_syncer_thread( -                idx, number_of_changes, -                last_callback_lock=last_callback_lock) - -            # bail out if any thread failed -            if t is None: -                self.stop() -                break - -            t.doc_syncer.set_request_method( -                'get', idx, sync_id, last_known_generation, -                last_known_trans_id) -            t.doc_syncer.set_success_callback(self._insert_received_doc) - -            def _failure_callback(idx, total, exception): -                _failure_msg = "Soledad sync: error while getting document " \ -                    "%d/%d: %s" \ -                    % (idx + 1, total, exception) -                logger.warning("%s" % _failure_msg) -                logger.warning("Soledad sync: failing gracefully, will " -                               "recover on next sync.") - -            t.doc_syncer.set_failure_callback(_failure_callback) -            threads.append(t) -            t.start() -            last_callback_lock = t.callback_lock -            idx += 1 - -            # if this is the first request, wait to update the number of -            # changes -            if first_request is True: -                t.join() -                if t.success: -                    number_of_changes, _, _ = t.result -                else: -                    raise t.exception -                first_request = False - -        # make sure all threads finished and we have up-to-date info -        last_successful_thread = None -        while threads: -            # check if there are failures -            t = threads.pop(0) -            t.join() -            if t.success: -                last_successful_thread = t -            else: -                raise t.exception - -        # get information about last successful thread -        if last_successful_thread is not None: -            body, _ = last_successful_thread.response -            parsed_body = json.loads(body) -            # get current target gen and trans id in case no documents were -            # transferred -            if len(parsed_body) == 1: -                metadata = parsed_body[0] -                new_generation = metadata['new_generation'] -                new_transaction_id = metadata['new_transaction_id'] -            # get current target gen and trans id from last transferred -            # document -            else: -                doc_data = parsed_body[1] -                new_generation = doc_data['gen'] -                new_transaction_id = doc_data['trans_id'] - -        return new_generation, new_transaction_id - -  
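The _get_remote_docs() method above issues one POST per document and hands each response to _insert_received_doc. Each response body is a small JSON stream: an opening bracket, a metadata entry, at most one document entry, and a closing bracket. A simplified standalone parser for that shape (the real _parse_received_doc_response also raises BrokenSyncStream on malformed input):

import json

def parse_sync_response(body):
    lines = body.splitlines()
    assert lines[0] == '[' and lines[-1] == ']', 'broken sync stream'
    metadata = json.loads(lines[1].rstrip(','))
    doc = json.loads(lines[2].rstrip(',')) if len(lines) > 3 else None
    return metadata, doc

body = ('[\r\n'
        '{"new_generation": 2, "new_transaction_id": "T", '
        '"number_of_changes": 1},\r\n'
        '{"id": "doc-1", "rev": "1-ab", "content": "{}", '
        '"gen": 2, "trans_id": "T"}\r\n'
        ']')
metadata, doc = parse_sync_response(body)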
  def sync_exchange(self, docs_by_generations, -                      source_replica_uid, last_known_generation, -                      last_known_trans_id, return_doc_cb, -                      ensure_callback=None, defer_decryption=True, -                      sync_id=None): -        """ -        Find out which documents the remote database does not know about, -        encrypt and send them. - -        This does the same as the parent's method but encrypts content before -        syncing. - -        :param docs_by_generations: A list of (doc_id, generation, trans_id) -                                    of local documents that were changed since -                                    the last local generation the remote -                                    replica knows about. -        :type docs_by_generations: list of tuples - -        :param source_replica_uid: The uid of the source replica. -        :type source_replica_uid: str - -        :param last_known_generation: Target's last known generation. -        :type last_known_generation: int - -        :param last_known_trans_id: Target's last known transaction id. -        :type last_known_trans_id: str - -        :param return_doc_cb: A callback for inserting received documents from -                              target. If not overriden, this will call u1db -                              insert_doc_from_target in synchronizer, which -                              implements the TAKE OTHER semantics. -        :type return_doc_cb: function - -        :param ensure_callback: A callback that ensures we know the target -                                replica uid if the target replica was just -                                created. -        :type ensure_callback: function - -        :param defer_decryption: Whether to defer the decryption process using -                                 the intermediate database. If False, -                                 decryption will be done inline. -        :type defer_decryption: bool - -        :return: The new generation and transaction id of the target replica. 
-        :rtype: tuple -        """ -        self._ensure_callback = ensure_callback - -        if defer_decryption and self._sync_db is not None: -            self._sync_exchange_lock.acquire() -            self._setup_sync_decr_pool() -            self._setup_sync_loop() -            self._defer_decryption = True -        else: -            # fall back -            defer_decryption = False - -        self.start() - -        if sync_id is None: -            sync_id = str(uuid4()) -        self.source_replica_uid = source_replica_uid -        # let the decrypter pool access the passed callback to insert docs -        setProxiedObject(self._insert_doc_cb[source_replica_uid], -                         return_doc_cb) - -        # empty the database before starting a new sync -        if defer_decryption is True and not self.clear_to_sync(): -            self._sync_decr_pool.empty() - -        self._ensure_connection() -        if self._trace_hook:  # for tests -            self._trace_hook('sync_exchange') -        url = '%s/sync-from/%s' % (self._url.path, source_replica_uid) -        headers = self._sign_request('POST', url, {}) - -        cur_target_gen = last_known_generation -        cur_target_trans_id = last_known_trans_id - -        # send docs -        msg = "%d/%d" % (0, len(docs_by_generations)) -        signal(SOLEDAD_SYNC_SEND_STATUS, msg) -        logger.debug("Soledad sync send status: %s" % msg) - -        defer_encryption = self._sync_db is not None -        self._syncer_pool = DocumentSyncerPool( -            self._raw_url, self._raw_creds, url, headers, ensure_callback, -            self.stop_syncer) -        threads = [] -        last_callback_lock = None -        sent = 0 -        total = len(docs_by_generations) - -        synced = [] -        number_of_docs = len(docs_by_generations) - -        last_request_lock = None -        for doc, gen, trans_id in docs_by_generations: -            # allow for interrupting the sync process -            if self.stopped is True: -                break - -            # skip non-syncable docs -            if isinstance(doc, SoledadDocument) and not doc.syncable: -                continue - -            # ------------------------------------------------------------- -            # symmetric encryption of document's contents -            # ------------------------------------------------------------- -            doc_json = doc.get_json() -            if not doc.is_tombstone(): -                if not defer_encryption: -                    # fallback case, for tests -                    doc_json = encrypt_doc(self._crypto, doc) -                else: -                    try: -                        doc_json = self.get_encrypted_doc_from_db( -                            doc.doc_id, doc.rev) -                    except Exception as exc: -                        logger.error("Error while getting " -                                     "encrypted doc from db") -                        logger.exception(exc) -                        continue -                    if doc_json is None: -                        # Not marked as tombstone, but we got nothing -                        # from the sync db. As it is not encrypted yet, we -                        # force inline encryption. -                        # TODO: implement a queue to deal with these cases. 
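With deferred encryption, documents are expected to have been encrypted ahead of time by the encrypter pool and stored in the sync db; the code above falls back to inline encryption when the ciphertext is not there yet. In isolation the selection looks roughly like this (a sketch under that assumption; the callables and the doc attributes are stand-ins for the real helpers):

def get_doc_payload(doc, defer_encryption, fetch_from_sync_db, encrypt_inline):
    if doc.is_tombstone():
        return None  # tombstones carry no content to encrypt
    if not defer_encryption:
        return encrypt_inline(doc)  # test/fallback path
    payload = fetch_from_sync_db(doc.doc_id, doc.rev)
    if payload is None:
        # not yet processed by the encrypter pool: encrypt inline rather
        # than send plaintext or stall the sync
        payload = encrypt_inline(doc)
    return payload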
-                        doc_json = encrypt_doc(self._crypto, doc) -            # ------------------------------------------------------------- -            # end of symmetric encryption -            # ------------------------------------------------------------- -            t = self._syncer_pool.new_syncer_thread( -                sent + 1, total, last_request_lock=last_request_lock, -                last_callback_lock=last_callback_lock) - -            # bail out if any thread failed -            if t is None: -                self.stop() -                break - -            # set the request method -            t.doc_syncer.set_request_method( -                'put', sync_id, cur_target_gen, cur_target_trans_id, -                id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, -                trans_id=trans_id, number_of_docs=number_of_docs, -                doc_idx=sent + 1) -            # set the success calback - -            def _success_callback(idx, total, response): -                _success_msg = "Soledad sync send status: %d/%d" \ -                               % (idx, total) -                signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg) -                logger.debug(_success_msg) - -            t.doc_syncer.set_success_callback(_success_callback) - -            # set the failure callback -            def _failure_callback(idx, total, exception): -                _failure_msg = "Soledad sync: error while sending document " \ -                               "%d/%d: %s" % (idx, total, exception) -                logger.warning("%s" % _failure_msg) -                logger.warning("Soledad sync: failing gracefully, will " -                               "recover on next sync.") - -            t.doc_syncer.set_failure_callback(_failure_callback) - -            # save thread and append -            t.start() -            threads.append((t, doc)) - -            # update lock references so they can be used in next call to -            # syncer_pool.new_syncer_thread() above -            last_callback_lock = t.callback_lock -            last_request_lock = t.request_lock - -            sent += 1 - -        # make sure all threads finished and we have up-to-date info -        last_successful_thread = None -        while threads: -            # check if there are failures -            t, doc = threads.pop(0) -            t.join() -            if t.success: -                synced.append((doc.doc_id, doc.rev)) -                last_successful_thread = t -            else: -                raise t.exception - -        # delete documents from the sync database -        if defer_encryption: -            self.delete_encrypted_docs_from_db(synced) - -        # get target gen and trans_id after docs -        gen_after_send = None -        trans_id_after_send = None -        if last_successful_thread is not None: -            response_dict = json.loads(last_successful_thread.response[0])[0] -            gen_after_send = response_dict['new_generation'] -            trans_id_after_send = response_dict['new_transaction_id'] - -        # get docs from target -        if self.stopped is False: -            cur_target_gen, cur_target_trans_id = self._get_remote_docs( -                url, -                last_known_generation, last_known_trans_id, headers, -                return_doc_cb, ensure_callback, sync_id, -                defer_decryption=defer_decryption) - -        self._syncer_pool.cleanup() - -        # decrypt docs in case of deferred decryption -        if defer_decryption: -  
          while not self.clear_to_sync(): -                sleep(self.DECRYPT_LOOP_PERIOD) -            self._teardown_sync_loop() -            self._teardown_sync_decr_pool() -            self._sync_exchange_lock.release() - -        # update gen and trans id info in case we just sent and did not -        # receive docs. -        if gen_after_send is not None and gen_after_send > cur_target_gen: -            cur_target_gen = gen_after_send -            cur_target_trans_id = trans_id_after_send - -        self.stop() -        self._syncer_pool = None -        return cur_target_gen, cur_target_trans_id - -    def start(self): -        """ -        Mark current sync session as running. -        """ -        with self._stop_lock: -            self._stopped = False - - -    def stop_syncer(self): -        with self._stop_lock: -            self._stopped = True - -    def stop(self): -        """ -        Mark current sync session as stopped. - -        This will eventually interrupt the sync_exchange() method and return -        enough information to the synchronizer so the sync session can be -        recovered afterwards. -        """ -        self.stop_syncer() -        if self._syncer_pool: -            self._syncer_pool.cancel_threads() - -    @property -    def stopped(self): -        """ -        Return whether this sync session is stopped. - -        :return: Whether this sync session is stopped. -        :rtype: bool -        """ -        with self._stop_lock: -            return self._stopped is True - -    def get_encrypted_doc_from_db(self, doc_id, doc_rev): -        """ -        Retrieve encrypted document from the database of encrypted docs for -        sync. - -        :param doc_id: The Document id. -        :type doc_id: str - -        :param doc_rev: The document revision -        :type doc_rev: str -        """ -        encr = SyncEncrypterPool -        sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % ( -            encr.TABLE_NAME,)) -        res = self._fetchall(sql, (doc_id, doc_rev)) -        if res: -            val = res.pop() -            return val[0] -        else: -            # no doc found -            return None - -    def delete_encrypted_docs_from_db(self, docs_ids): -        """ -        Delete several encrypted documents from the database of symmetrically -        encrypted docs to sync. - -        :param docs_ids: an iterable with (doc_id, doc_rev) for all documents -                         to be deleted. -        :type docs_ids: any iterable of tuples of str -        """ -        if docs_ids: -            encr = SyncEncrypterPool -            for doc_id, doc_rev in docs_ids: -                sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % ( -                    encr.TABLE_NAME,)) -                self._sync_db.execute(sql, (doc_id, doc_rev)) - -    def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total): -        """ -        Save a symmetrically encrypted incoming document into the received -        docs table in the sync db. A decryption task will pick it up -        from here in turn. - -        :param doc: The document to save. -        :type doc: SoledadDocument -        :param gen: The generation. -        :type gen: str -        :param  trans_id: Transacion id. -        :type gen: str -        :param idx: The index count of the current operation. -        :type idx: int -        :param total: The total number of operations. 
-        :type total: int -        """ -        logger.debug( -            "Enqueueing doc for decryption: %d/%d." -            % (idx + 1, total)) -        self._sync_decr_pool.insert_encrypted_received_doc( -            doc.doc_id, doc.rev, doc.content, gen, trans_id) - -    def _save_received_doc(self, doc, gen, trans_id, idx, total): -        """ -        Save any incoming document into the received docs table in the sync db. - -        :param doc: The document to save. -        :type doc: SoledadDocument -        :param gen: The generation. -        :type gen: str -        :param  trans_id: Transacion id. -        :type gen: str -        :param idx: The index count of the current operation. -        :type idx: int -        :param total: The total number of operations. -        :type total: int -        """ -        logger.debug( -            "Enqueueing doc, no decryption needed: %d/%d." -            % (idx + 1, total)) -        self._sync_decr_pool.insert_received_doc( -            doc.doc_id, doc.rev, doc.content, gen, trans_id) - -    # -    # Symmetric decryption of syncing docs -    # - -    def clear_to_sync(self): -        """ -        Return whether sync can proceed (ie, the received db table is empty). - -        :return: Whether sync can proceed. -        :rtype: bool -        """ -        if self._sync_decr_pool: -            return self._sync_decr_pool.count_docs_in_sync_db() == 0 -        return True - -    def set_decryption_callback(self, cb): -        """ -        Set callback to be called when the decryption finishes. - -        :param cb: The callback to be set. -        :type cb: callable -        """ -        self._decryption_callback = cb - -    def has_decryption_callback(self): -        """ -        Return True if there is a decryption callback set. -        :rtype: bool -        """ -        return self._decryption_callback is not None - -    def has_syncdb(self): -        """ -        Return True if we have an initialized syncdb. -        """ -        return self._sync_db is not None - -    def _decrypt_syncing_received_docs(self): -        """ -        Decrypt the documents received from remote replica and insert them -        into the local one. - -        Called periodically from LoopingCall self._sync_loop. -        """ -        if sameProxiedObjects( -                self._insert_doc_cb.get(self.source_replica_uid), -                None): -            return - -        decrypter = self._sync_decr_pool -        decrypter.raise_in_case_of_failed_async_calls() -        decrypter.decrypt_received_docs() -        decrypter.process_decrypted() - -    def _sign_request(self, method, url_query, params): -        """ -        Return an authorization header to be included in the HTTP request. - -        :param method: The HTTP method. -        :type method: str -        :param url_query: The URL query string. -        :type url_query: str -        :param params: A list with encoded query parameters. -        :type param: list - -        :return: The Authorization header. -        :rtype: list of tuple -        """ -        return TokenBasedAuth._sign_request(self, method, url_query, params) - -    def set_token_credentials(self, uuid, token): -        """ -        Store given credentials so we can sign the request later. - -        :param uuid: The user's uuid. -        :type uuid: str -        :param token: The authentication token. 
-        :type token: str -        """ -        TokenBasedAuth.set_token_credentials(self, uuid, token) - -    def _fetchall(self, *args, **kwargs): -        with self._sync_db: -            c = self._sync_db.cursor() -            c.execute(*args, **kwargs) -            return c.fetchall() diff --git a/scripts/db_access/client_side_db.py b/scripts/db_access/client_side_db.py index d7c54b66..1d8d32e2 100644 --- a/scripts/db_access/client_side_db.py +++ b/scripts/db_access/client_side_db.py @@ -10,6 +10,7 @@ import requests  import srp._pysrp as srp  import binascii  import logging +import json  from twisted.internet import reactor  from twisted.internet.defer import inlineCallbacks @@ -147,6 +148,12 @@ def _parse_args():          '--passphrase', '-p', default=None,          help='the user passphrase')      parser.add_argument( +        '--get-all-docs', '-a', action='store_true', +        help='get all documents from the local database') +    parser.add_argument( +        '--create-doc', '-c', default=None, +        help='create a document with given content') +    parser.add_argument(          '--sync', '-s', action='store_true',          help='synchronize with the server replica')      parser.add_argument( @@ -196,19 +203,34 @@ def _export_incoming_messages(soledad, directory):          i += 1 +@inlineCallbacks +def _get_all_docs(soledad): +    _, docs = yield soledad.get_all_docs() +    for doc in docs: +        print json.dumps(doc.content, indent=4) + +  # main program  @inlineCallbacks  def _main(soledad, km, args): -    if args.sync: -        yield soledad.sync() -    if args.export_private_key: -        yield _export_key(args, km, args.export_private_key, private=True) -    if args.export_public_key: -        yield _export_key(args, km, args.expoert_public_key, private=False) -    if args.export_incoming_messages: -        yield _export_incoming_messages(soledad, args.export_incoming_messages) -    reactor.stop() +    try: +        if args.create_doc: +            yield soledad.create_doc({'content': args.create_doc}) +        if args.sync: +            yield soledad.sync() +        if args.get_all_docs: +            yield _get_all_docs(soledad) +        if args.export_private_key: +            yield _export_key(args, km, args.export_private_key, private=True) +        if args.export_public_key: +            yield _export_key(args, km, args.export_public_key, private=False) +        if args.export_incoming_messages: +            yield _export_incoming_messages(soledad, args.export_incoming_messages) +    except Exception as e: +        print "error: %s" % e +    finally: +        reactor.stop()  if __name__ == '__main__': diff --git a/run_tests.sh b/scripts/run_tests.sh index e36466f8..e36466f8 100755 --- a/run_tests.sh +++ b/scripts/run_tests.sh | 
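The last hunk wraps the script's inlineCallbacks main in try/finally so the reactor is always stopped, even when one of the steps fails. The general shape of that pattern, shown as a self-contained illustration rather than the patched script itself:

from twisted.internet import defer, reactor

@defer.inlineCallbacks
def main():
    try:
        yield defer.succeed('do the actual work here')
    finally:
        # stop the reactor no matter how the work above ended
        reactor.stop()

reactor.callWhenRunning(main)
reactor.run()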
