diff options
| -rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 175 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 64 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/sync.py | 4 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/target.py | 217 | 
4 files changed, 275 insertions, 185 deletions
| diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 0466ec5d..0c1f92ea 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -85,6 +85,36 @@ class SyncEncryptDecryptPool(object):          logger.debug("Terminating %s" % (self.__class__.__name__,))          self._pool.terminate() +    def _runOperation(self, query, *args): +        """ +        Run an operation on the sync db. + +        :param query: The query to be executed. +        :type query: str +        :param args: A list of query arguments. +        :type args: list + +        :return: A deferred that will fire when the operation in the database +                 has finished. +        :rtype: twisted.internet.defer.Deferred +        """ +        return self._sync_db.runOperation(query, *args) + +    def _runQuery(self, query, *args): +        """ +        Run a query on the sync db. + +        :param query: The query to be executed. +        :type query: str +        :param args: A list of query arguments. +        :type args: list + +        :return: A deferred that will fire with the results of the database +                 query. +        :rtype: twisted.internet.defer.Deferred +        """ +        return self._sync_db.runQuery(query, *args) +  def encrypt_doc_task(doc_id, doc_rev, content, key, secret):      """ @@ -119,7 +149,50 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):      TABLE_NAME = "docs_tosync"      FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" -    def encrypt_doc(self, doc, workers=True): +    ENCRYPT_LOOP_PERIOD = 0.5 + +    def __init__(self, *args, **kwargs): +        """ +        Initialize the sync encrypter pool. +        """ +        SyncEncryptDecryptPool.__init__(self, *args, **kwargs) + +        self._stopped = False +        self._sync_queue = multiprocessing.Queue() + +        # start the encryption loop +        self._deferred_loop = deferToThread(self._encrypt_docs_loop) +        self._deferred_loop.addCallback( +            lambda _: logger.debug("Finished encrypter thread.")) + +    def enqueue_doc_for_encryption(self, doc): +        """ +        Enqueue a document for encryption. + +        :param doc: The document to be encrypted. +        :type doc: SoledadDocument +        """ +        try: +            self.sync_queue.put_nowait(doc) +        except multiprocessing.Queue.Full: +            # do not asynchronously encrypt this file if the queue is full +            pass + +    def _encrypt_docs_loop(self): +        """ +        Process the syncing queue and send the documents there to be encrypted +        in the sync db. They will be read by the SoledadSyncTarget during the +        sync_exchange. +        """ +        logger.debug("Starting encrypter thread.") +        while not self._stopped: +            try: +                doc = self._sync_queue.get(True, self.ENCRYPT_LOOP_PERIOD) +                self._encrypt_doc(doc) +            except multiprocessing.Queue.Empty: +                pass + +    def _encrypt_doc(self, doc, workers=True):          """          Symmetrically encrypt a document. @@ -136,20 +209,20 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):          secret = self._crypto.secret          args = doc.doc_id, doc.rev, docstr, key, secret -        try: -            if workers: -                res = self._pool.apply_async( -                    encrypt_doc_task, args, -                    callback=self.encrypt_doc_cb) -            else: -                # encrypt inline +        if workers: +            # encrypt asynchronously +            self._pool.apply_async( +                encrypt_doc_task, args, +                callback=self._encrypt_doc_cb) +        else: +            # encrypt inline +            try:                  res = encrypt_doc_task(*args) -                self.encrypt_doc_cb(res) +                self._encrypt_doc_cb(res) +            except Exception as exc: +                logger.exception(exc) -        except Exception as exc: -            logger.exception(exc) - -    def encrypt_doc_cb(self, result): +    def _encrypt_doc_cb(self, result):          """          Insert results of encryption routine into the local sync database. @@ -158,9 +231,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):          :type result: tuple(str, str, str)          """          doc_id, doc_rev, content = result -        return self.insert_encrypted_local_doc(doc_id, doc_rev, content) +        return self._insert_encrypted_local_doc(doc_id, doc_rev, content) -    def insert_encrypted_local_doc(self, doc_id, doc_rev, content): +    def _insert_encrypted_local_doc(self, doc_id, doc_rev, content):          """          Insert the contents of the encrypted doc into the local sync          database. @@ -174,7 +247,58 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):          """          query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \                  % (self.TABLE_NAME,) -        return self._sync_db.runOperation(query, (doc_id, doc_rev, content)) +        return self._runOperation(query, (doc_id, doc_rev, content)) + +    @defer.inlineCallbacks +    def get_encrypted_doc(self, doc_id, doc_rev): +        """ +        Get an encrypted document from the sync db. + +        :param doc_id: The id of the document. +        :type doc_id: str +        :param doc_rev: The revision of the document. +        :type doc_rev: str + +        :return: A deferred that will fire with the encrypted content of the +                 document or None if the document was not found in the sync +                 db. +        :rtype: twisted.internet.defer.Deferred +        """ +        logger.debug("Trying to get encrypted doc from sync db: %s" % doc_id) +        query = "SELECT content FROM %s WHERE doc_id=? and rev=?" \ +                % self.TABLE_NAME +        result = yield self._runQuery(query, (doc_id, doc_rev)) +        if result: +            val = result.pop() +            defer.returnValue(val[0]) +        defer.returnValue(None) + +    def delete_encrypted_doc(self, doc_id, doc_rev): +        """ +        Delete an encrypted document from the sync db. + +        :param doc_id: The id of the document. +        :type doc_id: str +        :param doc_rev: The revision of the document. +        :type doc_rev: str + +        :return: A deferred that will fire when the operation in the database +                 has finished. +        :rtype: twisted.internet.defer.Deferred +        """ +        query = "DELETE FROM %s WHERE doc_id=? and rev=?" \ +                % self.TABLE_NAME +        self._runOperation(query, (doc_id, doc_rev)) + +    def close(self): +        """ +        Close the encrypter pool. +        """ +        self._stopped = True +        self._sync_queue.close() +        q = self._sync_queue +        del q +        self._sync_queue = None  def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, @@ -275,9 +399,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          d.addCallback(lambda _: self._empty_db.set())          # start the decryption loop -        self._deferred_loop = deferToThread(self._decrypt_and_process_docs) +        self._deferred_loop = deferToThread( +            self._decrypt_and_process_docs_loop)          self._deferred_loop.addCallback( -            lambda _: logger.debug("Finished decryptor thread.")) +            lambda _: logger.debug("Finished decrypter thread."))      def set_docs_to_process(self, docs_to_process):          """ @@ -317,7 +442,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          docstr = json.dumps(content)          query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \                  % self.TABLE_NAME -        return self._sync_db.runOperation( +        return self._runOperation(              query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx))      def insert_received_doc( @@ -348,7 +473,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):              content = json.dumps(content)          query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \                  % self.TABLE_NAME -        return self._sync_db.runOperation( +        return self._runOperation(              query, (doc_id, doc_rev, content, gen, trans_id, 0, idx))      def _delete_received_doc(self, doc_id): @@ -364,7 +489,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          """          query = "DELETE FROM '%s' WHERE doc_id=?" \                  % self.TABLE_NAME -        return self._sync_db.runOperation(query, (doc_id,)) +        return self._runOperation(query, (doc_id,))      def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx,                       workers=True): @@ -474,7 +599,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          if encrypted is not None:              query += " WHERE encrypted = %d" % int(encrypted)          query += " ORDER BY %s %s" % (order_by, order) -        return self._sync_db.runQuery(query) +        return self._runQuery(query)      @defer.inlineCallbacks      def _get_insertable_docs(self): @@ -612,7 +737,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          :rtype: twisted.internet.defer.Deferred          """          query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) -        return self._sync_db.runOperation(query) +        return self._runOperation(query)      def _raise_if_async_fails(self):          """ @@ -627,7 +752,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):                      # re-raise the exception raised by the remote call                      res.get() -    def _decrypt_and_process_docs(self): +    def _decrypt_and_process_docs_loop(self):          """          Decrypt the documents received from remote replica and insert them          into the local one. @@ -668,6 +793,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          """          Wait for the decrypt-and-process loop to finish.          """ +        logger.debug("Waiting for asynchronous decryption of incoming documents...")          self._finished.wait() +        logger.debug("Asynchronous decryption of incoming documents finished.")          if self._exception:              raise self._exception diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 39d5dd0e..16241621 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -42,7 +42,6 @@ SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases  handled by Soledad should be created by SQLCipher >= 2.0.  """  import logging -import multiprocessing  import os  import threading  import json @@ -286,10 +285,9 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):          :rtype: str          """          doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc) - -        # TODO XXX move to API XXX          if self.defer_encryption: -            self.sync_queue.put_nowait(doc) +            # TODO move to api? +            self._sync_enc_pool.enqueue_doc_for_encryption(doc)          return doc_rev      # @@ -429,13 +427,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):      LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db'      """ -    A dictionary that hold locks which avoid multiple sync attempts from the -    same database replica. -    """ -    # XXX We do not need the lock here now. Remove. -    encrypting_lock = threading.Lock() - -    """      Period or recurrence of the Looping Call that will do the encryption to the      syncdb (in seconds).      """ @@ -458,7 +449,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          self._sync_db_key = opts.sync_db_key          self._sync_db = None          self._sync_enc_pool = None -        self.sync_queue = None          # we store syncers in a dictionary indexed by the target URL. We also          # store a hash of the auth info in case auth info expires and we need @@ -468,7 +458,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          #  self._syncers = {'<url>': ('<auth_hash>', syncer), ...}          self._syncers = {} -        self.sync_queue = multiprocessing.Queue()          self.running = False          self._sync_threadpool = None @@ -486,24 +475,10 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          self._initialize_sync_db(opts)          if defer_encryption: -              # initialize syncing queue encryption pool              self._sync_enc_pool = encdecpool.SyncEncrypterPool(                  self._crypto, self._sync_db) -            # ----------------------------------------------------------------- -            # From the documentation: If f returns a deferred, rescheduling -            # will not take place until the deferred has fired. The result -            # value is ignored. - -            # TODO use this to avoid multiple sync attempts if the sync has not -            # finished! -            # ----------------------------------------------------------------- - -            # XXX this was called sync_watcher --- trace any remnants -            self._sync_loop = LoopingCall(self._encrypt_syncing_docs) -            self._sync_loop.start(self.ENCRYPT_LOOP_PERIOD) -          self.shutdownID = None      @property @@ -703,7 +678,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase):                                    self._replica_uid,                                    creds=creds,                                    crypto=self._crypto, -                                  sync_db=self._sync_db)) +                                  sync_db=self._sync_db, +                                  sync_enc_pool=self._sync_enc_pool))              self._syncers[url] = (h, syncer)          # in order to reuse the same synchronizer multiple times we have to          # reset its state (i.e. the number of documents received from target @@ -715,33 +691,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):      # Symmetric encryption of syncing docs      # -    def _encrypt_syncing_docs(self): -        """ -        Process the syncing queue and send the documents there -        to be encrypted in the sync db. They will be read by the -        SoledadSyncTarget during the sync_exchange. - -        Called periodically from the LoopingCall self._sync_loop. -        """ -        # TODO should return a deferred that would firewhen the encryption is -        # done. See note on __init__ - -        lock = self.encrypting_lock -        # optional wait flag used to avoid blocking -        if not lock.acquire(False): -            return -        else: -            queue = self.sync_queue -            try: -                while not queue.empty(): -                    doc = queue.get_nowait() -                    self._sync_enc_pool.encrypt_doc(doc) - -            except Exception as exc: -                logger.error("Error while  encrypting docs to sync") -                logger.exception(exc) -            finally: -                lock.release()      def get_generation(self):          # FIXME @@ -779,11 +728,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          if self._sync_db is not None:              self._sync_db.close()              self._sync_db = None -        # close the sync queue -        if self.sync_queue is not None: -            self.sync_queue.close() -            del self.sync_queue -            self.sync_queue = None  class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase): diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index d3f106da..d4ca4258 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -195,10 +195,6 @@ class SoledadSynchronizer(Synchronizer):                  "my_gen": my_gen              }              self._syncing_info = info -            if defer_decryption and not sync_target.has_syncdb(): -                logger.debug("Sync target has no valid sync db, " -                             "aborting defer_decryption") -                defer_decryption = False              self.complete_sync()          except Exception as e:              logger.error("Soledad sync error: %s" % str(e)) diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index f2415218..667aab15 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -28,6 +28,7 @@ import threading  from collections import defaultdict  from time import sleep  from uuid import uuid4 +from functools import partial  import simplejson as json @@ -38,12 +39,12 @@ from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase  from zope.proxy import ProxyBase  from zope.proxy import setProxiedObject +from twisted.internet import defer  from leap.soledad.common.document import SoledadDocument  from leap.soledad.client.auth import TokenBasedAuth  from leap.soledad.client.crypto import is_symmetrically_encrypted  from leap.soledad.client.crypto import encrypt_doc, decrypt_doc -from leap.soledad.client.encdecpool import SyncEncrypterPool  from leap.soledad.client.encdecpool import SyncDecrypterPool  from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS  from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS @@ -111,6 +112,7 @@ class DocumentSyncerThread(threading.Thread):          self._exception = None          self._result = None          self._success = False +        self.started = threading.Event()          # a lock so we can signal when we're finished          self._request_lock = threading.Lock()          self._request_lock.acquire() @@ -128,6 +130,8 @@ class DocumentSyncerThread(threading.Thread):          finish before actually performing the request. It also traps any          exception and register any failure with the request.          """ +        self.started.set() +          with self._stop_lock:              if self._stopped is None:                  self._stopped = False @@ -756,7 +760,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):      #      def __init__(self, url, source_replica_uid=None, creds=None, crypto=None, -                 sync_db=None): +                 sync_db=None, sync_enc_pool=None):          """          Initialize the SoledadSyncTarget. @@ -787,8 +791,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          self.source_replica_uid = source_replica_uid          self._syncer_pool = None -        # deferred decryption attributes +        # asynchronous encryption/decryption attributes          self._sync_db = sync_db +        self._sync_enc_pool = sync_enc_pool          self._decryption_callback = None          self._sync_decr_pool = None @@ -796,7 +801,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          """          Set up the SyncDecrypterPool for deferred decryption.          """ -        if self._sync_decr_pool is None: +        if self._sync_decr_pool is None and self._sync_db is not None:              # initialize syncing queue decryption pool              self._sync_decr_pool = SyncDecrypterPool(                  self._crypto, @@ -1018,7 +1023,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):                  t.join()                  if t.success:                      number_of_changes, _, _ = t.result -                    if defer_decryption and number_of_changes: +                    if defer_decryption:                          self._sync_decr_pool.set_docs_to_process(                              number_of_changes)                  else: @@ -1060,6 +1065,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          return new_generation, new_transaction_id +    @property +    def _defer_encryption(self): +        return self._sync_enc_pool is not None + +    @property +    def _defer_decryption(self): +        return self._sync_decr_pool is not None +      def sync_exchange(self, docs_by_generations,                        source_replica_uid, last_known_generation,                        last_known_trans_id, return_doc_cb, @@ -1126,17 +1139,19 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          cur_target_gen = last_known_generation          cur_target_trans_id = last_known_trans_id -        # send docs +        # ------------------------------------------------------------------- +        # start of send documents to target +        # -------------------------------------------------------------------          msg = "%d/%d" % (0, len(docs_by_generations))          signal(SOLEDAD_SYNC_SEND_STATUS, msg)          logger.debug("Soledad sync send status: %s" % msg) -        defer_encryption = self._sync_db is not None          self._syncer_pool = DocumentSyncerPool(              self._raw_url, self._raw_creds, url, headers, ensure_callback,              self.stop_syncer)          threads = []          last_callback_lock = None +          sent = 0          total = len(docs_by_generations) @@ -1156,66 +1171,78 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):              # -------------------------------------------------------------              # symmetric encryption of document's contents              # ------------------------------------------------------------- -            doc_json = doc.get_json() -            if not doc.is_tombstone(): -                if not defer_encryption: -                    # fallback case, for tests -                    doc_json = encrypt_doc(self._crypto, doc) -                else: -                    try: -                        doc_json = self.get_encrypted_doc_from_db( -                            doc.doc_id, doc.rev) -                    except Exception as exc: -                        logger.error("Error while getting " -                                     "encrypted doc from db") -                        logger.exception(exc) -                        continue + +            # the following var will hold a deferred because we may try to +            # fetch the encrypted document from the sync db +            d = None + +            if doc.is_tombstone(): +                d = defer.succeed(None) +            elif not self._defer_encryption: +                # fallback case, for tests +                d = defer.succeed(encrypt_doc(self._crypto, doc)) +            else: + +                def _maybe_encrypt_doc_inline(doc_json):                      if doc_json is None: -                        # Not marked as tombstone, but we got nothing -                        # from the sync db. As it is not encrypted yet, we -                        # force inline encryption. +                        # the document is not marked as tombstone, but we got +                        # nothing from the sync db. As it is not encrypted +                        # yet, we force inline encryption.                          # TODO: implement a queue to deal with these cases. -                        doc_json = encrypt_doc(self._crypto, doc) +                        return encrypt_doc(self._crypto, doc) +                    return doc_json + +                d = self.get_encrypted_doc_from_db(doc.doc_id, doc.rev) +                d.addCallback(_maybe_encrypt_doc_inline)              # -------------------------------------------------------------              # end of symmetric encryption              # ------------------------------------------------------------- +              t = self._syncer_pool.new_syncer_thread(                  sent + 1, total, last_request_lock=last_request_lock,                  last_callback_lock=last_callback_lock) -            # bail out if any thread failed +            # bail out if creation of any thread failed              if t is None:                  self.stop(fail=True)                  break -            # set the request method -            t.doc_syncer.set_request_method( -                'put', sync_id, cur_target_gen, cur_target_trans_id, -                id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, -                trans_id=trans_id, number_of_docs=number_of_docs, -                doc_idx=sent + 1) -            # set the success calback +            # the following callback will be called when the document's +            # encrypted content is available, either because it was found on +            # the sync db or because it has been encrypted inline. -            def _success_callback(idx, total, response): -                _success_msg = "Soledad sync send status: %d/%d" \ -                               % (idx, total) -                signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg) -                logger.debug(_success_msg) +            def _configure_and_start_thread(t, doc_json): +                # set the request method +                t.doc_syncer.set_request_method( +                    'put', sync_id, cur_target_gen, cur_target_trans_id, +                    id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, +                    trans_id=trans_id, number_of_docs=number_of_docs, +                    doc_idx=sent + 1) +                # set the success calback -            t.doc_syncer.set_success_callback(_success_callback) +                def _success_callback(idx, total, response): +                    _success_msg = "Soledad sync send status: %d/%d" \ +                                   % (idx, total) +                    signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg) +                    logger.debug(_success_msg) -            # set the failure callback -            def _failure_callback(idx, total, exception): -                _failure_msg = "Soledad sync: error while sending document " \ -                               "%d/%d: %s" % (idx, total, exception) -                logger.warning("%s" % _failure_msg) -                logger.warning("Soledad sync: failing gracefully, will " -                               "recover on next sync.") +                t.doc_syncer.set_success_callback(_success_callback) -            t.doc_syncer.set_failure_callback(_failure_callback) +                # set the failure callback +                def _failure_callback(idx, total, exception): +                    _failure_msg = "Soledad sync: error while sending document " \ +                                   "%d/%d: %s" % (idx, total, exception) +                    logger.warning("%s" % _failure_msg) +                    logger.warning("Soledad sync: failing gracefully, will " +                                   "recover on next sync.") + +                t.doc_syncer.set_failure_callback(_failure_callback) + +                # save thread and append +                t.start() + +            d.addCallback(partial(_configure_and_start_thread, t)) -            # save thread and append -            t.start()              threads.append((t, doc))              # update lock references so they can be used in next call to @@ -1230,6 +1257,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          while threads:              # check if there are failures              t, doc = threads.pop(0) +            t.started.wait()              t.join()              if t.success:                  synced.append((doc.doc_id, doc.rev)) @@ -1238,8 +1266,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):                  raise t.exception          # delete documents from the sync database -        if defer_encryption: -            self.delete_encrypted_docs_from_db(synced) +        if self._defer_encryption: +            self._delete_encrypted_docs_from_db(synced)          # get target gen and trans_id after docs          gen_after_send = None @@ -1248,16 +1276,23 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):              response_dict = json.loads(last_successful_thread.response[0])[0]              gen_after_send = response_dict['new_generation']              trans_id_after_send = response_dict['new_transaction_id'] - -        # get docs from target -        if self._sync_db is None: -            defer_decryption = False +        # ------------------------------------------------------------------- +        # end of send documents to target +        # ------------------------------------------------------------------- + +        # ------------------------------------------------------------------- +        # start of fetch documents from target +        # ------------------------------------------------------------------- +        defer_decryption = defer_decryption and self._defer_decryption          if self.stopped is False:              cur_target_gen, cur_target_trans_id = self._get_remote_docs(                  url,                  last_known_generation, last_known_trans_id, headers,                  return_doc_cb, ensure_callback, sync_id,                  defer_decryption=defer_decryption) +        # ------------------------------------------------------------------- +        # end of fetch documents from target +        # -------------------------------------------------------------------          self._syncer_pool.cleanup() @@ -1308,6 +1343,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          with self._stop_lock:              return self._stopped is True +    # +    # Symmetric encryption of syncing docs +    # +      def get_encrypted_doc_from_db(self, doc_id, doc_rev):          """          Retrieve encrypted document from the database of encrypted docs for @@ -1318,33 +1357,31 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          :param doc_rev: The document revision          :type doc_rev: str + +        :return: A deferred which is fired with the document's encrypted +                 content or None if the document was not found on the sync db. +        :rtype: twisted.internet.defer.Deferred          """ -        encr = SyncEncrypterPool -        sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % ( -            encr.TABLE_NAME,)) -        res = self._fetchall(sql, (doc_id, doc_rev)) -        if res: -            val = res.pop() -            return val[0] -        else: -            # no doc found -            return None +        logger.debug("Looking for encrypted document on sync db: %s" % doc_id) +        return self._sync_enc_pool.get_encrypted_doc(doc_id, doc_rev) -    def delete_encrypted_docs_from_db(self, docs_ids): +    def _delete_encrypted_docs_from_db(self, docs):          """          Delete several encrypted documents from the database of symmetrically          encrypted docs to sync. -        :param docs_ids: an iterable with (doc_id, doc_rev) for all documents -                         to be deleted. -        :type docs_ids: any iterable of tuples of str +        :param docs: an iterable with (doc_id, doc_rev) for all documents +                     to be deleted. +        :type docs: any iterable of tuples of str          """ -        if docs_ids: -            encr = SyncEncrypterPool -            for doc_id, doc_rev in docs_ids: -                sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % ( -                    encr.TABLE_NAME,)) -                self._sync_db.execute(sql, (doc_id, doc_rev)) +        for doc_id, doc_rev in docs: +            logger.debug("Removing encrypted document on sync db: %s" +                         % doc_id) +            return self._sync_enc_pool.delete_encrypted_doc(doc_id, doc_rev) + +    # +    # Symmetric decryption of syncing docs +    #      def _enqueue_encrypted_received_doc(self, doc, gen, trans_id, idx, total):          """ @@ -1357,15 +1394,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          :param gen: The generation.          :type gen: str          :param  trans_id: Transacion id. -        :type gen: str +          :param idx: The index count of the current operation.          :type idx: int          :param total: The total number of operations.          :type total: int          """ -        logger.debug( -            "Enqueueing doc for decryption: %d/%d." -            % (idx + 1, total)) +        logger.debug("Enqueueing doc for decryption: %d/%d." +                     % (idx + 1, total))          self._sync_decr_pool.insert_encrypted_received_doc(              doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1) @@ -1384,16 +1420,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          :param total: The total number of operations.          :type total: int          """ -        logger.debug( -            "Enqueueing doc, no decryption needed: %d/%d." -            % (idx + 1, total)) +        logger.debug("Enqueueing doc, no decryption needed: %d/%d." +                     % (idx + 1, total))          self._sync_decr_pool.insert_received_doc(              doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1) -    # -    # Symmetric decryption of syncing docs -    # -      def set_decryption_callback(self, cb):          """          Set callback to be called when the decryption finishes. @@ -1410,11 +1441,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          """          return self._decryption_callback is not None -    def has_syncdb(self): -        """ -        Return True if we have an initialized syncdb. -        """ -        return self._sync_db is not None +    # +    # Authentication methods +    #      def _sign_request(self, method, url_query, params):          """ @@ -1442,9 +1471,3 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          :type token: str          """          TokenBasedAuth.set_token_credentials(self, uuid, token) - -    def _fetchall(self, *args, **kwargs): -        with self._sync_db: -            c = self._sync_db.cursor() -            c.execute(*args, **kwargs) -            return c.fetchall() | 
