From 54a69eb14189e06556af15dcdf5d5ed424778fc2 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 15 Jul 2014 13:46:42 -0300 Subject: Store all received docs in sync db (#5895). --- client/src/leap/soledad/client/crypto.py | 156 ++++++++++++++++--------------- client/src/leap/soledad/client/target.py | 12 ++- 2 files changed, 89 insertions(+), 79 deletions(-) (limited to 'client/src/leap/soledad') diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index d0a5a693..4a73a910 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -256,11 +256,11 @@ class SoledadCrypto(object): secret = property( _get_secret, doc='The secret used for symmetric encryption') + # # Crypto utilities for a SoledadDocument. # - def mac_doc(doc_id, doc_rev, ciphertext, mac_method, secret): """ Calculate a MAC for C{doc} using C{ciphertext}. @@ -657,26 +657,6 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): return doc_id, doc_rev, decrypted_content, gen, trans_id -def get_insertable_docs_by_gen(expected, got): - """ - Return a list of documents ready to be inserted. This list is computed - by aligning the expected list with the already gotten docs, and returning - the maximum number of docs that can be processed in the expected order - before finding a gap. - - :param expected: A list of generations to be inserted. - :type expected: list - - :param got: A dictionary whose values are the docs to be inserted. - :type got: dict - """ - ordered = [got.get(i) for i in expected] - if None in ordered: - return ordered[:ordered.index(None)] - else: - return ordered - - class SyncDecrypterPool(SyncEncryptDecryptPool): """ Pool of workers that spawn subprocesses to execute the symmetric decryption @@ -700,10 +680,18 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): Initialize the decrypter pool, and setup a dict for putting the results of the decrypted docs until they are picked by the insert routine that gets them in order. + + :param insert_doc_cb: A callback for inserting received documents from + target. If not overriden, this will call u1db + insert_doc_from_target in synchronizer, which + implements the TAKE OTHER semantics. + :type insert_doc_cb: function + :param last_known_generation: Target's last known generation. + :type last_known_generation: int """ self._insert_doc_cb = kwargs.pop("insert_doc_cb") + self._last_known_generation = kwargs.pop("last_known_generation") SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - self.decrypted_docs = {} self.source_replica_uid = None def set_source_replica_uid(self, source_replica_uid): @@ -733,12 +721,14 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type trans_id: str """ docstr = json.dumps(content) + sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,) sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( self.TABLE_NAME,) con = self._sync_db with self._sync_db_write_lock: with con: + con.execute(sql_del, (doc_id, )) con.execute( sql_ins, (doc_id, doc_rev, docstr, gen, trans_id, 1)) @@ -760,20 +750,23 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :param trans_id: Transaction ID :type trans_id: str """ - content = json.dumps(content) + if not isinstance(content, str): + content = json.dumps(content) + sql_del = "DELETE FROM '%s' WHERE doc_id=?" % ( + self.TABLE_NAME,) sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( self.TABLE_NAME,) con = self._sync_db with self._sync_db_write_lock: with con: + con.execute(sql_del, (doc_id,)) con.execute( sql_ins, (doc_id, doc_rev, content, gen, trans_id, 0)) - def delete_encrypted_received_doc(self, doc_id, doc_rev): + def delete_received_doc(self, doc_id, doc_rev): """ - Delete a encrypted received doc after it was inserted into the local - db. + Delete a received doc after it was inserted into the local db. :param doc_id: Document ID. :type doc_id: str @@ -787,7 +780,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): with con: con.execute(sql_del, (doc_id, doc_rev)) - def decrypt_doc(self, doc_id, rev, source_replica_uid, workers=True): + def decrypt_doc(self, doc_id, rev, content, gen, trans_id, + source_replica_uid, workers=True): """ Symmetrically decrypt a document. @@ -795,6 +789,14 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type doc: str :param rev: The revision of the document. :type rev: str + :param content: The serialized content of the document. + :type content: str + :param gen: The generation corresponding to the modification of that + document. + :type gen: int + :param trans_id: The transaction id corresponding to the modification + of that document. + :type trans_id: str :param source_replica_uid: :type source_replica_uid: str @@ -813,33 +815,14 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): logger.debug("Sync decrypter pool: no insert_doc_cb() yet.") return - # XXX move to get_doc function... - c = self._sync_db.cursor() - sql = "SELECT * FROM '%s' WHERE doc_id=? AND rev=?" % ( - self.TABLE_NAME,) - try: - c.execute(sql, (doc_id, rev)) - res = c.fetchone() - except Exception as exc: - logger.warning("Error getting docs from syncdb: %r" % (exc,)) - return - if res is None: - logger.debug("Doc %s:%s does not exist in sync db" % (doc_id, rev)) - return - soledad_assert(self._crypto is not None, "need a crypto object") - try: - doc_id, rev, docstr, gen, trans_id = res - except ValueError: - logger.warning("Wrong entry in sync db") - return - if len(docstr) == 0: + if len(content) == 0: # not encrypted payload return try: - content = json.loads(docstr) + content = json.loads(content) except TypeError: logger.warning("Wrong type while decoding json: %s" % repr(docstr)) return @@ -867,34 +850,61 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): def decrypt_doc_cb(self, result): """ - Temporarily store the decryption result in a dictionary where it will - be picked by process_decrypted. + Store the decryption result in the sync db from where it will later be + picked by process_decrypted. :param result: A tuple containing the doc id, revision and encrypted content. :type result: tuple(str, str, str) """ doc_id, rev, content, gen, trans_id = result - logger.debug("Sync decrypter pool: decrypted doc %s: %s %s" % (doc_id, rev, gen)) + logger.debug("Sync decrypter pool: decrypted doc %s: %s %s" + % (doc_id, rev, gen)) self.insert_received_doc(doc_id, rev, content, gen, trans_id) - def get_docs_by_generation(self): + def get_docs_by_generation(self, encrypted=None): """ Get all documents in the received table from the sync db, ordered by generation. - :return: list of doc_id, rev, generation + :param encrypted: If not None, only return documents with encrypted + field equal to given parameter. + :type encrypted: bool + + :return: list of doc_id, rev, generation, gen, trans_id + :rtype: list """ + sql = "SELECT doc_id, rev, content, gen, trans_id, encrypted FROM %s" \ + % self.TABLE_NAME + if encrypted is not None: + sql += " WHERE encrypted = %d" % int(encrypted) + sql += " ORDER BY gen" c = self._sync_db.cursor() - sql = "SELECT doc_id, rev, gen FROM %s ORDER BY gen" % ( - self.TABLE_NAME,) c.execute(sql) - return c.fetchall() + # TODO: due to unknown reasons, the fetchall() method may return empty + # values, so we filter them out here. We have to perform some tests to + # understand why and when this happens. + docs = filter(lambda entry: len(entry) > 0, c.fetchall()) + return docs + + def get_insertable_docs_by_gen(self): + """ + Return a list of documents ready to be inserted. + """ + docs = self.get_docs_by_generation(encrypted=False) + insertable = [] + if docs: + last_gen = self._last_known_generation + for doc_id, rev, content, gen, trans_id, _ in docs: + if gen != (last_gen + 1): + break + insertable.append((doc_id, rev, content, gen, trans_id)) + last_gen = gen + return insertable - def count_received_encrypted_docs(self): + def count_docs_in_sync_db(self): """ - Count how many documents we have in the table for received and - encrypted docs. + Count how many documents we have in the table for received docs. :return: The count of documents. :rtype: int @@ -916,11 +926,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): Get all the encrypted documents from the sync database and dispatch a decrypt worker to decrypt each one of them. """ - docs_by_generation = self.get_docs_by_generation() + docs_by_generation = self.get_docs_by_generation(encrypted=True) logger.debug("Sync decrypter pool: There are %d documents to " \ "decrypt." % len(docs_by_generation)) - for doc_id, rev, gen in filter(None, docs_by_generation): - self.decrypt_doc(doc_id, rev, self.source_replica_uid) + for doc_id, rev, content, gen, trans_id, _ \ + in filter(None, docs_by_generation): + self.decrypt_doc( + doc_id, rev, content, gen, trans_id, self.source_replica_uid) def process_decrypted(self): """ @@ -934,15 +946,9 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # getting data from the syncing stream, to avoid InvalidGeneration # problems. with self.write_encrypted_lock: - already_decrypted = self.decrypted_docs - docs = self.get_docs_by_generation() - docs = filter(lambda entry: len(entry) > 0, docs) - expected = [gen for doc_id, rev, gen in docs] - docs_to_insert = get_insertable_docs_by_gen( - expected, already_decrypted) - for doc_fields in docs_to_insert: + for doc_fields in self.get_insertable_docs_by_gen(): self.insert_decrypted_local_doc(*doc_fields) - remaining = self.count_received_encrypted_docs() + remaining = self.count_docs_in_sync_db() return remaining == 0 def insert_decrypted_local_doc(self, doc_id, doc_rev, content, @@ -974,14 +980,14 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): if content == 'null': content = None doc = SoledadDocument(doc_id, doc_rev, content) - insert_fun(doc, int(gen), trans_id) + gen = int(gen) + insert_fun(doc, gen, trans_id) + self._last_known_generation = gen except Exception as exc: logger.error("Sync decrypter pool: error while inserting " "decrypted doc into local db.") logger.exception(exc) else: - # If no errors found, remove it from the local temporary dict - # and from the received database. - self.decrypted_docs.pop(gen) - self.delete_encrypted_received_doc(doc_id, doc_rev) + # If no errors found, remove it from the received database. + self.delete_received_doc(doc_id, doc_rev) diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 70e4d3a2..089a48a0 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -804,16 +804,20 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._sync_db = sync_db self._sync_db_write_lock = sync_db_write_lock - def _setup_sync_decr_pool(self): + def _setup_sync_decr_pool(self, last_known_generation): """ Set up the SyncDecrypterPool for deferred decryption. + + :param last_known_generation: Target's last known generation. + :type last_known_generation: int """ if self._sync_decr_pool is None: # initialize syncing queue decryption pool self._sync_decr_pool = SyncDecrypterPool( self._crypto, self._sync_db, self._sync_db_write_lock, - insert_doc_cb=self._insert_doc_cb) + insert_doc_cb=self._insert_doc_cb, + last_known_generation=last_known_generation) self._sync_decr_pool.set_source_replica_uid( self.source_replica_uid) @@ -1127,7 +1131,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): if defer_decryption: self._sync_exchange_lock.acquire() - self._setup_sync_decr_pool() + self._setup_sync_decr_pool(last_known_generation) self._setup_sync_watcher() self._defer_decryption = True @@ -1402,7 +1406,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :rtype: bool """ if self._sync_decr_pool is not None: - return self._sync_decr_pool.count_received_encrypted_docs() == 0 + return self._sync_decr_pool.count_docs_in_sync_db() == 0 else: return True -- cgit v1.2.3