diff options
| -rw-r--r-- | client/src/leap/soledad/client/crypto.py | 30 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/target.py | 32 | 
2 files changed, 40 insertions, 22 deletions
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index 4a73a910..5ae5937f 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -690,7 +690,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          :type last_known_generation: int          """          self._insert_doc_cb = kwargs.pop("insert_doc_cb") -        self._last_known_generation = kwargs.pop("last_known_generation")          SyncEncryptDecryptPool.__init__(self, *args, **kwargs)          self.source_replica_uid = None @@ -858,8 +857,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          :type result: tuple(str, str, str)          """          doc_id, rev, content, gen, trans_id = result -        logger.debug("Sync decrypter pool: decrypted doc %s: %s %s" -                     % (doc_id, rev, gen)) +        logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s" +                     % (doc_id, rev, gen, trans_id))          self.insert_received_doc(doc_id, rev, content, gen, trans_id)      def get_docs_by_generation(self, encrypted=None): @@ -878,7 +877,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):                % self.TABLE_NAME          if encrypted is not None:              sql += " WHERE encrypted = %d" % int(encrypted) -        sql += " ORDER BY gen" +        sql += " ORDER BY gen ASC"          c = self._sync_db.cursor()          c.execute(sql)          # TODO: due to unknown reasons, the fetchall() method may return empty @@ -891,21 +890,25 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          """          Return a list of documents ready to be inserted.          """ -        docs = self.get_docs_by_generation(encrypted=False) +        all_docs = self.get_docs_by_generation() +        decrypted_docs = self.get_docs_by_generation(encrypted=False)          insertable = [] -        if docs: -            last_gen = self._last_known_generation -            for doc_id, rev, content, gen, trans_id, _ in docs: -                if gen != (last_gen + 1): -                    break +        for doc_id, rev, content, gen, trans_id, encrypted in all_docs: +            next_decrypted = decrypted_docs.pop(0) +            if doc_id == next_decrypted[0]:                  insertable.append((doc_id, rev, content, gen, trans_id)) -                last_gen = gen +            else: +                break          return insertable -    def count_docs_in_sync_db(self): +    def count_docs_in_sync_db(self, encrypted=None):          """          Count how many documents we have in the table for received docs. +        :param encrypted: If not None, return count of documents with +                          encrypted field equal to given parameter. +        :type encrypted: bool +          :return: The count of documents.          :rtype: int          """ @@ -914,6 +917,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):              return          c = self._sync_db.cursor()          sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) +        if encrypted is not None: +            sql += " WHERE encrypted = %d" % int(encrypted)          c.execute(sql)          res = c.fetchone()          if res is not None: @@ -982,7 +987,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):              doc = SoledadDocument(doc_id, doc_rev, content)              gen = int(gen)              insert_fun(doc, gen, trans_id) -            self._last_known_generation = gen          except Exception as exc:              logger.error("Sync decrypter pool: error while inserting "                           "decrypted doc into local db.") diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 089a48a0..032134ec 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -816,8 +816,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):              self._sync_decr_pool = SyncDecrypterPool(                  self._crypto, self._sync_db,                  self._sync_db_write_lock, -                insert_doc_cb=self._insert_doc_cb, -                last_known_generation=last_known_generation) +                insert_doc_cb=self._insert_doc_cb)              self._sync_decr_pool.set_source_replica_uid(                  self.source_replica_uid) @@ -1251,15 +1250,26 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):              sent += 1          # make sure all threads finished and we have up-to-date info +        last_successful_thread = None          while threads:              # check if there are failures              t, doc = threads.pop(0)              t.join()              if t.success:                  synced.append((doc.doc_id, doc.rev)) +                last_successful_thread = t -        if defer_decryption: -            self._sync_watcher.start() +        # delete documents from the sync database +        if defer_encryption: +            self.delete_encrypted_docs_from_db(synced) + +        # get target gen and trans_id after docs +        gen_after_send = None +        trans_id_after_send = None +        if last_successful_thread is not None: +            response_dict = json.loads(last_successful_thread.response[0])[0] +            gen_after_send = response_dict['new_generation'] +            trans_id_after_send  = response_dict['new_transaction_id']          # get docs from target          if self.stopped is False: @@ -1268,20 +1278,24 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):                  last_known_generation, last_known_trans_id, headers,                  return_doc_cb, ensure_callback, sync_id, syncer_pool,                  defer_decryption=defer_decryption) -        syncer_pool.cleanup() -        # delete documents from the sync database -        if defer_encryption: -            self.delete_encrypted_docs_from_db(synced) +        syncer_pool.cleanup() -        # wait for deferred decryption to finish +        # decrypt docs in case of deferred decryption          if defer_decryption: +            self._sync_watcher.start()              while self.clear_to_sync() is False:                  sleep(self.DECRYPT_TASK_PERIOD)              self._teardown_sync_watcher()              self._teardown_sync_decr_pool()              self._sync_exchange_lock.release() +        # update gen and trans id info in case we just sent and did not +        # receive docs. +        if gen_after_send is not None and gen_after_send > cur_target_gen: +            cur_target_gen = gen_after_send +            cur_target_trans_id = trans_id_after_send +          self.stop()          return cur_target_gen, cur_target_trans_id  | 
