summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2014-07-15 13:46:42 -0300
committerdrebs <drebs@leap.se>2014-08-08 11:49:02 -0300
commit54a69eb14189e06556af15dcdf5d5ed424778fc2 (patch)
tree8cef9bb22835e2b2644caabb4732d3302a82a917
parent51e0bf7f79a444661b10fe418af85b0a60f41afb (diff)
Store all received docs in sync db (#5895).
-rw-r--r--client/src/leap/soledad/client/crypto.py156
-rw-r--r--client/src/leap/soledad/client/target.py12
2 files changed, 89 insertions, 79 deletions
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index d0a5a693..4a73a910 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -256,11 +256,11 @@ class SoledadCrypto(object):
secret = property(
_get_secret, doc='The secret used for symmetric encryption')
+
#
# Crypto utilities for a SoledadDocument.
#
-
def mac_doc(doc_id, doc_rev, ciphertext, mac_method, secret):
"""
Calculate a MAC for C{doc} using C{ciphertext}.
@@ -657,26 +657,6 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret):
return doc_id, doc_rev, decrypted_content, gen, trans_id
-def get_insertable_docs_by_gen(expected, got):
- """
- Return a list of documents ready to be inserted. This list is computed
- by aligning the expected list with the already gotten docs, and returning
- the maximum number of docs that can be processed in the expected order
- before finding a gap.
-
- :param expected: A list of generations to be inserted.
- :type expected: list
-
- :param got: A dictionary whose values are the docs to be inserted.
- :type got: dict
- """
- ordered = [got.get(i) for i in expected]
- if None in ordered:
- return ordered[:ordered.index(None)]
- else:
- return ordered
-
-
class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
Pool of workers that spawn subprocesses to execute the symmetric decryption
@@ -700,10 +680,18 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
Initialize the decrypter pool, and setup a dict for putting the
results of the decrypted docs until they are picked by the insert
routine that gets them in order.
+
+ :param insert_doc_cb: A callback for inserting received documents from
+ target. If not overriden, this will call u1db
+ insert_doc_from_target in synchronizer, which
+ implements the TAKE OTHER semantics.
+ :type insert_doc_cb: function
+ :param last_known_generation: Target's last known generation.
+ :type last_known_generation: int
"""
self._insert_doc_cb = kwargs.pop("insert_doc_cb")
+ self._last_known_generation = kwargs.pop("last_known_generation")
SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
- self.decrypted_docs = {}
self.source_replica_uid = None
def set_source_replica_uid(self, source_replica_uid):
@@ -733,12 +721,14 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type trans_id: str
"""
docstr = json.dumps(content)
+ sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,)
sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
self.TABLE_NAME,)
con = self._sync_db
with self._sync_db_write_lock:
with con:
+ con.execute(sql_del, (doc_id, ))
con.execute(
sql_ins,
(doc_id, doc_rev, docstr, gen, trans_id, 1))
@@ -760,20 +750,23 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:param trans_id: Transaction ID
:type trans_id: str
"""
- content = json.dumps(content)
+ if not isinstance(content, str):
+ content = json.dumps(content)
+ sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (
+ self.TABLE_NAME,)
sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
self.TABLE_NAME,)
con = self._sync_db
with self._sync_db_write_lock:
with con:
+ con.execute(sql_del, (doc_id,))
con.execute(
sql_ins,
(doc_id, doc_rev, content, gen, trans_id, 0))
- def delete_encrypted_received_doc(self, doc_id, doc_rev):
+ def delete_received_doc(self, doc_id, doc_rev):
"""
- Delete a encrypted received doc after it was inserted into the local
- db.
+ Delete a received doc after it was inserted into the local db.
:param doc_id: Document ID.
:type doc_id: str
@@ -787,7 +780,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
with con:
con.execute(sql_del, (doc_id, doc_rev))
- def decrypt_doc(self, doc_id, rev, source_replica_uid, workers=True):
+ def decrypt_doc(self, doc_id, rev, content, gen, trans_id,
+ source_replica_uid, workers=True):
"""
Symmetrically decrypt a document.
@@ -795,6 +789,14 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type doc: str
:param rev: The revision of the document.
:type rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification
+ of that document.
+ :type trans_id: str
:param source_replica_uid:
:type source_replica_uid: str
@@ -813,33 +815,14 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
logger.debug("Sync decrypter pool: no insert_doc_cb() yet.")
return
- # XXX move to get_doc function...
- c = self._sync_db.cursor()
- sql = "SELECT * FROM '%s' WHERE doc_id=? AND rev=?" % (
- self.TABLE_NAME,)
- try:
- c.execute(sql, (doc_id, rev))
- res = c.fetchone()
- except Exception as exc:
- logger.warning("Error getting docs from syncdb: %r" % (exc,))
- return
- if res is None:
- logger.debug("Doc %s:%s does not exist in sync db" % (doc_id, rev))
- return
-
soledad_assert(self._crypto is not None, "need a crypto object")
- try:
- doc_id, rev, docstr, gen, trans_id = res
- except ValueError:
- logger.warning("Wrong entry in sync db")
- return
- if len(docstr) == 0:
+ if len(content) == 0:
# not encrypted payload
return
try:
- content = json.loads(docstr)
+ content = json.loads(content)
except TypeError:
logger.warning("Wrong type while decoding json: %s" % repr(docstr))
return
@@ -867,34 +850,61 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
def decrypt_doc_cb(self, result):
"""
- Temporarily store the decryption result in a dictionary where it will
- be picked by process_decrypted.
+ Store the decryption result in the sync db from where it will later be
+ picked by process_decrypted.
:param result: A tuple containing the doc id, revision and encrypted
content.
:type result: tuple(str, str, str)
"""
doc_id, rev, content, gen, trans_id = result
- logger.debug("Sync decrypter pool: decrypted doc %s: %s %s" % (doc_id, rev, gen))
+ logger.debug("Sync decrypter pool: decrypted doc %s: %s %s"
+ % (doc_id, rev, gen))
self.insert_received_doc(doc_id, rev, content, gen, trans_id)
- def get_docs_by_generation(self):
+ def get_docs_by_generation(self, encrypted=None):
"""
Get all documents in the received table from the sync db,
ordered by generation.
- :return: list of doc_id, rev, generation
+ :param encrypted: If not None, only return documents with encrypted
+ field equal to given parameter.
+ :type encrypted: bool
+
+ :return: list of doc_id, rev, generation, gen, trans_id
+ :rtype: list
"""
+ sql = "SELECT doc_id, rev, content, gen, trans_id, encrypted FROM %s" \
+ % self.TABLE_NAME
+ if encrypted is not None:
+ sql += " WHERE encrypted = %d" % int(encrypted)
+ sql += " ORDER BY gen"
c = self._sync_db.cursor()
- sql = "SELECT doc_id, rev, gen FROM %s ORDER BY gen" % (
- self.TABLE_NAME,)
c.execute(sql)
- return c.fetchall()
+ # TODO: due to unknown reasons, the fetchall() method may return empty
+ # values, so we filter them out here. We have to perform some tests to
+ # understand why and when this happens.
+ docs = filter(lambda entry: len(entry) > 0, c.fetchall())
+ return docs
+
+ def get_insertable_docs_by_gen(self):
+ """
+ Return a list of documents ready to be inserted.
+ """
+ docs = self.get_docs_by_generation(encrypted=False)
+ insertable = []
+ if docs:
+ last_gen = self._last_known_generation
+ for doc_id, rev, content, gen, trans_id, _ in docs:
+ if gen != (last_gen + 1):
+ break
+ insertable.append((doc_id, rev, content, gen, trans_id))
+ last_gen = gen
+ return insertable
- def count_received_encrypted_docs(self):
+ def count_docs_in_sync_db(self):
"""
- Count how many documents we have in the table for received and
- encrypted docs.
+ Count how many documents we have in the table for received docs.
:return: The count of documents.
:rtype: int
@@ -916,11 +926,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
Get all the encrypted documents from the sync database and dispatch a
decrypt worker to decrypt each one of them.
"""
- docs_by_generation = self.get_docs_by_generation()
+ docs_by_generation = self.get_docs_by_generation(encrypted=True)
logger.debug("Sync decrypter pool: There are %d documents to " \
"decrypt." % len(docs_by_generation))
- for doc_id, rev, gen in filter(None, docs_by_generation):
- self.decrypt_doc(doc_id, rev, self.source_replica_uid)
+ for doc_id, rev, content, gen, trans_id, _ \
+ in filter(None, docs_by_generation):
+ self.decrypt_doc(
+ doc_id, rev, content, gen, trans_id, self.source_replica_uid)
def process_decrypted(self):
"""
@@ -934,15 +946,9 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# getting data from the syncing stream, to avoid InvalidGeneration
# problems.
with self.write_encrypted_lock:
- already_decrypted = self.decrypted_docs
- docs = self.get_docs_by_generation()
- docs = filter(lambda entry: len(entry) > 0, docs)
- expected = [gen for doc_id, rev, gen in docs]
- docs_to_insert = get_insertable_docs_by_gen(
- expected, already_decrypted)
- for doc_fields in docs_to_insert:
+ for doc_fields in self.get_insertable_docs_by_gen():
self.insert_decrypted_local_doc(*doc_fields)
- remaining = self.count_received_encrypted_docs()
+ remaining = self.count_docs_in_sync_db()
return remaining == 0
def insert_decrypted_local_doc(self, doc_id, doc_rev, content,
@@ -974,14 +980,14 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
if content == 'null':
content = None
doc = SoledadDocument(doc_id, doc_rev, content)
- insert_fun(doc, int(gen), trans_id)
+ gen = int(gen)
+ insert_fun(doc, gen, trans_id)
+ self._last_known_generation = gen
except Exception as exc:
logger.error("Sync decrypter pool: error while inserting "
"decrypted doc into local db.")
logger.exception(exc)
else:
- # If no errors found, remove it from the local temporary dict
- # and from the received database.
- self.decrypted_docs.pop(gen)
- self.delete_encrypted_received_doc(doc_id, doc_rev)
+ # If no errors found, remove it from the received database.
+ self.delete_received_doc(doc_id, doc_rev)
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 70e4d3a2..089a48a0 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -804,16 +804,20 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._sync_db = sync_db
self._sync_db_write_lock = sync_db_write_lock
- def _setup_sync_decr_pool(self):
+ def _setup_sync_decr_pool(self, last_known_generation):
"""
Set up the SyncDecrypterPool for deferred decryption.
+
+ :param last_known_generation: Target's last known generation.
+ :type last_known_generation: int
"""
if self._sync_decr_pool is None:
# initialize syncing queue decryption pool
self._sync_decr_pool = SyncDecrypterPool(
self._crypto, self._sync_db,
self._sync_db_write_lock,
- insert_doc_cb=self._insert_doc_cb)
+ insert_doc_cb=self._insert_doc_cb,
+ last_known_generation=last_known_generation)
self._sync_decr_pool.set_source_replica_uid(
self.source_replica_uid)
@@ -1127,7 +1131,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
if defer_decryption:
self._sync_exchange_lock.acquire()
- self._setup_sync_decr_pool()
+ self._setup_sync_decr_pool(last_known_generation)
self._setup_sync_watcher()
self._defer_decryption = True
@@ -1402,7 +1406,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:rtype: bool
"""
if self._sync_decr_pool is not None:
- return self._sync_decr_pool.count_received_encrypted_docs() == 0
+ return self._sync_decr_pool.count_docs_in_sync_db() == 0
else:
return True