From 5e4dae3427f40879156ddfaaaa8f878ab2504ee3 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 23 Jul 2014 10:29:09 -0300 Subject: On sync, fetch all docs before decrypting. --- client/src/leap/soledad/client/crypto.py | 30 +++++++++++++++++------------- client/src/leap/soledad/client/target.py | 32 +++++++++++++++++++++++--------- 2 files changed, 40 insertions(+), 22 deletions(-) (limited to 'client/src') diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index 4a73a910..5ae5937f 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -690,7 +690,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type last_known_generation: int """ self._insert_doc_cb = kwargs.pop("insert_doc_cb") - self._last_known_generation = kwargs.pop("last_known_generation") SyncEncryptDecryptPool.__init__(self, *args, **kwargs) self.source_replica_uid = None @@ -858,8 +857,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type result: tuple(str, str, str) """ doc_id, rev, content, gen, trans_id = result - logger.debug("Sync decrypter pool: decrypted doc %s: %s %s" - % (doc_id, rev, gen)) + logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s" + % (doc_id, rev, gen, trans_id)) self.insert_received_doc(doc_id, rev, content, gen, trans_id) def get_docs_by_generation(self, encrypted=None): @@ -878,7 +877,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): % self.TABLE_NAME if encrypted is not None: sql += " WHERE encrypted = %d" % int(encrypted) - sql += " ORDER BY gen" + sql += " ORDER BY gen ASC" c = self._sync_db.cursor() c.execute(sql) # TODO: due to unknown reasons, the fetchall() method may return empty @@ -891,21 +890,25 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ Return a list of documents ready to be inserted. """ - docs = self.get_docs_by_generation(encrypted=False) + all_docs = self.get_docs_by_generation() + decrypted_docs = self.get_docs_by_generation(encrypted=False) insertable = [] - if docs: - last_gen = self._last_known_generation - for doc_id, rev, content, gen, trans_id, _ in docs: - if gen != (last_gen + 1): - break + for doc_id, rev, content, gen, trans_id, encrypted in all_docs: + next_decrypted = decrypted_docs.pop(0) + if doc_id == next_decrypted[0]: insertable.append((doc_id, rev, content, gen, trans_id)) - last_gen = gen + else: + break return insertable - def count_docs_in_sync_db(self): + def count_docs_in_sync_db(self, encrypted=None): """ Count how many documents we have in the table for received docs. + :param encrypted: If not None, return count of documents with + encrypted field equal to given parameter. + :type encrypted: bool + :return: The count of documents. :rtype: int """ @@ -914,6 +917,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): return c = self._sync_db.cursor() sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) + if encrypted is not None: + sql += " WHERE encrypted = %d" % int(encrypted) c.execute(sql) res = c.fetchone() if res is not None: @@ -982,7 +987,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): doc = SoledadDocument(doc_id, doc_rev, content) gen = int(gen) insert_fun(doc, gen, trans_id) - self._last_known_generation = gen except Exception as exc: logger.error("Sync decrypter pool: error while inserting " "decrypted doc into local db.") diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 089a48a0..032134ec 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -816,8 +816,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._sync_decr_pool = SyncDecrypterPool( self._crypto, self._sync_db, self._sync_db_write_lock, - insert_doc_cb=self._insert_doc_cb, - last_known_generation=last_known_generation) + insert_doc_cb=self._insert_doc_cb) self._sync_decr_pool.set_source_replica_uid( self.source_replica_uid) @@ -1251,15 +1250,26 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): sent += 1 # make sure all threads finished and we have up-to-date info + last_successful_thread = None while threads: # check if there are failures t, doc = threads.pop(0) t.join() if t.success: synced.append((doc.doc_id, doc.rev)) + last_successful_thread = t - if defer_decryption: - self._sync_watcher.start() + # delete documents from the sync database + if defer_encryption: + self.delete_encrypted_docs_from_db(synced) + + # get target gen and trans_id after docs + gen_after_send = None + trans_id_after_send = None + if last_successful_thread is not None: + response_dict = json.loads(last_successful_thread.response[0])[0] + gen_after_send = response_dict['new_generation'] + trans_id_after_send = response_dict['new_transaction_id'] # get docs from target if self.stopped is False: @@ -1268,20 +1278,24 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): last_known_generation, last_known_trans_id, headers, return_doc_cb, ensure_callback, sync_id, syncer_pool, defer_decryption=defer_decryption) - syncer_pool.cleanup() - # delete documents from the sync database - if defer_encryption: - self.delete_encrypted_docs_from_db(synced) + syncer_pool.cleanup() - # wait for deferred decryption to finish + # decrypt docs in case of deferred decryption if defer_decryption: + self._sync_watcher.start() while self.clear_to_sync() is False: sleep(self.DECRYPT_TASK_PERIOD) self._teardown_sync_watcher() self._teardown_sync_decr_pool() self._sync_exchange_lock.release() + # update gen and trans id info in case we just sent and did not + # receive docs. + if gen_after_send is not None and gen_after_send > cur_target_gen: + cur_target_gen = gen_after_send + cur_target_trans_id = trans_id_after_send + self.stop() return cur_target_gen, cur_target_trans_id -- cgit v1.2.3