summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/src/leap/soledad/client/crypto.py30
-rw-r--r--client/src/leap/soledad/client/target.py32
2 files changed, 40 insertions, 22 deletions
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index 4a73a910..5ae5937f 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -690,7 +690,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type last_known_generation: int
"""
self._insert_doc_cb = kwargs.pop("insert_doc_cb")
- self._last_known_generation = kwargs.pop("last_known_generation")
SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
self.source_replica_uid = None
@@ -858,8 +857,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type result: tuple(str, str, str)
"""
doc_id, rev, content, gen, trans_id = result
- logger.debug("Sync decrypter pool: decrypted doc %s: %s %s"
- % (doc_id, rev, gen))
+ logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s"
+ % (doc_id, rev, gen, trans_id))
self.insert_received_doc(doc_id, rev, content, gen, trans_id)
def get_docs_by_generation(self, encrypted=None):
@@ -878,7 +877,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
% self.TABLE_NAME
if encrypted is not None:
sql += " WHERE encrypted = %d" % int(encrypted)
- sql += " ORDER BY gen"
+ sql += " ORDER BY gen ASC"
c = self._sync_db.cursor()
c.execute(sql)
# TODO: due to unknown reasons, the fetchall() method may return empty
@@ -891,21 +890,25 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
Return a list of documents ready to be inserted.
"""
- docs = self.get_docs_by_generation(encrypted=False)
+ all_docs = self.get_docs_by_generation()
+ decrypted_docs = self.get_docs_by_generation(encrypted=False)
insertable = []
- if docs:
- last_gen = self._last_known_generation
- for doc_id, rev, content, gen, trans_id, _ in docs:
- if gen != (last_gen + 1):
- break
+ for doc_id, rev, content, gen, trans_id, encrypted in all_docs:
+ next_decrypted = decrypted_docs.pop(0)
+ if doc_id == next_decrypted[0]:
insertable.append((doc_id, rev, content, gen, trans_id))
- last_gen = gen
+ else:
+ break
return insertable
- def count_docs_in_sync_db(self):
+ def count_docs_in_sync_db(self, encrypted=None):
"""
Count how many documents we have in the table for received docs.
+ :param encrypted: If not None, return count of documents with
+ encrypted field equal to given parameter.
+ :type encrypted: bool
+
:return: The count of documents.
:rtype: int
"""
@@ -914,6 +917,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
return
c = self._sync_db.cursor()
sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
+ if encrypted is not None:
+ sql += " WHERE encrypted = %d" % int(encrypted)
c.execute(sql)
res = c.fetchone()
if res is not None:
@@ -982,7 +987,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
doc = SoledadDocument(doc_id, doc_rev, content)
gen = int(gen)
insert_fun(doc, gen, trans_id)
- self._last_known_generation = gen
except Exception as exc:
logger.error("Sync decrypter pool: error while inserting "
"decrypted doc into local db.")
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 089a48a0..032134ec 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -816,8 +816,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._sync_decr_pool = SyncDecrypterPool(
self._crypto, self._sync_db,
self._sync_db_write_lock,
- insert_doc_cb=self._insert_doc_cb,
- last_known_generation=last_known_generation)
+ insert_doc_cb=self._insert_doc_cb)
self._sync_decr_pool.set_source_replica_uid(
self.source_replica_uid)
@@ -1251,15 +1250,26 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
sent += 1
# make sure all threads finished and we have up-to-date info
+ last_successful_thread = None
while threads:
# check if there are failures
t, doc = threads.pop(0)
t.join()
if t.success:
synced.append((doc.doc_id, doc.rev))
+ last_successful_thread = t
- if defer_decryption:
- self._sync_watcher.start()
+ # delete documents from the sync database
+ if defer_encryption:
+ self.delete_encrypted_docs_from_db(synced)
+
+ # get target gen and trans_id after docs
+ gen_after_send = None
+ trans_id_after_send = None
+ if last_successful_thread is not None:
+ response_dict = json.loads(last_successful_thread.response[0])[0]
+ gen_after_send = response_dict['new_generation']
+ trans_id_after_send = response_dict['new_transaction_id']
# get docs from target
if self.stopped is False:
@@ -1268,20 +1278,24 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
last_known_generation, last_known_trans_id, headers,
return_doc_cb, ensure_callback, sync_id, syncer_pool,
defer_decryption=defer_decryption)
- syncer_pool.cleanup()
- # delete documents from the sync database
- if defer_encryption:
- self.delete_encrypted_docs_from_db(synced)
+ syncer_pool.cleanup()
- # wait for deferred decryption to finish
+ # decrypt docs in case of deferred decryption
if defer_decryption:
+ self._sync_watcher.start()
while self.clear_to_sync() is False:
sleep(self.DECRYPT_TASK_PERIOD)
self._teardown_sync_watcher()
self._teardown_sync_decr_pool()
self._sync_exchange_lock.release()
+ # update gen and trans id info in case we just sent and did not
+ # receive docs.
+ if gen_after_send is not None and gen_after_send > cur_target_gen:
+ cur_target_gen = gen_after_send
+ cur_target_trans_id = trans_id_after_send
+
self.stop()
return cur_target_gen, cur_target_trans_id