diff options
-rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 96 | ||||
-rw-r--r-- | client/src/leap/soledad/client/http_target/fetch.py | 7 |
2 files changed, 35 insertions, 68 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 218ebfa9..7d646c51 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -24,6 +24,7 @@ during synchronization. import json import logging +from uuid import uuid4 from twisted.internet.task import LoopingCall from twisted.internet import threads @@ -65,13 +66,9 @@ class SyncEncryptDecryptPool(object): self._started = False def start(self): - if self.running: - return self._started = True def stop(self): - if not self.running: - return self._started = False # maybe cancel the next delayed call if self._delayed_call \ @@ -312,7 +309,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ TABLE_NAME = "docs_received" FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \ - "trans_id, encrypted, idx" + "trans_id, encrypted, idx, sync_id" """ Period of recurrence of the periodic decrypting task, in seconds. @@ -343,42 +340,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._processed_docs = 0 self._last_inserted_idx = 0 - # initialize db and make sure any database operation happens after - # db initialization - self._deferred_init = self._init_db() - self._wait_init_db('_runOperation', '_runQuery') self._loop = LoopingCall(self._decrypt_and_recurse) - self._decrypted_docs_indexes = set() - - def _wait_init_db(self, *methods): - """ - Methods that need to wait for db initialization. - - :param methods: methods that need to wait for initialization - :type methods: tuple(str) - """ - self._waiting = [] - self._stored = {} - - def _restore(_): - for method in self._stored: - setattr(self, method, self._stored[method]) - for d in self._waiting: - d.callback(None) - - def _makeWrapper(method): - def wrapper(*args, **kw): - d = defer.Deferred() - d.addCallback(lambda _: self._stored[method](*args, **kw)) - self._waiting.append(d) - return d - return wrapper - - for method in methods: - self._stored[method] = getattr(self, method) - setattr(self, method, _makeWrapper(method)) - - self._deferred_init.addCallback(_restore) def start(self, docs_to_process): """ @@ -391,9 +353,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type docs_to_process: int """ SyncEncryptDecryptPool.start(self) + self._decrypted_docs_indexes = set() + self._sync_id = uuid4().hex self._docs_to_process = docs_to_process self._deferred = defer.Deferred() - self._loop.start(self.DECRYPT_LOOP_PERIOD) + d = self._init_db() + d.addCallback(lambda _: self._loop.start(self.DECRYPT_LOOP_PERIOD)) + return d def stop(self): if self._loop.running: @@ -401,6 +367,17 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._finish() SyncEncryptDecryptPool.stop(self) + def _init_db(self): + """ + Empty the received docs table of the sync database. + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + query = "DELETE FROM %s WHERE sync_id <> ?" % (self.TABLE_NAME,) + return self._runOperation(query, (self._sync_id,)) + def _errback(self, failure): log.err(failure) self._deferred.errback(failure) @@ -474,10 +451,11 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ if not isinstance(content, str): content = json.dumps(content) - query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?, ?)" \ % self.TABLE_NAME d = self._runOperation( - query, (doc_id, doc_rev, content, gen, trans_id, 0, idx)) + query, (doc_id, doc_rev, content, gen, trans_id, 0, + idx, self._sync_id)) d.addCallback(lambda _: self._decrypted_docs_indexes.add(idx)) return d @@ -516,8 +494,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): return self.insert_received_doc( doc_id, rev, content, gen, trans_id, idx) - def _get_docs(self, encrypted=None, order_by='idx', order='ASC', - sequence=None): + def _get_docs(self, encrypted=None, sequence=None): """ Get documents from the received docs table in the sync db. @@ -525,9 +502,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): field equal to given parameter. :type encrypted: bool or None :param order_by: The name of the field to order results. - :type order_by: str - :param order: Whether the order should be ASC or DESC. - :type order: str :return: A deferred that will fire with the results of the database query. @@ -535,15 +509,18 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \ "idx FROM %s" % self.TABLE_NAME + parameters = [] if encrypted or sequence: - query += " WHERE" + query += " WHERE sync_id = ? and" + parameters += [self._sync_id] if encrypted: - query += " encrypted = %d" % int(encrypted) + query += " encrypted = ?" + parameters += [int(encrypted)] if sequence: - query += " idx in (" + ', '.join(sequence) + ")" - - query += " ORDER BY %s %s" % (order_by, order) - return self._runQuery(query) + query += " idx in (" + ', '.join('?' * len(sequence)) + ")" + parameters += [int(i) for i in sequence] + query += " ORDER BY idx ASC" + return self._runQuery(query, parameters) @defer.inlineCallbacks def _get_insertable_docs(self): @@ -625,17 +602,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._last_inserted_idx = idx self._processed_docs += 1 - def _init_db(self): - """ - Empty the received docs table of the sync database. - - :return: A deferred that will fire when the operation in the database - has finished. - :rtype: twisted.internet.defer.Deferred - """ - query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) - return self._runOperation(query) - @defer.inlineCallbacks def _decrypt_and_recurse(self): """ diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 9f7a4193..9801c3d9 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -81,9 +81,6 @@ class HTTPDocFetcher(object): new_generation = ngen new_transaction_id = ntrans - if defer_decryption: - self._sync_decr_pool.start(number_of_changes) - # --------------------------------------------------------------------- # maybe receive the rest of the documents # --------------------------------------------------------------------- @@ -151,6 +148,10 @@ class HTTPDocFetcher(object): new_generation, new_transaction_id, number_of_changes, doc_id, \ rev, content, gen, trans_id = \ self._parse_received_doc_response(response) + + if self._sync_decr_pool and not self._sync_decr_pool.running: + self._sync_decr_pool.start(number_of_changes) + if doc_id is not None: # decrypt incoming document and insert into local database # ------------------------------------------------------------- |