summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/crypto.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/crypto.py')
-rw-r--r--client/src/leap/soledad/client/crypto.py263
1 files changed, 128 insertions, 135 deletions
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index 7133f804..5e3760b3 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -224,7 +224,7 @@ class SoledadCrypto(object):
The password is derived using HMAC having sha256 as underlying hash
function. The key used for HMAC are the first
- C{soledad.REMOTE_STORAGE_SECRET_KENGTH} bytes of Soledad's storage
+ C{soledad.REMOTE_STORAGE_SECRET_LENGTH} bytes of Soledad's storage
secret stripped from the first MAC_KEY_LENGTH characters. The HMAC
message is C{doc_id}.
@@ -240,9 +240,7 @@ class SoledadCrypto(object):
if self.secret is None:
raise NoSymmetricSecret()
return hmac.new(
- self.secret[
- MAC_KEY_LENGTH:
- self._soledad.REMOTE_STORAGE_SECRET_LENGTH],
+ self.secret[MAC_KEY_LENGTH:],
doc_id,
hashlib.sha256).digest()
@@ -251,16 +249,16 @@ class SoledadCrypto(object):
#
def _get_secret(self):
- return self._soledad.storage_secret
+ return self._soledad.secrets.remote_storage_secret
secret = property(
_get_secret, doc='The secret used for symmetric encryption')
+
#
# Crypto utilities for a SoledadDocument.
#
-
def mac_doc(doc_id, doc_rev, ciphertext, mac_method, secret):
"""
Calculate a MAC for C{doc} using C{ciphertext}.
@@ -623,9 +621,8 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
con = self._sync_db
with self._sync_db_write_lock:
- with con:
- con.execute(sql_del, (doc_id, ))
- con.execute(sql_ins, (doc_id, doc_rev, content))
+ con.execute(sql_del, (doc_id, ))
+ con.execute(sql_ins, (doc_id, doc_rev, content))
def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret):
@@ -657,26 +654,6 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret):
return doc_id, doc_rev, decrypted_content, gen, trans_id
-def get_insertable_docs_by_gen(expected, got):
- """
- Return a list of documents ready to be inserted. This list is computed
- by aligning the expected list with the already gotten docs, and returning
- the maximum number of docs that can be processed in the expected order
- before finding a gap.
-
- :param expected: A list of generations to be inserted.
- :type expected: list
-
- :param got: A dictionary whose values are the docs to be inserted.
- :type got: dict
- """
- ordered = [got.get(i) for i in expected]
- if None in ordered:
- return ordered[:ordered.index(None)]
- else:
- return ordered
-
-
class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
Pool of workers that spawn subprocesses to execute the symmetric decryption
@@ -691,7 +668,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
# TODO implement throttling to reduce cpu usage??
TABLE_NAME = "docs_received"
- FIELD_NAMES = "doc_id, rev, content, gen, trans_id"
+ FIELD_NAMES = "doc_id, rev, content, gen, trans_id, encrypted"
write_encrypted_lock = threading.Lock()
@@ -700,10 +677,17 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
Initialize the decrypter pool, and setup a dict for putting the
results of the decrypted docs until they are picked by the insert
routine that gets them in order.
+
+ :param insert_doc_cb: A callback for inserting received documents from
+ target. If not overriden, this will call u1db
+ insert_doc_from_target in synchronizer, which
+ implements the TAKE OTHER semantics.
+ :type insert_doc_cb: function
+ :param last_known_generation: Target's last known generation.
+ :type last_known_generation: int
"""
self._insert_doc_cb = kwargs.pop("insert_doc_cb")
SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
- self.decrypted_docs = {}
self.source_replica_uid = None
def set_source_replica_uid(self, source_replica_uid):
@@ -733,36 +717,16 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type trans_id: str
"""
docstr = json.dumps(content)
- sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % (
+ sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,)
+ sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
self.TABLE_NAME,)
con = self._sync_db
with self._sync_db_write_lock:
- with con:
- con.execute(sql_ins, (doc_id, doc_rev, docstr, gen, trans_id))
-
- def insert_marker_for_received_doc(self, doc_id, doc_rev, gen):
- """
- Insert a marker with the document id, revision and generation on the
- sync db. This document does not have an encrypted payload, so the
- content has already been inserted into the decrypted_docs dictionary
- from where it can be picked following generation order.
- We need to leave here the marker to be able to calculate the expected
- insertion order for a synchronization batch.
-
- :param doc_id: The Document ID.
- :type doc_id: str
- :param doc_rev: The Document Revision
- :param doc_rev: str
- :param gen: the Document Generation
- :type gen: int
- """
- sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % (
- self.TABLE_NAME,)
- con = self._sync_db
- with self._sync_db_write_lock:
- with con:
- con.execute(sql_ins, (doc_id, doc_rev, '', gen, ''))
+ con.execute(sql_del, (doc_id, ))
+ con.execute(
+ sql_ins,
+ (doc_id, doc_rev, docstr, gen, trans_id, 1))
def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id):
"""
@@ -781,21 +745,22 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:param trans_id: Transaction ID
:type trans_id: str
"""
- # XXX this need a deeper review / testing.
- # I believe that what I'm doing here is prone to problems
- # if the sync is interrupted (ie, client crash) in the worst possible
- # moment. We would need a recover strategy in that case
- # (or, insert the document in the table all the same, but with a flag
- # saying if the document is sym-encrypted or not),
- content = json.dumps(content)
- result = doc_id, doc_rev, content, gen, trans_id
- self.decrypted_docs[gen] = result
- self.insert_marker_for_received_doc(doc_id, doc_rev, gen)
+ if not isinstance(content, str):
+ content = json.dumps(content)
+ sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (
+ self.TABLE_NAME,)
+ sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
+ self.TABLE_NAME,)
+ con = self._sync_db
+ with self._sync_db_write_lock:
+ con.execute(sql_del, (doc_id,))
+ con.execute(
+ sql_ins,
+ (doc_id, doc_rev, content, gen, trans_id, 0))
- def delete_encrypted_received_doc(self, doc_id, doc_rev):
+ def delete_received_doc(self, doc_id, doc_rev):
"""
- Delete a encrypted received doc after it was inserted into the local
- db.
+ Delete a received doc after it was inserted into the local db.
:param doc_id: Document ID.
:type doc_id: str
@@ -806,10 +771,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self.TABLE_NAME,)
con = self._sync_db
with self._sync_db_write_lock:
- with con:
- con.execute(sql_del, (doc_id, doc_rev))
+ con.execute(sql_del, (doc_id, doc_rev))
- def decrypt_doc(self, doc_id, rev, source_replica_uid, workers=True):
+ def decrypt_doc(self, doc_id, rev, content, gen, trans_id,
+ source_replica_uid, workers=True):
"""
Symmetrically decrypt a document.
@@ -817,6 +782,14 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type doc: str
:param rev: The revision of the document.
:type rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification
+ of that document.
+ :type trans_id: str
:param source_replica_uid:
:type source_replica_uid: str
@@ -835,35 +808,17 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
logger.debug("Sync decrypter pool: no insert_doc_cb() yet.")
return
- # XXX move to get_doc function...
- c = self._sync_db.cursor()
- sql = "SELECT * FROM '%s' WHERE doc_id=? AND rev=?" % (
- self.TABLE_NAME,)
- try:
- c.execute(sql, (doc_id, rev))
- res = c.fetchone()
- except Exception as exc:
- logger.warning("Error getting docs from syncdb: %r" % (exc,))
- return
- if res is None:
- logger.debug("Doc %s:%s does not exist in sync db" % (doc_id, rev))
- return
-
soledad_assert(self._crypto is not None, "need a crypto object")
- try:
- doc_id, rev, docstr, gen, trans_id = res
- except ValueError:
- logger.warning("Wrong entry in sync db")
- return
- if len(docstr) == 0:
+ if len(content) == 0:
# not encrypted payload
return
try:
- content = json.loads(docstr)
+ content = json.loads(content)
except TypeError:
- logger.warning("Wrong type while decoding json: %s" % repr(docstr))
+ logger.warning("Wrong type while decoding json: %s"
+ % repr(content))
return
key = self._crypto.doc_passphrase(doc_id)
@@ -889,34 +844,71 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
def decrypt_doc_cb(self, result):
"""
- Temporarily store the decryption result in a dictionary where it will
- be picked by process_decrypted.
+ Store the decryption result in the sync db from where it will later be
+ picked by process_decrypted.
:param result: A tuple containing the doc id, revision and encrypted
content.
:type result: tuple(str, str, str)
"""
doc_id, rev, content, gen, trans_id = result
- logger.debug("Sync decrypter pool: decrypted doc %s: %s %s" % (doc_id, rev, gen))
- self.decrypted_docs[gen] = result
+ logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s"
+ % (doc_id, rev, gen, trans_id))
+ self.insert_received_doc(doc_id, rev, content, gen, trans_id)
- def get_docs_by_generation(self):
+ def get_docs_by_generation(self, encrypted=None):
"""
Get all documents in the received table from the sync db,
ordered by generation.
- :return: list of doc_id, rev, generation
- """
- c = self._sync_db.cursor()
- sql = "SELECT doc_id, rev, gen FROM %s ORDER BY gen" % (
- self.TABLE_NAME,)
- c.execute(sql)
- return c.fetchall()
-
- def count_received_encrypted_docs(self):
- """
- Count how many documents we have in the table for received and
- encrypted docs.
+ :param encrypted: If not None, only return documents with encrypted
+ field equal to given parameter.
+ :type encrypted: bool or None
+
+ :return: list of doc_id, rev, generation, gen, trans_id
+ :rtype: list
+ """
+ sql = "SELECT doc_id, rev, content, gen, trans_id, encrypted FROM %s" \
+ % self.TABLE_NAME
+ if encrypted is not None:
+ sql += " WHERE encrypted = %d" % int(encrypted)
+ sql += " ORDER BY gen ASC"
+ docs = self._sync_db.select(sql)
+ return docs
+
+ def get_insertable_docs_by_gen(self):
+ """
+ Return a list of non-encrypted documents ready to be inserted.
+ """
+ # here, we compare the list of all available docs with the list of
+ # decrypted docs and find the longest common prefix between these two
+ # lists. Note that the order of lists fetch matters: if instead we
+ # first fetch the list of decrypted docs and then the list of all
+ # docs, then some document might have been decrypted between these two
+ # calls, and if it is just the right doc then it might not be caught
+ # by the next loop.
+ all_docs = self.get_docs_by_generation()
+ decrypted_docs = self.get_docs_by_generation(encrypted=False)
+ insertable = []
+ for doc_id, rev, _, gen, trans_id, encrypted in all_docs:
+ try:
+ next_doc_id, _, next_content, _, _, _ = decrypted_docs.next()
+ if doc_id == next_doc_id:
+ content = next_content
+ insertable.append((doc_id, rev, content, gen, trans_id))
+ else:
+ break
+ except StopIteration:
+ break
+ return insertable
+
+ def count_docs_in_sync_db(self, encrypted=None):
+ """
+ Count how many documents we have in the table for received docs.
+
+ :param encrypted: If not None, return count of documents with
+ encrypted field equal to given parameter.
+ :type encrypted: bool or None
:return: The count of documents.
:rtype: int
@@ -924,12 +916,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
if self._sync_db is None:
logger.warning("cannot return count with null sync_db")
return
- c = self._sync_db.cursor()
sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
- c.execute(sql)
- res = c.fetchone()
+ if encrypted is not None:
+ sql += " WHERE encrypted = %d" % int(encrypted)
+ res = self._sync_db.select(sql)
if res is not None:
- return res[0]
+ val = res.next()
+ return val[0]
else:
return 0
@@ -938,11 +931,11 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
Get all the encrypted documents from the sync database and dispatch a
decrypt worker to decrypt each one of them.
"""
- docs_by_generation = self.get_docs_by_generation()
- logger.debug("Sync decrypter pool: There are %d documents to " \
- "decrypt." % len(docs_by_generation))
- for doc_id, rev, gen in filter(None, docs_by_generation):
- self.decrypt_doc(doc_id, rev, self.source_replica_uid)
+ docs_by_generation = self.get_docs_by_generation(encrypted=True)
+ for doc_id, rev, content, gen, trans_id, _ \
+ in filter(None, docs_by_generation):
+ self.decrypt_doc(
+ doc_id, rev, content, gen, trans_id, self.source_replica_uid)
def process_decrypted(self):
"""
@@ -956,15 +949,9 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# getting data from the syncing stream, to avoid InvalidGeneration
# problems.
with self.write_encrypted_lock:
- already_decrypted = self.decrypted_docs
- docs = self.get_docs_by_generation()
- docs = filter(lambda entry: len(entry) > 0, docs)
- expected = [gen for doc_id, rev, gen in docs]
- docs_to_insert = get_insertable_docs_by_gen(
- expected, already_decrypted)
- for doc_fields in docs_to_insert:
+ for doc_fields in self.get_insertable_docs_by_gen():
self.insert_decrypted_local_doc(*doc_fields)
- remaining = self.count_received_encrypted_docs()
+ remaining = self.count_docs_in_sync_db()
return remaining == 0
def insert_decrypted_local_doc(self, doc_id, doc_rev, content,
@@ -989,21 +976,27 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
# could pass source_replica in params for callback chain
insert_fun = self._insert_doc_cb[self.source_replica_uid]
- logger.debug("Sync decrypter pool: inserting doc in local db: " \
+ logger.debug("Sync decrypter pool: inserting doc in local db: "
"%s:%s %s" % (doc_id, doc_rev, gen))
try:
# convert deleted documents to avoid error on document creation
if content == 'null':
content = None
doc = SoledadDocument(doc_id, doc_rev, content)
- insert_fun(doc, int(gen), trans_id)
+ gen = int(gen)
+ insert_fun(doc, gen, trans_id)
except Exception as exc:
logger.error("Sync decrypter pool: error while inserting "
"decrypted doc into local db.")
logger.exception(exc)
else:
- # If no errors found, remove it from the local temporary dict
- # and from the received database.
- self.decrypted_docs.pop(gen)
- self.delete_encrypted_received_doc(doc_id, doc_rev)
+ # If no errors found, remove it from the received database.
+ self.delete_received_doc(doc_id, doc_rev)
+
+ def empty(self):
+ """
+ Empty the received docs table of the sync database.
+ """
+ sql = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
+ res = self._sync_db.execute(sql)