diff options
| -rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 96 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/http_target/fetch.py | 7 | 
2 files changed, 35 insertions, 68 deletions
| diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 218ebfa9..7d646c51 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -24,6 +24,7 @@ during synchronization.  import json  import logging +from uuid import uuid4  from twisted.internet.task import LoopingCall  from twisted.internet import threads @@ -65,13 +66,9 @@ class SyncEncryptDecryptPool(object):          self._started = False      def start(self): -        if self.running: -            return          self._started = True      def stop(self): -        if not self.running: -            return          self._started = False          # maybe cancel the next delayed call          if self._delayed_call \ @@ -312,7 +309,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):      """      TABLE_NAME = "docs_received"      FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \ -                  "trans_id, encrypted, idx" +                  "trans_id, encrypted, idx, sync_id"      """      Period of recurrence of the periodic decrypting task, in seconds. @@ -343,42 +340,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          self._processed_docs = 0          self._last_inserted_idx = 0 -        # initialize db and make sure any database operation happens after -        # db initialization -        self._deferred_init = self._init_db() -        self._wait_init_db('_runOperation', '_runQuery')          self._loop = LoopingCall(self._decrypt_and_recurse) -        self._decrypted_docs_indexes = set() - -    def _wait_init_db(self, *methods): -        """ -        Methods that need to wait for db initialization. - -        :param methods: methods that need to wait for initialization -        :type methods: tuple(str) -        """ -        self._waiting = [] -        self._stored = {} - -        def _restore(_): -            for method in self._stored: -                setattr(self, method, self._stored[method]) -            for d in self._waiting: -                d.callback(None) - -        def _makeWrapper(method): -            def wrapper(*args, **kw): -                d = defer.Deferred() -                d.addCallback(lambda _: self._stored[method](*args, **kw)) -                self._waiting.append(d) -                return d -            return wrapper - -        for method in methods: -            self._stored[method] = getattr(self, method) -            setattr(self, method, _makeWrapper(method)) - -        self._deferred_init.addCallback(_restore)      def start(self, docs_to_process):          """ @@ -391,9 +353,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          :type docs_to_process: int          """          SyncEncryptDecryptPool.start(self) +        self._decrypted_docs_indexes = set() +        self._sync_id = uuid4().hex          self._docs_to_process = docs_to_process          self._deferred = defer.Deferred() -        self._loop.start(self.DECRYPT_LOOP_PERIOD) +        d = self._init_db() +        d.addCallback(lambda _: self._loop.start(self.DECRYPT_LOOP_PERIOD)) +        return d      def stop(self):          if self._loop.running: @@ -401,6 +367,17 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          self._finish()          SyncEncryptDecryptPool.stop(self) +    def _init_db(self): +        """ +        Empty the received docs table of the sync database. + +        :return: A deferred that will fire when the operation in the database +                 has finished. +        :rtype: twisted.internet.defer.Deferred +        """ +        query = "DELETE FROM %s WHERE sync_id <> ?" % (self.TABLE_NAME,) +        return self._runOperation(query, (self._sync_id,)) +      def _errback(self, failure):          log.err(failure)          self._deferred.errback(failure) @@ -474,10 +451,11 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          """          if not isinstance(content, str):              content = json.dumps(content) -        query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ +        query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?, ?)" \                  % self.TABLE_NAME          d = self._runOperation( -            query, (doc_id, doc_rev, content, gen, trans_id, 0, idx)) +            query, (doc_id, doc_rev, content, gen, trans_id, 0, +                    idx, self._sync_id))          d.addCallback(lambda _: self._decrypted_docs_indexes.add(idx))          return d @@ -516,8 +494,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          return self.insert_received_doc(              doc_id, rev, content, gen, trans_id, idx) -    def _get_docs(self, encrypted=None, order_by='idx', order='ASC', -                  sequence=None): +    def _get_docs(self, encrypted=None, sequence=None):          """          Get documents from the received docs table in the sync db. @@ -525,9 +502,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):                            field equal to given parameter.          :type encrypted: bool or None          :param order_by: The name of the field to order results. -        :type order_by: str -        :param order: Whether the order should be ASC or DESC. -        :type order: str          :return: A deferred that will fire with the results of the database                   query. @@ -535,15 +509,18 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          """          query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \                  "idx FROM %s" % self.TABLE_NAME +        parameters = []          if encrypted or sequence: -            query += " WHERE" +            query += " WHERE sync_id = ? and" +            parameters += [self._sync_id]          if encrypted: -            query += " encrypted = %d" % int(encrypted) +            query += " encrypted = ?" +            parameters += [int(encrypted)]          if sequence: -            query += " idx in (" + ', '.join(sequence) + ")" - -        query += " ORDER BY %s %s" % (order_by, order) -        return self._runQuery(query) +            query += " idx in (" + ', '.join('?' * len(sequence)) + ")" +            parameters += [int(i) for i in sequence] +        query += " ORDER BY idx ASC" +        return self._runQuery(query, parameters)      @defer.inlineCallbacks      def _get_insertable_docs(self): @@ -625,17 +602,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          self._last_inserted_idx = idx          self._processed_docs += 1 -    def _init_db(self): -        """ -        Empty the received docs table of the sync database. - -        :return: A deferred that will fire when the operation in the database -                 has finished. -        :rtype: twisted.internet.defer.Deferred -        """ -        query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) -        return self._runOperation(query) -      @defer.inlineCallbacks      def _decrypt_and_recurse(self):          """ diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 9f7a4193..9801c3d9 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -81,9 +81,6 @@ class HTTPDocFetcher(object):              new_generation = ngen              new_transaction_id = ntrans -        if defer_decryption: -            self._sync_decr_pool.start(number_of_changes) -          # ---------------------------------------------------------------------          # maybe receive the rest of the documents          # --------------------------------------------------------------------- @@ -151,6 +148,10 @@ class HTTPDocFetcher(object):          new_generation, new_transaction_id, number_of_changes, doc_id, \              rev, content, gen, trans_id = \              self._parse_received_doc_response(response) + +        if self._sync_decr_pool and not self._sync_decr_pool.running: +            self._sync_decr_pool.start(number_of_changes) +          if doc_id is not None:              # decrypt incoming document and insert into local database              # ------------------------------------------------------------- | 
