From 3a3f2d8ca5b0ae2adb5007577f2d828677ff64e0 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 30 Apr 2015 11:24:21 -0300 Subject: [bug] always initialize sync db Both deferred encryption and decryption rely on a special sync db. Prior to this fix, the sync db was only initialized if a syncer was configured with deferred encryption capabilities. This was a problem when the syncer was not configured that way but the sync method itself was started with deferred decryption enabled. This commit fixes that by always initializing the sync db, so that all combinations of deferred encryption and decryption are possible. --- client/changes/bug_always-initialize-the-sync-db | 2 ++ client/src/leap/soledad/client/sqlcipher.py | 6 +++++- 2 files changed, 7 insertions(+), 1 deletion(-) create mode 100644 client/changes/bug_always-initialize-the-sync-db diff --git a/client/changes/bug_always-initialize-the-sync-db b/client/changes/bug_always-initialize-the-sync-db new file mode 100644 index 00000000..2b12989a --- /dev/null +++ b/client/changes/bug_always-initialize-the-sync-db @@ -0,0 +1,2 @@ + o Always initialize the sync db to allow for both asynchronous encryption + and asynchronous decryption when syncing. diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index db3cb5cb..ec7946b7 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -503,8 +503,12 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self._db_handle = None self._initialize_main_db() + # the sync_db is used both for deferred encryption and decryption, so + # we want to initialize it anyway to allow for all combinations of + # deferred encryption and decryption configurations. + self._initialize_sync_db(opts) + if defer_encryption: - self._initialize_sync_db(opts) # initialize syncing queue encryption pool self._sync_enc_pool = crypto.SyncEncrypterPool( -- cgit v1.2.3 From 6ab3fe57764c2e5f2a5688d377fe46a51437f0be Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 30 Apr 2015 11:47:10 -0300 Subject: [bug] fix log messages when fetching documents We always got a log message saying "cancelling sync threads" at the end of the sync process, even when there was no error during the sync. This commit changes that so the message is only logged when the sync was actually cancelled because of an error. --- .../bug_improve-log-when-fetching-documents | 1 + client/src/leap/soledad/client/target.py | 28 ++++++++++++++-------- 2 files changed, 19 insertions(+), 10 deletions(-) create mode 100644 client/changes/bug_improve-log-when-fetching-documents diff --git a/client/changes/bug_improve-log-when-fetching-documents b/client/changes/bug_improve-log-when-fetching-documents new file mode 100644 index 00000000..a67ce028 --- /dev/null +++ b/client/changes/bug_improve-log-when-fetching-documents @@ -0,0 +1 @@ + o Improve log messages when concurrently fetching documents from the server. diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 986bd991..d59923b2 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -300,7 +300,7 @@ class DocumentSyncerPool(object): # we rely on DocumentSyncerThread.run() to release the lock using # self.release_syncer so we can launch a new thread.
t = DocumentSyncerThread( - doc_syncer, self.release_syncer, self.cancel_threads, + doc_syncer, self.release_syncer, self.stop_threads, idx, total, last_request_lock=last_request_lock, last_callback_lock=last_callback_lock) @@ -348,17 +348,21 @@ class DocumentSyncerPool(object): self._threads.remove(syncer_thread) self._semaphore_pool.release() - def cancel_threads(self): + def stop_threads(self, fail=True): """ Stop all threads in the pool. + + :param fail: Whether we are stopping because of a failure. + :type fail: bool """ # stop sync self._stop_method() stopped = [] # stop all threads - logger.warning("Soledad sync: cancelling sync threads...") with self._pool_access_lock: - self._failures = True + if fail: + self._failures = True + logger.error("sync failed: cancelling sync threads...") while self._threads: t = self._threads.pop(0) t.stop() @@ -377,7 +381,8 @@ class DocumentSyncerPool(object): self._semaphore_pool.release() except ValueError: break - logger.warning("Soledad sync: cancelled sync threads.") + if fail: + logger.error("Soledad sync: cancelled sync threads.") def cleanup(self): """ @@ -1020,7 +1025,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # bail out if any thread failed if t is None: - self.stop() + self.stop(fail=True) break t.doc_syncer.set_request_method( @@ -1220,7 +1225,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # bail out if any thread failed if t is None: - self.stop() + self.stop(fail=True) break # set the request method @@ -1308,7 +1313,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): cur_target_gen = gen_after_send cur_target_trans_id = trans_id_after_send - self.stop() + self.stop(fail=False) self._syncer_pool = None return cur_target_gen, cur_target_trans_id @@ -1324,17 +1329,20 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): with self._stop_lock: self._stopped = True - def stop(self): + def stop(self, fail=False): """ Mark current sync session as stopped. This will eventually interrupt the sync_exchange() method and return enough information to the synchronizer so the sync session can be recovered afterwards. + + :param fail: Whether we are stopping because of a failure. + :type fail: bool """ self.stop_syncer() if self._syncer_pool: - self._syncer_pool.cancel_threads() + self._syncer_pool.stop_threads(fail=fail) @property def stopped(self): -- cgit v1.2.3 From b75bedb065cfbbb2993659d867ef554ff70596ae Mon Sep 17 00:00:00 2001 From: drebs Date: Mon, 4 May 2015 13:26:57 -0300 Subject: [bug] fix log messages for secrets in storage --- client/changes/bug_6892_fix-log-message-for-local-secret | 2 ++ client/changes/bug_fix-async-decrypt | 2 ++ client/src/leap/soledad/client/secrets.py | 16 ++++++++++------ run_tests.sh | 3 --- scripts/run_tests.sh | 3 +++ 5 files changed, 17 insertions(+), 9 deletions(-) create mode 100644 client/changes/bug_6892_fix-log-message-for-local-secret create mode 100644 client/changes/bug_fix-async-decrypt delete mode 100755 run_tests.sh create mode 100755 scripts/run_tests.sh diff --git a/client/changes/bug_6892_fix-log-message-for-local-secret b/client/changes/bug_6892_fix-log-message-for-local-secret new file mode 100644 index 00000000..39c13257 --- /dev/null +++ b/client/changes/bug_6892_fix-log-message-for-local-secret @@ -0,0 +1,2 @@ + o Fix the log message when a local secret is not found so it's less + confusing. Closes #6892. 
diff --git a/client/changes/bug_fix-async-decrypt b/client/changes/bug_fix-async-decrypt new file mode 100644 index 00000000..eb0ce7b5 --- /dev/null +++ b/client/changes/bug_fix-async-decrypt @@ -0,0 +1,2 @@ + o Refactor asynchronous encryption/decryption code to its own file. + o Fix logging and graceful failing when exceptions are raised during sync. diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py index af781a26..96f7e906 100644 --- a/client/src/leap/soledad/client/secrets.py +++ b/client/src/leap/soledad/client/secrets.py @@ -246,22 +246,26 @@ class SoledadSecrets(object): :return: Whether there's a storage secret for symmetric encryption. :rtype: bool """ - if self._secret_id is None or self._secret_id not in self._secrets: + logger.info("Checking if there's a secret in local storage...") + if (self._secret_id is None or self._secret_id not in self._secrets) \ + and os.path.isfile(self._secrets_path): try: self._load_secrets() # try to load from disk except IOError as e: logger.warning( 'IOError while loading secrets from disk: %s' % str(e)) - return False - return self.storage_secret is not None + + if self.storage_secret is not None: + logger.info("Found a secret in local storage.") + return True + + logger.info("Could not find a secret in local storage.") + return False def _load_secrets(self): """ Load storage secrets from local file. """ - # does the file exist in disk? - if not os.path.isfile(self._secrets_path): - raise IOError('File does not exist: %s' % self._secrets_path) # read storage secrets from file content = None with open(self._secrets_path, 'r') as f: diff --git a/run_tests.sh b/run_tests.sh deleted file mode 100755 index e36466f8..00000000 --- a/run_tests.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/sh -cd common -python setup.py test diff --git a/scripts/run_tests.sh b/scripts/run_tests.sh new file mode 100755 index 00000000..e36466f8 --- /dev/null +++ b/scripts/run_tests.sh @@ -0,0 +1,3 @@ +#!/bin/sh +cd common +python setup.py test -- cgit v1.2.3 From 3a7ddacd06fd57afb10cc3d7083c2aa196c9328f Mon Sep 17 00:00:00 2001 From: drebs Date: Mon, 4 May 2015 13:04:56 -0300 Subject: [feature] use async adbapi for async decryption Since we started implementing the twisted api in soledad, some pieces have been missing. Accessing the sqlcipher database through the twisted adbapi facilities is one of them. The async encryption/decryption code was touching the database directly, and this was causing some difficulties, like having different threads accessing the same database. This commit implements the twisted adbapi machinery for the asynchronous encryption/decryption facilities. Next steps are to use the async adbapi for async encryption and, eventually, for all sqlcipher access.
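As a rough illustration of the pattern this commit adopts, here is a condensed sketch of the pool setup (getConnectionPool, set_init_pragmas and the "pysqlcipher.dbapi2" driver name are all taken from the diff below; the dedicated SQLCipherConnectionPool subclass from the patch is simplified away here):

    from functools import partial

    from twisted.enterprise import adbapi

    from leap.soledad.client import pragmas


    def getConnectionPool(opts, extra_queries=None):
        # openfun runs once for each connection the pool opens, setting the
        # sqlcipher key and the other init pragmas before any query runs
        openfun = partial(
            pragmas.set_init_pragmas, opts=opts, extra_queries=extra_queries)
        return adbapi.ConnectionPool(
            "pysqlcipher.dbapi2", database=opts.path,
            check_same_thread=False, cp_openfun=openfun)

With this in place, all access to the sync db happens through deferreds and the pool's own worker threads -- e.g. pool.runQuery("SELECT COUNT(*) FROM docs_received") returns a deferred that fires with the result rows -- instead of each caller touching a shared database handle directly.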
--- .../changes/feature_use-twisted-adbapi-for-sync-db | 1 + client/src/leap/soledad/client/adbapi.py | 3 +- client/src/leap/soledad/client/crypto.py | 190 ++++++++++++--------- client/src/leap/soledad/client/pragmas.py | 43 +++++ client/src/leap/soledad/client/sqlcipher.py | 130 +++++++------- client/src/leap/soledad/client/target.py | 71 +------- 6 files changed, 233 insertions(+), 205 deletions(-) create mode 100644 client/changes/feature_use-twisted-adbapi-for-sync-db diff --git a/client/changes/feature_use-twisted-adbapi-for-sync-db b/client/changes/feature_use-twisted-adbapi-for-sync-db new file mode 100644 index 00000000..41e5e6e3 --- /dev/null +++ b/client/changes/feature_use-twisted-adbapi-for-sync-db @@ -0,0 +1 @@ + o Use twisted.enterprise.adbapi for access to the sync database. diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index 7ad10db5..5b882bbe 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ b/client/src/leap/soledad/client/adbapi.py @@ -32,6 +32,7 @@ from zope.proxy import ProxyBase, setProxiedObject from pysqlcipher.dbapi2 import OperationalError from leap.soledad.client import sqlcipher as soledad_sqlcipher +from leap.soledad.client.pragmas import set_init_pragmas logger = logging.getLogger(name=__name__) @@ -72,7 +73,7 @@ def getConnectionPool(opts, openfun=None, driver="pysqlcipher"): :rtype: U1DBConnectionPool """ if openfun is None and driver == "pysqlcipher": - openfun = partial(soledad_sqlcipher.set_init_pragmas, opts=opts) + openfun = partial(set_init_pragmas, opts=opts) return U1DBConnectionPool( "%s.dbapi2" % driver, database=opts.path, check_same_thread=False, cp_openfun=openfun, diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index 107bf7f1..dd40b198 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -25,11 +25,15 @@ import json import logging import multiprocessing import threading +import time from pycryptopp.cipher.aes import AES from pycryptopp.cipher.xsalsa20 import XSalsa20 from zope.proxy import sameProxiedObjects +from twisted.internet import defer +from twisted.internet.threads import deferToThread + from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type from leap.soledad.common import crypto @@ -227,7 +231,7 @@ class SoledadCrypto(object): # def mac_doc(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, enc_iv, - mac_method, secret): + mac_method, secret): """ Calculate a MAC for C{doc} using C{ciphertext}. @@ -378,7 +382,7 @@ def decrypt_doc(crypto, doc): def _verify_doc_mac(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, - enc_iv, mac_method, secret, doc_mac): + enc_iv, mac_method, secret, doc_mac): """ Verify that C{doc_mac} is a correct MAC for the given document. @@ -523,7 +527,7 @@ class SyncEncryptDecryptPool(object): """ WORKERS = multiprocessing.cpu_count() - def __init__(self, crypto, sync_db, write_lock): + def __init__(self, crypto, sync_db): """ Initialize the pool of encryption-workers. @@ -540,7 +544,6 @@ class SyncEncryptDecryptPool(object): self._pool = multiprocessing.Pool(self.WORKERS) self._crypto = crypto self._sync_db = sync_db - self._sync_db_write_lock = write_lock def close(self): """ @@ -592,7 +595,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): # TODO implement throttling to reduce cpu usage?? 
WORKERS = multiprocessing.cpu_count() TABLE_NAME = "docs_tosync" - FIELD_NAMES = "doc_id, rev, content" + FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" def encrypt_doc(self, doc, workers=True): """ @@ -633,8 +636,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): :type result: tuple(str, str, str) """ doc_id, doc_rev, content = result - self.insert_encrypted_local_doc(doc_id, doc_rev, content) + return self.insert_encrypted_local_doc(doc_id, doc_rev, content) + @defer.inlineCallbacks def insert_encrypted_local_doc(self, doc_id, doc_rev, content): """ Insert the contents of the encrypted doc into the local sync @@ -652,13 +656,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): # FIXME --- callback should complete immediately since otherwise the # thread which handles the results will get blocked # Right now we're blocking the dispatcher with the writes to sqlite. - sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,) - sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,) - - con = self._sync_db - with self._sync_db_write_lock: - con.execute(sql_del, (doc_id, )) - con.execute(sql_ins, (doc_id, doc_rev, content)) + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \ + % (self.TABLE_NAME,) + yield self._sync_db.runQuery(query, (doc_id, doc_rev, content)) def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): @@ -704,9 +704,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ # TODO implement throttling to reduce cpu usage?? TABLE_NAME = "docs_received" - FIELD_NAMES = "doc_id, rev, content, gen, trans_id, encrypted" + FIELD_NAMES = "doc_id PRIMARY_KEY, rev, content, gen, trans_id, encrypted" - write_encrypted_lock = threading.Lock() + """ + Period of recurrence of the periodic decrypting task, in seconds. + """ + DECRYPT_LOOP_PERIOD = 0.5 def __init__(self, *args, **kwargs): """ @@ -723,19 +726,16 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type last_known_generation: int """ self._insert_doc_cb = kwargs.pop("insert_doc_cb") + self.source_replica_uid = kwargs.pop("source_replica_uid") SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - self.source_replica_uid = None self._async_results = [] - def set_source_replica_uid(self, source_replica_uid): - """ - Set the source replica uid for this decrypter pool instance. - - :param source_replica_uid: The uid of the source replica. - :type source_replica_uid: str - """ - self.source_replica_uid = source_replica_uid + self._stopped = threading.Event() + self._deferred_loop = deferToThread(self._decrypt_and_process_docs) + self._deferred_loop.addCallback( + lambda _: logger.debug("Finished decryptor thread.")) + @defer.inlineCallbacks def insert_encrypted_received_doc(self, doc_id, doc_rev, content, gen, trans_id): """ @@ -754,17 +754,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type trans_id: str """ docstr = json.dumps(content) - sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,) - sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( self.TABLE_NAME,) + yield self._sync_db.runQuery( + query, + (doc_id, doc_rev, docstr, gen, trans_id, 1)) - con = self._sync_db - with self._sync_db_write_lock: - con.execute(sql_del, (doc_id, )) - con.execute( - sql_ins, - (doc_id, doc_rev, docstr, gen, trans_id, 1)) - + @defer.inlineCallbacks def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id): """ Insert a document that is not symmetrically encrypted. 
@@ -784,17 +780,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ if not isinstance(content, str): content = json.dumps(content) - sql_del = "DELETE FROM '%s' WHERE doc_id=?" % ( - self.TABLE_NAME,) - sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( self.TABLE_NAME,) - con = self._sync_db - with self._sync_db_write_lock: - con.execute(sql_del, (doc_id,)) - con.execute( - sql_ins, - (doc_id, doc_rev, content, gen, trans_id, 0)) + yield self._sync_db.runQuery( + query, + (doc_id, doc_rev, content, gen, trans_id, 0)) + @defer.inlineCallbacks def delete_received_doc(self, doc_id, doc_rev): """ Delete a received doc after it was inserted into the local db. @@ -806,12 +798,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % ( self.TABLE_NAME,) - con = self._sync_db - with self._sync_db_write_lock: - con.execute(sql_del, (doc_id, doc_rev)) + yield self._sync_db.runQuery(sql_del, (doc_id, doc_rev)) - def decrypt_doc(self, doc_id, rev, content, gen, trans_id, - source_replica_uid, workers=True): + def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, + source_replica_uid, workers=True): """ Symmetrically decrypt a document. @@ -860,16 +850,16 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # save the async result object so we can inspect it for failures self._async_results.append(self._pool.apply_async( decrypt_doc_task, args, - callback=self.decrypt_doc_cb)) + callback=self._decrypt_doc_cb)) else: # decrypt inline res = decrypt_doc_task(*args) - self.decrypt_doc_cb(res) + self._decrypt_doc_cb(res) - def decrypt_doc_cb(self, result): + def _decrypt_doc_cb(self, result): """ Store the decryption result in the sync db from where it will later be - picked by process_decrypted. + picked by _process_decrypted. :param result: A tuple containing the doc id, revision and encrypted content. @@ -878,7 +868,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): doc_id, rev, content, gen, trans_id = result logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s" % (doc_id, rev, gen, trans_id)) - self.insert_received_doc(doc_id, rev, content, gen, trans_id) + return self.insert_received_doc(doc_id, rev, content, gen, trans_id) def get_docs_by_generation(self, encrypted=None): """ @@ -899,6 +889,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): sql += " ORDER BY gen ASC" return self._fetchall(sql) + @defer.inlineCallbacks def get_insertable_docs_by_gen(self): """ Return a list of non-encrypted documents ready to be inserted. @@ -910,8 +901,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # docs, then some document might have been decrypted between these two # calls, and if it is just the right doc then it might not be caught # by the next loop. 
- all_docs = self.get_docs_by_generation() - decrypted_docs = self.get_docs_by_generation(encrypted=False) + all_docs = yield self.get_docs_by_generation() + decrypted_docs = yield self.get_docs_by_generation(encrypted=False) insertable = [] for doc_id, rev, _, gen, trans_id, encrypted in all_docs: for next_doc_id, _, next_content, _, _, _ in decrypted_docs: @@ -920,9 +911,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): insertable.append((doc_id, rev, content, gen, trans_id)) else: break - return insertable + defer.returnValue(insertable) - def count_docs_in_sync_db(self, encrypted=None): + @defer.inlineCallbacks + def _count_docs_in_sync_db(self, encrypted=None): """ Count how many documents we have in the table for received docs. @@ -933,31 +925,30 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :return: The count of documents. :rtype: int """ - if self._sync_db is None: - logger.warning("cannot return count with null sync_db") - return - sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) + query = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) if encrypted is not None: - sql += " WHERE encrypted = %d" % int(encrypted) - res = self._fetchall(sql) + query += " WHERE encrypted = %d" % int(encrypted) + res = yield self._sync_db.runQuery(query) if res: val = res.pop() - return val[0] + defer.returnValue(val[0]) else: - return 0 + defer.returnValue(0) - def decrypt_received_docs(self): + @defer.inlineCallbacks + def _decrypt_received_docs(self): """ Get all the encrypted documents from the sync database and dispatch a decrypt worker to decrypt each one of them. """ - docs_by_generation = self.get_docs_by_generation(encrypted=True) - for doc_id, rev, content, gen, trans_id, _ \ - in filter(None, docs_by_generation): - self.decrypt_doc( + self._raise_in_case_of_failed_async_calls() + docs_by_generation = yield self.get_docs_by_generation(encrypted=True) + for doc_id, rev, content, gen, trans_id, _ in docs_by_generation: + self._decrypt_doc( doc_id, rev, content, gen, trans_id, self.source_replica_uid) - def process_decrypted(self): + @defer.inlineCallbacks + def _process_decrypted(self): """ Process the already decrypted documents, and insert as many documents as can be taken from the expected order without finding a gap. @@ -968,12 +959,11 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # Acquire the lock to avoid processing while we're still # getting data from the syncing stream, to avoid InvalidGeneration # problems. - with self.write_encrypted_lock: - for doc_fields in self.get_insertable_docs_by_gen(): - self.insert_decrypted_local_doc(*doc_fields) - remaining = self.count_docs_in_sync_db() - return remaining == 0 + insertable = yield self.get_insertable_docs_by_gen() + for doc_fields in insertable: + yield self.insert_decrypted_local_doc(*doc_fields) + @defer.inlineCallbacks def insert_decrypted_local_doc(self, doc_id, doc_rev, content, gen, trans_id): """ @@ -1007,22 +997,22 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): insert_fun(doc, gen, trans_id) # If no errors found, remove it from the received database. - self.delete_received_doc(doc_id, doc_rev) + yield self.delete_received_doc(doc_id, doc_rev) + @defer.inlineCallbacks def empty(self): """ Empty the received docs table of the sync database. 
""" sql = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) - self._sync_db.execute(sql) + yield self._sync_db.runQuery(sql) + @defer.inlineCallbacks def _fetchall(self, *args, **kwargs): - with self._sync_db: - c = self._sync_db.cursor() - c.execute(*args, **kwargs) - return c.fetchall() + results = yield self._sync_db.runQuery(*args, **kwargs) + defer.returnValue(results) - def raise_in_case_of_failed_async_calls(self): + def _raise_in_case_of_failed_async_calls(self): """ Re-raise any exception raised by an async call. @@ -1033,3 +1023,39 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): if not res.successful(): # re-raise the exception raised by the remote call res.get() + + def _stop_decr_loop(self): + """ + """ + self._stopped.set() + + def close(self): + """ + """ + self._stop_decr_loop() + SyncEncryptDecryptPool.close(self) + + def _decrypt_and_process_docs(self): + """ + Decrypt the documents received from remote replica and insert them + into the local one. + + Called periodically from LoopingCall self._sync_loop. + """ + while not self._stopped.is_set(): + if sameProxiedObjects( + self._insert_doc_cb.get(self.source_replica_uid), + None): + continue + self._decrypt_received_docs() + self._process_decrypted() + time.sleep(self.DECRYPT_LOOP_PERIOD) + + def wait(self): + while not self.clear_to_sync(): + time.sleep(self.DECRYPT_LOOP_PERIOD) + + @defer.inlineCallbacks + def clear_to_sync(self): + count = yield self._count_docs_in_sync_db() + defer.returnValue(count == 0) diff --git a/client/src/leap/soledad/client/pragmas.py b/client/src/leap/soledad/client/pragmas.py index 2e9c53a3..55397d10 100644 --- a/client/src/leap/soledad/client/pragmas.py +++ b/client/src/leap/soledad/client/pragmas.py @@ -19,10 +19,53 @@ Different pragmas used in the initialization of the SQLCipher database. """ import logging import string +import threading +import os + +from leap.soledad.common import soledad_assert + logger = logging.getLogger(__name__) +_db_init_lock = threading.Lock() + + +def set_init_pragmas(conn, opts=None, extra_queries=None): + """ + Set the initialization pragmas. + + This includes the crypto pragmas, and any other options that must + be passed early to sqlcipher db. 
+ """ + soledad_assert(opts is not None) + extra_queries = [] if extra_queries is None else extra_queries + with _db_init_lock: + # only one execution path should initialize the db + _set_init_pragmas(conn, opts, extra_queries) + + +def _set_init_pragmas(conn, opts, extra_queries): + + sync_off = os.environ.get('LEAP_SQLITE_NOSYNC') + memstore = os.environ.get('LEAP_SQLITE_MEMSTORE') + nowal = os.environ.get('LEAP_SQLITE_NOWAL') + + set_crypto_pragmas(conn, opts) + + if not nowal: + set_write_ahead_logging(conn) + if sync_off: + set_synchronous_off(conn) + else: + set_synchronous_normal(conn) + if memstore: + set_mem_temp_store(conn) + + for query in extra_queries: + conn.cursor().execute(query) + + def set_crypto_pragmas(db_handle, sqlcipher_opts): """ Set cryptographic params (key, cipher, KDF number of iterations and diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index ec7946b7..4f7ecd1b 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -55,6 +55,7 @@ from hashlib import sha256 from contextlib import contextmanager from collections import defaultdict from httplib import CannotSendRequest +from functools import partial from pysqlcipher import dbapi2 as sqlcipher_dbapi2 @@ -63,6 +64,7 @@ from twisted.internet.task import LoopingCall from twisted.internet.threads import deferToThreadPool from twisted.python.threadpool import ThreadPool from twisted.python import log +from twisted.enterprise import adbapi from leap.soledad.client import crypto from leap.soledad.client.target import SoledadSyncTarget @@ -102,46 +104,14 @@ def initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True): conn = sqlcipher_dbapi2.connect( opts.path, check_same_thread=check_same_thread) - set_init_pragmas(conn, opts, extra_queries=on_init) + pragmas.set_init_pragmas(conn, opts, extra_queries=on_init) return conn -_db_init_lock = threading.Lock() - - -def set_init_pragmas(conn, opts=None, extra_queries=None): - """ - Set the initialization pragmas. - - This includes the crypto pragmas, and any other options that must - be passed early to sqlcipher db. 
- """ - soledad_assert(opts is not None) - extra_queries = [] if extra_queries is None else extra_queries - with _db_init_lock: - # only one execution path should initialize the db - _set_init_pragmas(conn, opts, extra_queries) - - -def _set_init_pragmas(conn, opts, extra_queries): - - sync_off = os.environ.get('LEAP_SQLITE_NOSYNC') - memstore = os.environ.get('LEAP_SQLITE_MEMSTORE') - nowal = os.environ.get('LEAP_SQLITE_NOWAL') - - pragmas.set_crypto_pragmas(conn, opts) - - if not nowal: - pragmas.set_write_ahead_logging(conn) - if sync_off: - pragmas.set_synchronous_off(conn) - else: - pragmas.set_synchronous_normal(conn) - if memstore: - pragmas.set_mem_temp_store(conn) - - for query in extra_queries: - conn.cursor().execute(query) +def initialize_sqlcipher_adbapi_db(opts, extra_queries=None): + from leap.soledad.client import sqlcipher_adbapi + return sqlcipher_adbapi.getConnectionPool( + opts, extra_queries=extra_queries) class SQLCipherOptions(object): @@ -151,22 +121,32 @@ class SQLCipherOptions(object): @classmethod def copy(cls, source, path=None, key=None, create=None, - is_raw_key=None, cipher=None, kdf_iter=None, cipher_page_size=None, - defer_encryption=None, sync_db_key=None): + is_raw_key=None, cipher=None, kdf_iter=None, + cipher_page_size=None, defer_encryption=None, sync_db_key=None): """ Return a copy of C{source} with parameters different than None replaced by new values. """ - return SQLCipherOptions( - path if path else source.path, - key if key else source.key, - create=create if create else source.create, - is_raw_key=is_raw_key if is_raw_key else source.is_raw_key, - cipher=cipher if cipher else source.cipher, - kdf_iter=kdf_iter if kdf_iter else source.kdf_iter, - cipher_page_size=cipher_page_size if cipher_page_size else source.cipher_page_size, - defer_encryption=defer_encryption if defer_encryption else source.defer_encryption, - sync_db_key=sync_db_key if sync_db_key else source.sync_db_key) + local_vars = locals() + args = [] + kwargs = {} + + for name in ["path", "key"]: + val = local_vars[name] + if val is not None: + args.append(val) + else: + args.append(getattr(source, name)) + + for name in ["create", "is_raw_key", "cipher", "kdf_iter", + "cipher_page_size", "defer_encryption", "sync_db_key"]: + val = local_vars[name] + if val is not None: + kwargs[name] = val + else: + kwargs[name] = getattr(source, name) + + return SQLCipherOptions(*args, **kwargs) def __init__(self, path, key, create=True, is_raw_key=False, cipher='aes-256-cbc', kdf_iter=4000, cipher_page_size=1024, @@ -478,7 +458,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self._sync_db_key = opts.sync_db_key self._sync_db = None - self._sync_db_write_lock = None self._sync_enc_pool = None self.sync_queue = None @@ -490,7 +469,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # self._syncers = {'': ('', syncer), ...} self._syncers = {} - self._sync_db_write_lock = threading.Lock() self.sync_queue = multiprocessing.Queue() self.running = False @@ -512,7 +490,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # initialize syncing queue encryption pool self._sync_enc_pool = crypto.SyncEncrypterPool( - self._crypto, self._sync_db, self._sync_db_write_lock) + self._crypto, self._sync_db) # ----------------------------------------------------------------- # From the documentation: If f returns a deferred, rescheduling @@ -588,11 +566,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # somewhere else sync_opts = SQLCipherOptions.copy( opts, path=sync_db_path, create=True) - self._sync_db = 
initialize_sqlcipher_db( - sync_opts, on_init=self._sync_db_extra_init, - check_same_thread=False) - pragmas.set_crypto_pragmas(self._sync_db, opts) - # --------------------------------------------------------- + self._sync_db = getConnectionPool( + sync_opts, extra_queries=self._sync_db_extra_init) @property def _sync_db_extra_init(self): @@ -727,7 +702,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): h = sha256(json.dumps([url, creds])).hexdigest() cur_h, syncer = self._syncers.get(url, (None, None)) if syncer is None or h != cur_h: - wlock = self._sync_db_write_lock syncer = SoledadSynchronizer( self, SoledadSyncTarget(url, @@ -735,8 +709,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self._replica_uid, creds=creds, crypto=self._crypto, - sync_db=self._sync_db, - sync_db_write_lock=wlock)) + sync_db=self._sync_db)) self._syncers[url] = (h, syncer) # in order to reuse the same synchronizer multiple times we have to # reset its state (i.e. the number of documents received from target @@ -907,3 +880,40 @@ def soledad_doc_factory(doc_id=None, rev=None, json='{}', has_conflicts=False, has_conflicts=has_conflicts, syncable=syncable) sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) + + +# +# twisted.enterprise.adbapi SQLCipher implementation +# + +SQLCIPHER_CONNECTION_TIMEOUT = 10 + + +def getConnectionPool(opts, extra_queries=None): + openfun = partial( + pragmas.set_init_pragmas, + opts=opts, + extra_queries=extra_queries) + return SQLCipherConnectionPool( + database=opts.path, + check_same_thread=False, + cp_openfun=openfun, + timeout=SQLCIPHER_CONNECTION_TIMEOUT) + + +class SQLCipherConnection(adbapi.Connection): + pass + + +class SQLCipherTransaction(adbapi.Transaction): + pass + + +class SQLCipherConnectionPool(adbapi.ConnectionPool): + + connectionFactory = SQLCipherConnection + transactionFactory = SQLCipherTransaction + + def __init__(self, *args, **kwargs): + adbapi.ConnectionPool.__init__( + self, "pysqlcipher.dbapi2", *args, **kwargs) diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index d59923b2..06cef1ee 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -36,9 +36,8 @@ from u1db.remote import utils, http_errors from u1db.remote.http_target import HTTPSyncTarget from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase from zope.proxy import ProxyBase -from zope.proxy import sameProxiedObjects, setProxiedObject +from zope.proxy import setProxiedObject -from twisted.internet.task import LoopingCall from leap.soledad.common.document import SoledadDocument from leap.soledad.client.auth import TokenBasedAuth @@ -755,17 +754,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # passed to sync_exchange _insert_doc_cb = defaultdict(lambda: ProxyBase(None)) - """ - Period of recurrence of the periodic decrypting task, in seconds. - """ - DECRYPT_LOOP_PERIOD = 0.5 - # # Modified HTTPSyncTarget methods. # def __init__(self, url, source_replica_uid=None, creds=None, crypto=None, - sync_db=None, sync_db_write_lock=None): + sync_db=None): """ Initialize the SoledadSyncTarget. @@ -786,9 +780,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): instead of retreiving it from the dedicated database. 
:type sync_db: Sqlite handler - :param sync_db_write_lock: a write lock for controlling concurrent - access to the sync_db - :type sync_db_write_lock: threading.Lock """ HTTPSyncTarget.__init__(self, url, creds) self._raw_url = url @@ -802,14 +793,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._syncer_pool = None # deferred decryption attributes - self._sync_db = None - self._sync_db_write_lock = None + self._sync_db = sync_db self._decryption_callback = None self._sync_decr_pool = None - self._sync_loop = None - if sync_db and sync_db_write_lock is not None: - self._sync_db = sync_db - self._sync_db_write_lock = sync_db_write_lock def _setup_sync_decr_pool(self): """ @@ -818,11 +804,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): if self._sync_decr_pool is None: # initialize syncing queue decryption pool self._sync_decr_pool = SyncDecrypterPool( - self._crypto, self._sync_db, - self._sync_db_write_lock, - insert_doc_cb=self._insert_doc_cb) - self._sync_decr_pool.set_source_replica_uid( - self.source_replica_uid) + self._crypto, + self._sync_db, + insert_doc_cb=self._insert_doc_cb, + source_replica_uid=self.source_replica_uid) def _teardown_sync_decr_pool(self): """ @@ -832,23 +817,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._sync_decr_pool.close() self._sync_decr_pool = None - def _setup_sync_loop(self): - """ - Set up the sync loop for deferred decryption. - """ - if self._sync_loop is None: - self._sync_loop = LoopingCall( - self._decrypt_syncing_received_docs) - self._sync_loop.start(self.DECRYPT_LOOP_PERIOD) - - def _teardown_sync_loop(self): - """ - Tear down the sync loop. - """ - if self._sync_loop is not None: - self._sync_loop.stop() - self._sync_loop = None - def _get_replica_uid(self, url): """ Return replica uid from the url, or None. @@ -1138,7 +1106,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): if defer_decryption and self._sync_db is not None: self._sync_exchange_lock.acquire() self._setup_sync_decr_pool() - self._setup_sync_loop() self._defer_decryption = True else: # fall back @@ -1301,9 +1268,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # decrypt docs in case of deferred decryption if defer_decryption: - while not self.clear_to_sync(): - sleep(self.DECRYPT_LOOP_PERIOD) - self._teardown_sync_loop() + self._sync_decr_pool.wait() self._teardown_sync_decr_pool() self._sync_exchange_lock.release() @@ -1324,7 +1289,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): with self._stop_lock: self._stopped = False - def stop_syncer(self): with self._stop_lock: self._stopped = True @@ -1449,7 +1413,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :rtype: bool """ if self._sync_decr_pool: - return self._sync_decr_pool.count_docs_in_sync_db() == 0 + return self._sync_decr_pool.clear_to_sync() return True def set_decryption_callback(self, cb): @@ -1474,23 +1438,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ return self._sync_db is not None - def _decrypt_syncing_received_docs(self): - """ - Decrypt the documents received from remote replica and insert them - into the local one. - - Called periodically from LoopingCall self._sync_loop. 
- """ - if sameProxiedObjects( - self._insert_doc_cb.get(self.source_replica_uid), - None): - return - - decrypter = self._sync_decr_pool - decrypter.raise_in_case_of_failed_async_calls() - decrypter.decrypt_received_docs() - decrypter.process_decrypted() - def _sign_request(self, method, url_query, params): """ Return an authorization header to be included in the HTTP request. -- cgit v1.2.3 From eae4468d99029006cc36a021e82350a0f62f7006 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 7 May 2015 14:49:40 -0300 Subject: [bug] fix order of insertion of decrypted docs This commit actually does some different things: * When doing asynchronous decryption of incoming documents in soledad client during a sync, there was the possibility that a document corresponding to a newer generation would be decrypted and inserted in the local database before a document corresponding to an older generation. When this happened, the metadata about the target database (i.e. its locally-known generation) would be first updated to the newer generation, and then an attempt to insert a document corresponding to an older generation would cause the infamous InvalidGeneration error. To fix that we use the sync-index information that is contained in the sync stream to correctly find the insertable docs to be inserted in the local database, thus avoiding the problem described above. * Refactor the sync encrypt/decrypt pool to its own file. * Fix the use of twisted adbapi with multiprocessing. Closes: #6757. --- .../bug_6757_fix-order-of-insertion-when-syncing | 2 + client/src/leap/soledad/client/crypto.py | 552 ----------------- client/src/leap/soledad/client/encdecpool.py | 673 +++++++++++++++++++++ client/src/leap/soledad/client/sqlcipher.py | 8 +- client/src/leap/soledad/client/target.py | 64 +- scripts/db_access/client_side_db.py | 13 + 6 files changed, 715 insertions(+), 597 deletions(-) create mode 100644 client/changes/bug_6757_fix-order-of-insertion-when-syncing create mode 100644 client/src/leap/soledad/client/encdecpool.py diff --git a/client/changes/bug_6757_fix-order-of-insertion-when-syncing b/client/changes/bug_6757_fix-order-of-insertion-when-syncing new file mode 100644 index 00000000..c0470f5a --- /dev/null +++ b/client/changes/bug_6757_fix-order-of-insertion-when-syncing @@ -0,0 +1,2 @@ + o Fix the order of insertion of documents when using workers for decrypting + incoming documents during a sync. Closes #6757. diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index dd40b198..bdbaa8e0 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -23,21 +23,13 @@ import hmac import hashlib import json import logging -import multiprocessing -import threading -import time from pycryptopp.cipher.aes import AES from pycryptopp.cipher.xsalsa20 import XSalsa20 -from zope.proxy import sameProxiedObjects - -from twisted.internet import defer -from twisted.internet.threads import deferToThread from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type from leap.soledad.common import crypto -from leap.soledad.common.document import SoledadDocument logger = logging.getLogger(__name__) @@ -515,547 +507,3 @@ def is_symmetrically_encrypted(doc): == crypto.EncryptionSchemes.SYMKEY: return True return False - - -# -# Encrypt/decrypt pools of workers -# - -class SyncEncryptDecryptPool(object): - """ - Base class for encrypter/decrypter pools. 
- """ - WORKERS = multiprocessing.cpu_count() - - def __init__(self, crypto, sync_db): - """ - Initialize the pool of encryption-workers. - - :param crypto: A SoledadCryto instance to perform the encryption. - :type crypto: leap.soledad.crypto.SoledadCrypto - - :param sync_db: A database connection handle - :type sync_db: pysqlcipher.dbapi2.Connection - - :param write_lock: a write lock for controlling concurrent access - to the sync_db - :type write_lock: threading.Lock - """ - self._pool = multiprocessing.Pool(self.WORKERS) - self._crypto = crypto - self._sync_db = sync_db - - def close(self): - """ - Cleanly close the pool of workers. - """ - logger.debug("Closing %s" % (self.__class__.__name__,)) - self._pool.close() - try: - self._pool.join() - except Exception: - pass - - def terminate(self): - """ - Terminate the pool of workers. - """ - logger.debug("Terminating %s" % (self.__class__.__name__,)) - self._pool.terminate() - - -def encrypt_doc_task(doc_id, doc_rev, content, key, secret): - """ - Encrypt the content of the given document. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param key: The encryption key. - :type key: str - :param secret: The Soledad storage secret (used for MAC auth). - :type secret: str - - :return: A tuple containing the doc id, revision and encrypted content. - :rtype: tuple(str, str, str) - """ - encrypted_content = encrypt_docstr( - content, doc_id, doc_rev, key, secret) - return doc_id, doc_rev, encrypted_content - - -class SyncEncrypterPool(SyncEncryptDecryptPool): - """ - Pool of workers that spawn subprocesses to execute the symmetric encryption - of documents to be synced. - """ - # TODO implement throttling to reduce cpu usage?? - WORKERS = multiprocessing.cpu_count() - TABLE_NAME = "docs_tosync" - FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" - - def encrypt_doc(self, doc, workers=True): - """ - Symmetrically encrypt a document. - - :param doc: The document with contents to be encrypted. - :type doc: SoledadDocument - - :param workers: Whether to defer the decryption to the multiprocess - pool of workers. Useful for debugging purposes. - :type workers: bool - """ - soledad_assert(self._crypto is not None, "need a crypto object") - docstr = doc.get_json() - key = self._crypto.doc_passphrase(doc.doc_id) - secret = self._crypto.secret - args = doc.doc_id, doc.rev, docstr, key, secret - - try: - if workers: - res = self._pool.apply_async( - encrypt_doc_task, args, - callback=self.encrypt_doc_cb) - else: - # encrypt inline - res = encrypt_doc_task(*args) - self.encrypt_doc_cb(res) - - except Exception as exc: - logger.exception(exc) - - def encrypt_doc_cb(self, result): - """ - Insert results of encryption routine into the local sync database. - - :param result: A tuple containing the doc id, revision and encrypted - content. - :type result: tuple(str, str, str) - """ - doc_id, doc_rev, content = result - return self.insert_encrypted_local_doc(doc_id, doc_rev, content) - - @defer.inlineCallbacks - def insert_encrypted_local_doc(self, doc_id, doc_rev, content): - """ - Insert the contents of the encrypted doc into the local sync - database. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param content: The encrypted document. 
- :type content: str - """ - # FIXME --- callback should complete immediately since otherwise the - # thread which handles the results will get blocked - # Right now we're blocking the dispatcher with the writes to sqlite. - query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \ - % (self.TABLE_NAME,) - yield self._sync_db.runQuery(query, (doc_id, doc_rev, content)) - - -def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): - """ - Decrypt the content of the given document. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The encrypted content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification of - that document. - :type trans_id: str - :param key: The encryption key. - :type key: str - :param secret: The Soledad storage secret (used for MAC auth). - :type secret: str - - :return: A tuple containing the doc id, revision and encrypted content. - :rtype: tuple(str, str, str) - """ - decrypted_content = decrypt_doc_dict( - content, doc_id, doc_rev, key, secret) - return doc_id, doc_rev, decrypted_content, gen, trans_id - - -class SyncDecrypterPool(SyncEncryptDecryptPool): - """ - Pool of workers that spawn subprocesses to execute the symmetric decryption - of documents that were received. - - The decryption of the received documents is done in two steps: - - 1. All the encrypted docs are collected, together with their generation - and transaction-id - 2. The docs are enqueued for decryption. When completed, they are - inserted following the generation order. - """ - # TODO implement throttling to reduce cpu usage?? - TABLE_NAME = "docs_received" - FIELD_NAMES = "doc_id PRIMARY_KEY, rev, content, gen, trans_id, encrypted" - - """ - Period of recurrence of the periodic decrypting task, in seconds. - """ - DECRYPT_LOOP_PERIOD = 0.5 - - def __init__(self, *args, **kwargs): - """ - Initialize the decrypter pool, and setup a dict for putting the - results of the decrypted docs until they are picked by the insert - routine that gets them in order. - - :param insert_doc_cb: A callback for inserting received documents from - target. If not overriden, this will call u1db - insert_doc_from_target in synchronizer, which - implements the TAKE OTHER semantics. - :type insert_doc_cb: function - :param last_known_generation: Target's last known generation. - :type last_known_generation: int - """ - self._insert_doc_cb = kwargs.pop("insert_doc_cb") - self.source_replica_uid = kwargs.pop("source_replica_uid") - SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - self._async_results = [] - - self._stopped = threading.Event() - self._deferred_loop = deferToThread(self._decrypt_and_process_docs) - self._deferred_loop.addCallback( - lambda _: logger.debug("Finished decryptor thread.")) - - @defer.inlineCallbacks - def insert_encrypted_received_doc(self, doc_id, doc_rev, content, - gen, trans_id): - """ - Insert a received message with encrypted content, to be decrypted later - on. - - :param doc_id: The Document ID. 
- :type doc_id: str - :param doc_rev: The Document Revision - :param doc_rev: str - :param content: the Content of the document - :type content: str - :param gen: the Document Generation - :type gen: int - :param trans_id: Transaction ID - :type trans_id: str - """ - docstr = json.dumps(content) - query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( - self.TABLE_NAME,) - yield self._sync_db.runQuery( - query, - (doc_id, doc_rev, docstr, gen, trans_id, 1)) - - @defer.inlineCallbacks - def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id): - """ - Insert a document that is not symmetrically encrypted. - We store it in the staging area (the decrypted_docs dictionary) to be - picked up in order as the preceding documents are decrypted. - - :param doc_id: The Document ID. - :type doc_id: str - :param doc_rev: The Document Revision - :param doc_rev: str - :param content: the Content of the document - :type content: str - :param gen: the Document Generation - :type gen: int - :param trans_id: Transaction ID - :type trans_id: str - """ - if not isinstance(content, str): - content = json.dumps(content) - query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( - self.TABLE_NAME,) - yield self._sync_db.runQuery( - query, - (doc_id, doc_rev, content, gen, trans_id, 0)) - - @defer.inlineCallbacks - def delete_received_doc(self, doc_id, doc_rev): - """ - Delete a received doc after it was inserted into the local db. - - :param doc_id: Document ID. - :type doc_id: str - :param doc_rev: Document revision. - :type doc_rev: str - """ - sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % ( - self.TABLE_NAME,) - yield self._sync_db.runQuery(sql_del, (doc_id, doc_rev)) - - def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, - source_replica_uid, workers=True): - """ - Symmetrically decrypt a document. - - :param doc_id: The ID for the document with contents to be encrypted. - :type doc: str - :param rev: The revision of the document. - :type rev: str - :param content: The serialized content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification - of that document. - :type trans_id: str - :param source_replica_uid: - :type source_replica_uid: str - - :param workers: Whether to defer the decryption to the multiprocess - pool of workers. Useful for debugging purposes. - :type workers: bool - """ - self.source_replica_uid = source_replica_uid - - # insert_doc_cb is a proxy object that gets updated with the right - # insert function only when the sync_target invokes the sync_exchange - # method. so, if we don't still have a non-empty callback, we refuse - # to proceed. 
- if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid), - None): - logger.debug("Sync decrypter pool: no insert_doc_cb() yet.") - return - - soledad_assert(self._crypto is not None, "need a crypto object") - - if len(content) == 0: - # not encrypted payload - return - - content = json.loads(content) - key = self._crypto.doc_passphrase(doc_id) - secret = self._crypto.secret - args = doc_id, rev, content, gen, trans_id, key, secret - - if workers: - # save the async result object so we can inspect it for failures - self._async_results.append(self._pool.apply_async( - decrypt_doc_task, args, - callback=self._decrypt_doc_cb)) - else: - # decrypt inline - res = decrypt_doc_task(*args) - self._decrypt_doc_cb(res) - - def _decrypt_doc_cb(self, result): - """ - Store the decryption result in the sync db from where it will later be - picked by _process_decrypted. - - :param result: A tuple containing the doc id, revision and encrypted - content. - :type result: tuple(str, str, str) - """ - doc_id, rev, content, gen, trans_id = result - logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s" - % (doc_id, rev, gen, trans_id)) - return self.insert_received_doc(doc_id, rev, content, gen, trans_id) - - def get_docs_by_generation(self, encrypted=None): - """ - Get all documents in the received table from the sync db, - ordered by generation. - - :param encrypted: If not None, only return documents with encrypted - field equal to given parameter. - :type encrypted: bool or None - - :return: list of doc_id, rev, generation, gen, trans_id - :rtype: list - """ - sql = "SELECT doc_id, rev, content, gen, trans_id, encrypted FROM %s" \ - % self.TABLE_NAME - if encrypted is not None: - sql += " WHERE encrypted = %d" % int(encrypted) - sql += " ORDER BY gen ASC" - return self._fetchall(sql) - - @defer.inlineCallbacks - def get_insertable_docs_by_gen(self): - """ - Return a list of non-encrypted documents ready to be inserted. - """ - # here, we compare the list of all available docs with the list of - # decrypted docs and find the longest common prefix between these two - # lists. Note that the order of lists fetch matters: if instead we - # first fetch the list of decrypted docs and then the list of all - # docs, then some document might have been decrypted between these two - # calls, and if it is just the right doc then it might not be caught - # by the next loop. - all_docs = yield self.get_docs_by_generation() - decrypted_docs = yield self.get_docs_by_generation(encrypted=False) - insertable = [] - for doc_id, rev, _, gen, trans_id, encrypted in all_docs: - for next_doc_id, _, next_content, _, _, _ in decrypted_docs: - if doc_id == next_doc_id: - content = next_content - insertable.append((doc_id, rev, content, gen, trans_id)) - else: - break - defer.returnValue(insertable) - - @defer.inlineCallbacks - def _count_docs_in_sync_db(self, encrypted=None): - """ - Count how many documents we have in the table for received docs. - - :param encrypted: If not None, return count of documents with - encrypted field equal to given parameter. - :type encrypted: bool or None - - :return: The count of documents. 
- :rtype: int - """ - query = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) - if encrypted is not None: - query += " WHERE encrypted = %d" % int(encrypted) - res = yield self._sync_db.runQuery(query) - if res: - val = res.pop() - defer.returnValue(val[0]) - else: - defer.returnValue(0) - - @defer.inlineCallbacks - def _decrypt_received_docs(self): - """ - Get all the encrypted documents from the sync database and dispatch a - decrypt worker to decrypt each one of them. - """ - self._raise_in_case_of_failed_async_calls() - docs_by_generation = yield self.get_docs_by_generation(encrypted=True) - for doc_id, rev, content, gen, trans_id, _ in docs_by_generation: - self._decrypt_doc( - doc_id, rev, content, gen, trans_id, self.source_replica_uid) - - @defer.inlineCallbacks - def _process_decrypted(self): - """ - Process the already decrypted documents, and insert as many documents - as can be taken from the expected order without finding a gap. - - :return: Whether we have processed all the pending docs. - :rtype: bool - """ - # Acquire the lock to avoid processing while we're still - # getting data from the syncing stream, to avoid InvalidGeneration - # problems. - insertable = yield self.get_insertable_docs_by_gen() - for doc_fields in insertable: - yield self.insert_decrypted_local_doc(*doc_fields) - - @defer.inlineCallbacks - def insert_decrypted_local_doc(self, doc_id, doc_rev, content, - gen, trans_id): - """ - Insert the decrypted document into the local sqlcipher database. - Makes use of the passed callback `return_doc_cb` passed to the caller - by u1db sync. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification - of that document. - :type trans_id: str - """ - # could pass source_replica in params for callback chain - insert_fun = self._insert_doc_cb[self.source_replica_uid] - logger.debug("Sync decrypter pool: inserting doc in local db: " - "%s:%s %s" % (doc_id, doc_rev, gen)) - - # convert deleted documents to avoid error on document creation - if content == 'null': - content = None - doc = SoledadDocument(doc_id, doc_rev, content) - gen = int(gen) - insert_fun(doc, gen, trans_id) - - # If no errors found, remove it from the received database. - yield self.delete_received_doc(doc_id, doc_rev) - - @defer.inlineCallbacks - def empty(self): - """ - Empty the received docs table of the sync database. - """ - sql = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) - yield self._sync_db.runQuery(sql) - - @defer.inlineCallbacks - def _fetchall(self, *args, **kwargs): - results = yield self._sync_db.runQuery(*args, **kwargs) - defer.returnValue(results) - - def _raise_in_case_of_failed_async_calls(self): - """ - Re-raise any exception raised by an async call. - - :raise Exception: Raised if an async call has raised an exception. - """ - for res in self._async_results: - if res.ready(): - if not res.successful(): - # re-raise the exception raised by the remote call - res.get() - - def _stop_decr_loop(self): - """ - """ - self._stopped.set() - - def close(self): - """ - """ - self._stop_decr_loop() - SyncEncryptDecryptPool.close(self) - - def _decrypt_and_process_docs(self): - """ - Decrypt the documents received from remote replica and insert them - into the local one. 
- - Called periodically from LoopingCall self._sync_loop. - """ - while not self._stopped.is_set(): - if sameProxiedObjects( - self._insert_doc_cb.get(self.source_replica_uid), - None): - continue - self._decrypt_received_docs() - self._process_decrypted() - time.sleep(self.DECRYPT_LOOP_PERIOD) - - def wait(self): - while not self.clear_to_sync(): - time.sleep(self.DECRYPT_LOOP_PERIOD) - - @defer.inlineCallbacks - def clear_to_sync(self): - count = yield self._count_docs_in_sync_db() - defer.returnValue(count == 0) diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py new file mode 100644 index 00000000..0466ec5d --- /dev/null +++ b/client/src/leap/soledad/client/encdecpool.py @@ -0,0 +1,673 @@ +# -*- coding: utf-8 -*- +# encdecpool.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +A pool of concurrent and parallel encryption/decryption workers for use +during synchronization. +""" + + +import multiprocessing +import threading +import time +import json +import logging + +from zope.proxy import sameProxiedObjects + +from twisted.internet import defer +from twisted.internet.threads import deferToThread + +from leap.soledad.common.document import SoledadDocument +from leap.soledad.common import soledad_assert + +from leap.soledad.client.crypto import encrypt_docstr +from leap.soledad.client.crypto import decrypt_doc_dict + + +logger = logging.getLogger(__name__) + + +# +# Encrypt/decrypt pools of workers +# + +class SyncEncryptDecryptPool(object): + """ + Base class for encrypter/decrypter pools. + """ + WORKERS = multiprocessing.cpu_count() + + def __init__(self, crypto, sync_db): + """ + Initialize the pool of encryption workers. + + :param crypto: A SoledadCrypto instance to perform the encryption. + :type crypto: leap.soledad.crypto.SoledadCrypto + + :param sync_db: A database connection handle + :type sync_db: pysqlcipher.dbapi2.Connection + """ + self._pool = multiprocessing.Pool(self.WORKERS) + self._crypto = crypto + self._sync_db = sync_db + + def close(self): + """ + Cleanly close the pool of workers. + """ + logger.debug("Closing %s" % (self.__class__.__name__,)) + self._pool.close() + try: + self._pool.join() + except Exception: + pass + + def terminate(self): + """ + Terminate the pool of workers. + """ + logger.debug("Terminating %s" % (self.__class__.__name__,)) + self._pool.terminate() + + +def encrypt_doc_task(doc_id, doc_rev, content, key, secret): + """ + Encrypt the content of the given document. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The serialized content of the document. + :type content: str + :param key: The encryption key. + :type key: str + :param secret: The Soledad storage secret (used for MAC auth). + :type secret: str + + :return: A tuple containing the doc id, revision and encrypted content. + :rtype: tuple(str, str, str) + """ + encrypted_content = encrypt_docstr( + content, doc_id, doc_rev, key, secret) + return doc_id, doc_rev, encrypted_content
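Both pools push work through multiprocessing.Pool.apply_async with a result callback, which is why task functions like the one above are defined at module level: only picklable, top-level functions can be shipped to worker subprocesses. A minimal, self-contained sketch of that pattern (the fake_encrypt task and results list are hypothetical, for illustration only):

    import multiprocessing


    def fake_encrypt(doc_id, content):
        # stand-in for encrypt_doc_task; must be a top-level function so
        # it can be pickled and executed in a worker subprocess
        return doc_id, content[::-1]


    if __name__ == '__main__':
        results = []
        pool = multiprocessing.Pool(2)
        for i in range(4):
            # apply_async returns immediately; the callback runs in the
            # parent process when the worker finishes
            pool.apply_async(fake_encrypt, ('doc-%d' % i, 'payload'),
                             callback=results.append)
        pool.close()
        pool.join()
        print results  # four (doc_id, reversed payload) tuples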
+ + +class SyncEncrypterPool(SyncEncryptDecryptPool): + """ + Pool of workers that spawn subprocesses to execute the symmetric encryption + of documents to be synced. + """ + # TODO implement throttling to reduce cpu usage?? + WORKERS = multiprocessing.cpu_count() + TABLE_NAME = "docs_tosync" + FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" + + def encrypt_doc(self, doc, workers=True): + """ + Symmetrically encrypt a document. + + :param doc: The document with contents to be encrypted. + :type doc: SoledadDocument + + :param workers: Whether to defer the encryption to the multiprocess + pool of workers. Useful for debugging purposes. + :type workers: bool + """ + soledad_assert(self._crypto is not None, "need a crypto object") + docstr = doc.get_json() + key = self._crypto.doc_passphrase(doc.doc_id) + secret = self._crypto.secret + args = doc.doc_id, doc.rev, docstr, key, secret + + try: + if workers: + res = self._pool.apply_async( + encrypt_doc_task, args, + callback=self.encrypt_doc_cb) + else: + # encrypt inline + res = encrypt_doc_task(*args) + self.encrypt_doc_cb(res) + + except Exception as exc: + logger.exception(exc) + + def encrypt_doc_cb(self, result): + """ + Insert results of encryption routine into the local sync database. + + :param result: A tuple containing the doc id, revision and encrypted + content. + :type result: tuple(str, str, str) + """ + doc_id, doc_rev, content = result + return self.insert_encrypted_local_doc(doc_id, doc_rev, content) + + def insert_encrypted_local_doc(self, doc_id, doc_rev, content): + """ + Insert the contents of the encrypted doc into the local sync + database. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The serialized content of the document. + :type content: str + """ + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \ + % (self.TABLE_NAME,) + return self._sync_db.runOperation(query, (doc_id, doc_rev, content)) + + +def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, + idx): + """ + Decrypt the content of the given document. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The encrypted content of the document. + :type content: str + :param gen: The generation corresponding to the modification of that + document. + :type gen: int + :param trans_id: The transaction id corresponding to the modification of + that document. + :type trans_id: str + :param key: The encryption key. + :type key: str + :param secret: The Soledad storage secret (used for MAC auth). + :type secret: str + :param idx: The index of this document in the current sync process. + :type idx: int + + :return: A tuple containing the doc id, revision, decrypted content, + generation, transaction id and sync index. + :rtype: tuple(str, str, str, int, str, int) + """ + decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret) + return doc_id, doc_rev, decrypted_content, gen, trans_id, idx + + +class SyncDecrypterPool(SyncEncryptDecryptPool): + """ + Pool of workers that spawn subprocesses to execute the symmetric decryption + of documents that were received. + + The decryption of the received documents is done in several steps: + + 1. Encrypted documents are stored in the sync db by the actual soledad + sync loop. + 2. The soledad sync loop tells us how many documents we should expect + to process. + 3. We start a decrypt-and-process loop: + + a. Encrypted documents are fetched. + b. Encrypted documents are decrypted. + c. The longest possible list of decrypted documents is inserted + in the soledad db (this depends on which documents have already + arrived and which documents have already been decrypted, because + the order of insertion in the local soledad db matters). + d. Processed documents are deleted from the database. + + 4. When we have processed as many documents as we should, the loop + finishes. + """
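The ordering constraint in step 3c is the crux of this design: documents may only enter the local db in generation order, so each pass may insert only the longest run of already-decrypted documents whose sync indexes extend the inserted prefix without a gap. A toy illustration of that rule (hypothetical helper, not part of the patch):

    def consecutive_run(decrypted, last_inserted_idx):
        # 'decrypted' maps sync index -> document; return the documents
        # that extend the already-inserted prefix without leaving a gap
        run = []
        idx = last_inserted_idx + 1
        while idx in decrypted:
            run.append(decrypted[idx])
            idx += 1
        return run

    # with docs 1, 2 and 4 decrypted, doc 4 must wait for doc 3
    assert consecutive_run({1: 'a', 2: 'b', 4: 'd'}, 0) == ['a', 'b']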
+ # TODO implement throttling to reduce cpu usage?? + TABLE_NAME = "docs_received" + FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \ + "trans_id, encrypted, idx" + + """ + Period of recurrence of the periodic decrypting task, in seconds. + """ + DECRYPT_LOOP_PERIOD = 0.5 + + def __init__(self, *args, **kwargs): + """ + Initialize the decrypter pool, and setup a dict for putting the + results of the decrypted docs until they are picked by the insert + routine that gets them in order. + + :param insert_doc_cb: A callback for inserting received documents from + target. If not overridden, this will call u1db + insert_doc_from_target in synchronizer, which + implements the TAKE OTHER semantics. + :type insert_doc_cb: function + :param source_replica_uid: The source replica uid, used to find the + correct callback for inserting documents. + :type source_replica_uid: str + """ + self._insert_doc_cb = kwargs.pop("insert_doc_cb") + self.source_replica_uid = kwargs.pop("source_replica_uid") + SyncEncryptDecryptPool.__init__(self, *args, **kwargs) + + self._last_inserted_idx = 0 + self._docs_to_process = None + self._processed_docs = 0 + + self._async_results = [] + self._exception = None + self._finished = threading.Event() + + # clear the database before starting the sync + self._empty_db = threading.Event() + d = self._empty() + d.addCallback(lambda _: self._empty_db.set()) + + # start the decryption loop + self._deferred_loop = deferToThread(self._decrypt_and_process_docs) + self._deferred_loop.addCallback( + lambda _: logger.debug("Finished decryptor thread.")) + + def set_docs_to_process(self, docs_to_process): + """ + Set the number of documents we expect to process. + + This should be called during the sync exchange process as soon + as we know how many documents are arriving from the server. + + :param docs_to_process: The number of documents to process. + :type docs_to_process: int + """ + self._docs_to_process = docs_to_process + + def insert_encrypted_received_doc( + self, doc_id, doc_rev, content, gen, trans_id, idx): + """ + Insert a received message with encrypted content, to be decrypted later + on. + + :param doc_id: The Document ID. + :type doc_id: str + :param doc_rev: The Document Revision + :type doc_rev: str + :param content: the Content of the document + :type content: str + :param gen: the Document Generation + :type gen: int + :param trans_id: Transaction ID + :type trans_id: str + :param idx: The index of this document in the current sync process. + :type idx: int + + :return: A deferred that will fire when the operation in the database + has finished.
+ :rtype: twisted.internet.defer.Deferred + """ + docstr = json.dumps(content) + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ + % self.TABLE_NAME + return self._sync_db.runOperation( + query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx)) + + def insert_received_doc( + self, doc_id, doc_rev, content, gen, trans_id, idx): + """ + Insert a document that is not symmetrically encrypted. + We store it in the staging area (the received docs table) to be + picked up in order as the preceding documents are decrypted. + + :param doc_id: The Document ID. + :type doc_id: str + :param doc_rev: The Document Revision + :type doc_rev: str + :param content: the Content of the document + :type content: str + :param gen: the Document Generation + :type gen: int + :param trans_id: Transaction ID + :type trans_id: str + :param idx: The index of this document in the current sync process. + :type idx: int + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + if not isinstance(content, str): + content = json.dumps(content) + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ + % self.TABLE_NAME + return self._sync_db.runOperation( + query, (doc_id, doc_rev, content, gen, trans_id, 0, idx)) + + def _delete_received_doc(self, doc_id): + """ + Delete a received doc after it was inserted into the local db. + + :param doc_id: Document ID. + :type doc_id: str + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + query = "DELETE FROM '%s' WHERE doc_id=?" \ + % self.TABLE_NAME + return self._sync_db.runOperation(query, (doc_id,)) + + def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx, + workers=True): + """ + Symmetrically decrypt a document and store in the sync db. + + :param doc_id: The ID for the document with contents to be decrypted. + :type doc_id: str + :param rev: The revision of the document. + :type rev: str + :param content: The serialized content of the document. + :type content: str + :param gen: The generation corresponding to the modification of that + document. + :type gen: int + :param trans_id: The transaction id corresponding to the modification + of that document. + :type trans_id: str + :param idx: The index of this document in the current sync process. + :type idx: int + :param workers: Whether to defer the decryption to the multiprocess + pool of workers. Useful for debugging purposes. + :type workers: bool + + :return: A deferred that will fire after the document has been + decrypted and inserted in the sync db. + :rtype: twisted.internet.defer.Deferred + """ + # insert_doc_cb is a proxy object that gets updated with the right + # insert function only when the sync_target invokes the sync_exchange + # method. so if we still don't have a non-empty callback, we refuse + # to proceed. + if sameProxiedObjects( + self._insert_doc_cb.get(self.source_replica_uid), + None): + logger.debug("Sync decrypter pool: no insert_doc_cb() yet.") + return + + soledad_assert(self._crypto is not None, "need a crypto object") + + content = json.loads(content) + key = self._crypto.doc_passphrase(doc_id) + secret = self._crypto.secret + args = doc_id, rev, content, gen, trans_id, key, secret, idx + + if workers: + # when using multiprocessing, we need to wait for all parallel + # processing to finish before continuing with the + # decrypt-and-process loop. We do this by using an extra deferred + # that will be fired by the multiprocessing callback when it has + # finished processing. + d1 = defer.Deferred() + + def _multiprocessing_callback(result): + d2 = self._decrypt_doc_cb(result) + d2.addCallback(lambda defres: d1.callback(defres)) + + # save the async result object so we can inspect it for failures + self._async_results.append( + self._pool.apply_async( + decrypt_doc_task, args, + callback=_multiprocessing_callback)) + + return d1 + else: + # decrypt inline + res = decrypt_doc_task(*args) + return self._decrypt_doc_cb(res)
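The deferred juggling in _decrypt_doc above is a general recipe for bridging a multiprocessing result back into Twisted. A condensed sketch of the same bridge (names are illustrative, not from the patch):

    import multiprocessing

    from twisted.internet import defer


    def apply_async_to_deferred(pool, task, args):
        # wrap Pool.apply_async in a Deferred that fires with the worker's
        # result; note that the pool invokes the callback from a helper
        # thread, so with a running reactor one should fire it with
        # reactor.callFromThread(d.callback, result) instead
        d = defer.Deferred()
        pool.apply_async(task, args, callback=d.callback)
        return d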
+ + def _decrypt_doc_cb(self, result): + """ + Store the decryption result in the sync db from where it will later be + picked by _process_decrypted. + + :param result: A tuple containing the document's id, revision, + content, generation, transaction id and sync index. + :type result: tuple(str, str, str, int, str, int) + + :return: A deferred that will fire after the document has been + inserted in the sync db. + :rtype: twisted.internet.defer.Deferred + """ + doc_id, rev, content, gen, trans_id, idx = result + logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s" + % (doc_id, rev, gen, trans_id)) + return self.insert_received_doc( + doc_id, rev, content, gen, trans_id, idx) + + def _get_docs(self, encrypted=None, order_by='idx', order='ASC'): + """ + Get documents from the received docs table in the sync db. + + :param encrypted: If not None, only return documents with encrypted + field equal to given parameter. + :type encrypted: bool or None + :param order_by: The name of the field to order results. + :type order_by: str + :param order: Whether the order should be ASC or DESC. + :type order: str + + :return: A deferred that will fire with the results of the database + query. + :rtype: twisted.internet.defer.Deferred + """ + query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \ + "idx FROM %s" % self.TABLE_NAME + if encrypted is not None: + query += " WHERE encrypted = %d" % int(encrypted) + query += " ORDER BY %s %s" % (order_by, order) + return self._sync_db.runQuery(query) + + @defer.inlineCallbacks + def _get_insertable_docs(self): + """ + Return a list of non-encrypted documents ready to be inserted. + + :return: A deferred that will fire with the list of insertable + documents. + :rtype: twisted.internet.defer.Deferred + """ + # here, we fetch the list of decrypted documents and compare with the + # index of the last successfully processed document. + decrypted_docs = yield self._get_docs(encrypted=False) + insertable = [] + last_idx = self._last_inserted_idx + for doc_id, rev, content, gen, trans_id, encrypted, idx in \ + decrypted_docs: + # XXX for some reason, a document might not have been deleted from + # the database. This is a bug. At this point, already + # processed documents should have been removed from the sync + # database and we should not have to skip them here. We need + # to find out why this is happening, fix it, and remove the + # skipping below. + if (idx < last_idx + 1): + continue + if (idx != last_idx + 1): + break + insertable.append((doc_id, rev, content, gen, trans_id, idx)) + last_idx += 1 + defer.returnValue(insertable) + + def _decrypt_received_docs(self): + """ + Get all the encrypted documents from the sync database and dispatch a + decrypt worker to decrypt each one of them. + + :return: A deferred that will fire after all documents have been + decrypted and inserted back in the sync db.
+ :rtype: twisted.internet.defer.Deferred + """ + + def _callback(received_docs): + deferreds = [] + for doc_id, rev, content, gen, trans_id, _, idx in received_docs: + deferreds.append( + self._decrypt_doc( + doc_id, rev, content, gen, trans_id, idx)) + return defer.gatherResults(deferreds) + + d = self._get_docs(encrypted=True) + d.addCallback(_callback) + return d + + def _process_decrypted(self): + """ + Fetch as many decrypted documents as can be taken from the expected + order and insert them in the database. + + :return: A deferred that will fire with the list of inserted + documents. + :rtype: twisted.internet.defer.Deferred + """ + + def _callback(insertable): + for doc_fields in insertable: + self._insert_decrypted_local_doc(*doc_fields) + return insertable + + d = self._get_insertable_docs() + d.addCallback(_callback) + return d + + def _delete_processed_docs(self, inserted): + """ + Delete from the sync db documents that have been processed. + + :param inserted: List of documents inserted in the previous process + step. + :type inserted: list + + :return: A list of deferreds that will fire when each operation in the + database has finished. + :rtype: twisted.internet.defer.DeferredList + """ + deferreds = [] + for doc_id, doc_rev, _, _, _, _ in inserted: + deferreds.append( + self._delete_received_doc(doc_id)) + if not deferreds: + return defer.succeed(None) + return defer.gatherResults(deferreds) + + def _insert_decrypted_local_doc(self, doc_id, doc_rev, content, + gen, trans_id, idx): + """ + Insert the decrypted document into the local sqlcipher database. + Makes use of the passed callback `return_doc_cb` passed to the caller + by u1db sync. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The serialized content of the document. + :type content: str + :param gen: The generation corresponding to the modification of that + document. + :type gen: int + :param trans_id: The transaction id corresponding to the modification + of that document. + :type trans_id: str + """ + # could pass source_replica in params for callback chain + insert_fun = self._insert_doc_cb[self.source_replica_uid] + logger.debug("Sync decrypter pool: inserting doc in local db: " + "%s:%s %s" % (doc_id, doc_rev, gen)) + + # convert deleted documents to avoid error on document creation + if content == 'null': + content = None + doc = SoledadDocument(doc_id, doc_rev, content) + gen = int(gen) + insert_fun(doc, gen, trans_id) + + # store info about processed docs + self._last_inserted_idx = idx + self._processed_docs += 1 + + def _empty(self): + """ + Empty the received docs table of the sync database. + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) + return self._sync_db.runOperation(query) + + def _raise_if_async_fails(self): + """ + Raise any exception raised by a multiprocessing async decryption + call. + + :raise Exception: Raised if an async call has raised an exception. + """ + for res in self._async_results: + if res.ready(): + if not res.successful(): + # re-raise the exception raised by the remote call + res.get() + + def _decrypt_and_process_docs(self): + """ + Decrypt the documents received from remote replica and insert them + into the local one. + + This method runs in its own thread, so sleeping will not interfere + with the main thread. 
+ """ + try: + # wait for database to be emptied + self._empty_db.wait() + # wait until we know how many documents we need to process + while self._docs_to_process is None: + time.sleep(self.DECRYPT_LOOP_PERIOD) + # because all database operations are asynchronous, we use an event to + # make sure we don't start the next loop before the current one has + # finished. + event = threading.Event() + # loop until we have processes as many docs as the number of changes + while self._processed_docs < self._docs_to_process: + if sameProxiedObjects( + self._insert_doc_cb.get(self.source_replica_uid), + None): + continue + event.clear() + d = self._decrypt_received_docs() + d.addCallback(lambda _: self._raise_if_async_fails()) + d.addCallback(lambda _: self._process_decrypted()) + d.addCallback(self._delete_processed_docs) + d.addCallback(lambda _: event.set()) + event.wait() + # sleep a bit to give time for some decryption work + time.sleep(self.DECRYPT_LOOP_PERIOD) + except Exception as e: + self._exception = e + self._finished.set() + + def wait(self): + """ + Wait for the decrypt-and-process loop to finish. + """ + self._finished.wait() + if self._exception: + raise self._exception diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 4f7ecd1b..d3b3d01b 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -66,7 +66,7 @@ from twisted.python.threadpool import ThreadPool from twisted.python import log from twisted.enterprise import adbapi -from leap.soledad.client import crypto +from leap.soledad.client import encdecpool from leap.soledad.client.target import SoledadSyncTarget from leap.soledad.client.target import PendingReceivedDocsSyncError from leap.soledad.client.sync import SoledadSynchronizer @@ -489,7 +489,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase): if defer_encryption: # initialize syncing queue encryption pool - self._sync_enc_pool = crypto.SyncEncrypterPool( + self._sync_enc_pool = encdecpool.SyncEncrypterPool( self._crypto, self._sync_db) # ----------------------------------------------------------------- @@ -578,8 +578,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase): :rtype: tuple of strings """ maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)" - encr = crypto.SyncEncrypterPool - decr = crypto.SyncDecrypterPool + encr = encdecpool.SyncEncrypterPool + decr = encdecpool.SyncDecrypterPool sql_encr_table_query = (maybe_create % ( encr.TABLE_NAME, encr.FIELD_NAMES)) sql_decr_table_query = (maybe_create % ( diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 06cef1ee..17ce718f 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -43,7 +43,8 @@ from leap.soledad.common.document import SoledadDocument from leap.soledad.client.auth import TokenBasedAuth from leap.soledad.client.crypto import is_symmetrically_encrypted from leap.soledad.client.crypto import encrypt_doc, decrypt_doc -from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool +from leap.soledad.client.encdecpool import SyncEncrypterPool +from leap.soledad.client.encdecpool import SyncDecrypterPool from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS from leap.soledad.client.events import signal @@ -787,9 +788,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._crypto = crypto self._stopped = True 
self._stop_lock = threading.Lock() - self._sync_exchange_lock = threading.Lock() self.source_replica_uid = source_replica_uid - self._defer_decryption = False self._syncer_pool = None # deferred decryption attributes @@ -813,9 +812,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ Tear down the SyncDecrypterPool. """ - if self._sync_decr_pool is not None: - self._sync_decr_pool.close() - self._sync_decr_pool = None + self._sync_decr_pool.close() + self._sync_decr_pool = None def _get_replica_uid(self, url): """ @@ -903,7 +901,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): doc = SoledadDocument(doc_id, rev, content) if is_symmetrically_encrypted(doc): if self._queue_for_decrypt: - self._save_encrypted_received_doc( + self._enqueue_encrypted_received_doc( doc, gen, trans_id, idx, total) else: # defer_decryption is False or no-sync-db fallback @@ -913,7 +911,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # not symmetrically encrypted doc, insert it directly # or save it in the decrypted stage. if self._queue_for_decrypt: - self._save_received_doc(doc, gen, trans_id, idx, total) + self._enqueue_received_doc(doc, gen, trans_id, idx, total) else: self._return_doc_cb(doc, gen, trans_id) # ------------------------------------------------------------- @@ -996,6 +994,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self.stop(fail=True) break + if defer_decryption: + self._setup_sync_decr_pool() + t.doc_syncer.set_request_method( 'get', idx, sync_id, last_known_generation, last_known_trans_id) @@ -1021,6 +1022,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): t.join() if t.success: number_of_changes, _, _ = t.result + if defer_decryption and number_of_changes: + self._sync_decr_pool.set_docs_to_process( + number_of_changes) else: raise t.exception first_request = False @@ -1053,6 +1057,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): new_generation = doc_data['gen'] new_transaction_id = doc_data['trans_id'] + # decrypt docs in case of deferred decryption + if defer_decryption: + self._sync_decr_pool.wait() + self._teardown_sync_decr_pool() + return new_generation, new_transaction_id def sync_exchange(self, docs_by_generations, @@ -1103,14 +1112,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ self._ensure_callback = ensure_callback - if defer_decryption and self._sync_db is not None: - self._sync_exchange_lock.acquire() - self._setup_sync_decr_pool() - self._defer_decryption = True - else: - # fall back - defer_decryption = False - self.start() if sync_id is None: @@ -1120,10 +1121,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): setProxiedObject(self._insert_doc_cb[source_replica_uid], return_doc_cb) - # empty the database before starting a new sync - if defer_decryption is True and not self.clear_to_sync(): - self._sync_decr_pool.empty() - self._ensure_connection() if self._trace_hook: # for tests self._trace_hook('sync_exchange') @@ -1257,6 +1254,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): trans_id_after_send = response_dict['new_transaction_id'] # get docs from target + if self._sync_db is None: + defer_decryption = False if self.stopped is False: cur_target_gen, cur_target_trans_id = self._get_remote_docs( url, @@ -1266,12 +1265,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._syncer_pool.cleanup() - # decrypt docs in case of deferred decryption - if defer_decryption: - self._sync_decr_pool.wait() - self._teardown_sync_decr_pool() - 
self._sync_exchange_lock.release() - # update gen and trans id info in case we just sent and did not # receive docs. if gen_after_send is not None and gen_after_send > cur_target_gen: @@ -1357,7 +1350,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): encr.TABLE_NAME,)) self._sync_db.execute(sql, (doc_id, doc_rev)) - def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total): + def _enqueue_encrypted_received_doc(self, doc, gen, trans_id, idx, total): """ Save a symmetrically encrypted incoming document into the received docs table in the sync db. A decryption task will pick it up @@ -1378,9 +1371,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): "Enqueueing doc for decryption: %d/%d." % (idx + 1, total)) self._sync_decr_pool.insert_encrypted_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id) + doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1) - def _save_received_doc(self, doc, gen, trans_id, idx, total): + def _enqueue_received_doc(self, doc, gen, trans_id, idx, total): """ Save any incoming document into the received docs table in the sync db. @@ -1399,23 +1392,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): "Enqueueing doc, no decryption needed: %d/%d." % (idx + 1, total)) self._sync_decr_pool.insert_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id) + doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1) # # Symmetric decryption of syncing docs # - def clear_to_sync(self): - """ - Return whether sync can proceed (ie, the received db table is empty). - - :return: Whether sync can proceed. - :rtype: bool - """ - if self._sync_decr_pool: - return self._sync_decr_pool.clear_to_sync() - return True - def set_decryption_callback(self, cb): """ Set callback to be called when the decryption finishes. diff --git a/scripts/db_access/client_side_db.py b/scripts/db_access/client_side_db.py index d7c54b66..a047b522 100644 --- a/scripts/db_access/client_side_db.py +++ b/scripts/db_access/client_side_db.py @@ -10,6 +10,7 @@ import requests import srp._pysrp as srp import binascii import logging +import json from twisted.internet import reactor from twisted.internet.defer import inlineCallbacks @@ -146,6 +147,9 @@ def _parse_args(): parser.add_argument( '--passphrase', '-p', default=None, help='the user passphrase') + parser.add_argument( + '--get-all-docs', '-a', action='store_true', + help='get all documents from the local database') parser.add_argument( '--sync', '-s', action='store_true', help='synchronize with the server replica') @@ -196,12 +200,21 @@ def _export_incoming_messages(soledad, directory): i += 1 +@inlineCallbacks +def _get_all_docs(soledad): + _, docs = yield soledad.get_all_docs() + for doc in docs: + print json.dumps(doc.content, indent=4) + + # main program @inlineCallbacks def _main(soledad, km, args): if args.sync: yield soledad.sync() + if args.get_all_docs: + yield _get_all_docs(soledad) if args.export_private_key: yield _export_key(args, km, args.export_private_key, private=True) if args.export_public_key: -- cgit v1.2.3 From 67f17cd30d01696ab24407b907bb55ae0fddacad Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 12 May 2015 10:33:23 -0400 Subject: [bug] remove illegal CR from auth header The b64 encoding of the auth token was introducing an illegal character (\n), which was breaking the authentication step since an exception was being raised - when that multi-line header was attempted to be built. this commit fixes that bug. 
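The underlying gotcha is easy to reproduce in a Python 2 shell (illustrative snippet, not part of the patch): the 'base64' string codec wraps its output at 76 characters and appends a trailing newline, while base64.b64encode emits a single header-safe line:

    import base64

    creds = 'some-uuid:' + 'x' * 70       # long enough to force wrapping
    print repr(creds.encode('base64'))    # embedded '\n' breaks the header
    print repr(base64.b64encode(creds))   # one line, safe for headers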
- Resolves: #6959 --- client/src/leap/soledad/client/auth.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/client/src/leap/soledad/client/auth.py b/client/src/leap/soledad/client/auth.py index 72ab0008..6dfabeb4 100644 --- a/client/src/leap/soledad/client/auth.py +++ b/client/src/leap/soledad/client/auth.py @@ -14,15 +14,13 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - - """ Methods for token-based authentication. These methods have to be included in all classes that extend HTTPClient so they can do token-based auth requests to the Soledad server. """ - +import base64 from u1db import errors @@ -49,7 +47,7 @@ class TokenBasedAuth(object): Return an authorization header to be included in the HTTP request, in the form: - [('Authorization', 'Token <base64 encoded creds>')] :param method: The HTTP method. :type method: str @@ -64,7 +62,8 @@ class TokenBasedAuth(object): if 'token' in self._creds: uuid, token = self._creds['token'] auth = '%s:%s' % (uuid, token) - return [('Authorization', 'Token %s' % auth.encode('base64')[:-1])] + b64_token = base64.b64encode(auth) + return [('Authorization', 'Token %s' % b64_token)] else: raise errors.UnknownAuthMethod( 'Wrong credentials: %s' % self._creds) -- cgit v1.2.3 From d1c39d389737ee844f42c5ed3dfc97c8ddf24250 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 12 May 2015 18:07:27 -0300 Subject: [bug] remove unused pending documents exception When we started implementing the sync db, one of the ideas was to reuse the data in the database in the case of a sync interruption. We don't do that now, and thus the pending documents exception is unneeded. This commit removes that exception from the code. --- client/src/leap/soledad/client/sqlcipher.py | 6 ------ client/src/leap/soledad/client/target.py | 4 ---- 2 files changed, 10 deletions(-) diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index d3b3d01b..39d5dd0e 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -68,7 +68,6 @@ from twisted.enterprise import adbapi from leap.soledad.client import encdecpool from leap.soledad.client.target import SoledadSyncTarget -from leap.soledad.client.target import PendingReceivedDocsSyncError from leap.soledad.client.sync import SoledadSynchronizer from leap.soledad.client import pragmas @@ -636,17 +635,12 @@ class SQLCipherU1DBSync(SQLCipherDatabase): log.msg('syncer sync...') res = syncer.sync(autocreate=autocreate, defer_decryption=defer_decryption) - - except PendingReceivedDocsSyncError: - logger.warning("Local sync db is not clear, skipping sync...") - return except CannotSendRequest: logger.warning("Connection with sync target couldn't be " "established.
Resetting connection...") # close the connection; it will be recreated in the next try syncer.sync_target.close() return - return res def stop_sync(self): diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 17ce718f..f2415218 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -71,10 +71,6 @@ def _gunzip(data): return data -class PendingReceivedDocsSyncError(Exception): - pass - - class DocumentSyncerThread(threading.Thread): """ A thread that knows how to either send or receive a document during the -- cgit v1.2.3 From 93717f50c9e8fc6295f74b6117268ba595f13ce9 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 13 May 2015 10:34:22 -0300 Subject: [feature] add --create-doc to client db script --- scripts/db_access/client_side_db.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/scripts/db_access/client_side_db.py b/scripts/db_access/client_side_db.py index a047b522..5dd2bd95 100644 --- a/scripts/db_access/client_side_db.py +++ b/scripts/db_access/client_side_db.py @@ -150,6 +150,9 @@ def _parse_args(): parser.add_argument( '--get-all-docs', '-a', action='store_true', help='get all documents from the local database') + parser.add_argument( + '--create-doc', '-c', default=None, + help='create a document with given content') parser.add_argument( '--sync', '-s', action='store_true', help='synchronize with the server replica') @@ -211,6 +214,8 @@ def _get_all_docs(soledad): @inlineCallbacks def _main(soledad, km, args): + if args.create_doc: + yield soledad.create_doc({'content': args.create_doc}) if args.sync: yield soledad.sync() if args.get_all_docs: -- cgit v1.2.3 From 94cbe24f6c6cd54e14d8d1b14e617c2d52c427fd Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 13 May 2015 10:31:47 -0300 Subject: [feature] use twisted adbapi for async encryption The access to the sync db was modified to use twisted.enterprise.adbapi, but only the asynchronous decryption of incoming documents during sync was adapted. This commit modifies the asynchronous encryption of documents to also use the adbapi for accessing the sync db. --- client/src/leap/soledad/client/encdecpool.py | 175 ++++++++++++++++++--- client/src/leap/soledad/client/sqlcipher.py | 64 +------- client/src/leap/soledad/client/sync.py | 4 - client/src/leap/soledad/client/target.py | 217 +++++++++++++++------------ 4 files changed, 275 insertions(+), 185 deletions(-) diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 0466ec5d..0c1f92ea 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -85,6 +85,36 @@ class SyncEncryptDecryptPool(object): logger.debug("Terminating %s" % (self.__class__.__name__,)) self._pool.terminate() + def _runOperation(self, query, *args): + """ + Run an operation on the sync db. + + :param query: The query to be executed. + :type query: str + :param args: A list of query arguments. + :type args: list + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + return self._sync_db.runOperation(query, *args) + + def _runQuery(self, query, *args): + """ + Run a query on the sync db. + + :param query: The query to be executed. + :type query: str + :param args: A list of query arguments. + :type args: list + + :return: A deferred that will fire with the results of the database + query. + :rtype: twisted.internet.defer.Deferred + """ + return self._sync_db.runQuery(query, *args) +
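Both helpers delegate to the twisted.enterprise.adbapi connection pool held in self._sync_db, which runs each statement on a worker thread and returns a Deferred. A rough sketch of how such a handle can be built (parameters are illustrative; the real pool is created in sqlcipher.py with the SQLCipher passphrase options):

    from twisted.enterprise import adbapi


    def build_sync_db(path):
        # every runQuery/runOperation call borrows a connection, executes
        # on a thread-pool thread, and hands back a Deferred
        return adbapi.ConnectionPool(
            'pysqlcipher.dbapi2', path, check_same_thread=False)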
def encrypt_doc_task(doc_id, doc_rev, content, key, secret): """ @@ -119,7 +149,50 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): TABLE_NAME = "docs_tosync" FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" - def encrypt_doc(self, doc, workers=True): + ENCRYPT_LOOP_PERIOD = 0.5 + + def __init__(self, *args, **kwargs): + """ + Initialize the sync encrypter pool. + """ + SyncEncryptDecryptPool.__init__(self, *args, **kwargs) + + self._stopped = False + self._sync_queue = multiprocessing.Queue() + + # start the encryption loop + self._deferred_loop = deferToThread(self._encrypt_docs_loop) + self._deferred_loop.addCallback( + lambda _: logger.debug("Finished encrypter thread.")) + + def enqueue_doc_for_encryption(self, doc): + """ + Enqueue a document for encryption. + + :param doc: The document to be encrypted. + :type doc: SoledadDocument + """ + try: + self._sync_queue.put_nowait(doc) + except multiprocessing.queues.Full: + # do not asynchronously encrypt this file if the queue is full + pass + + def _encrypt_docs_loop(self): + """ + Process the syncing queue and send the documents there to be encrypted + in the sync db. They will be read by the SoledadSyncTarget during the + sync_exchange. + """ + logger.debug("Starting encrypter thread.") + while not self._stopped: + try: + doc = self._sync_queue.get(True, self.ENCRYPT_LOOP_PERIOD) + self._encrypt_doc(doc) + except multiprocessing.queues.Empty: + pass + + def _encrypt_doc(self, doc, workers=True): """ Symmetrically encrypt a document. @@ -136,20 +209,20 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): secret = self._crypto.secret args = doc.doc_id, doc.rev, docstr, key, secret - try: - if workers: - res = self._pool.apply_async( - encrypt_doc_task, args, - callback=self.encrypt_doc_cb) - else: - # encrypt inline + if workers: + # encrypt asynchronously + self._pool.apply_async( + encrypt_doc_task, args, + callback=self._encrypt_doc_cb) + else: + # encrypt inline + try: res = encrypt_doc_task(*args) - self.encrypt_doc_cb(res) + self._encrypt_doc_cb(res) + except Exception as exc: + logger.exception(exc) - except Exception as exc: - logger.exception(exc) - - def encrypt_doc_cb(self, result): + def _encrypt_doc_cb(self, result): """ Insert results of encryption routine into the local sync database. @@ -158,9 +231,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): :type result: tuple(str, str, str) """ doc_id, doc_rev, content = result - return self.insert_encrypted_local_doc(doc_id, doc_rev, content) + return self._insert_encrypted_local_doc(doc_id, doc_rev, content) - def insert_encrypted_local_doc(self, doc_id, doc_rev, content): + def _insert_encrypted_local_doc(self, doc_id, doc_rev, content): """ Insert the contents of the encrypted doc into the local sync database. @@ -174,7 +247,58 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): """ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \ % (self.TABLE_NAME,) - return self._sync_db.runOperation(query, (doc_id, doc_rev, content)) + return self._runOperation(query, (doc_id, doc_rev, content)) + + @defer.inlineCallbacks + def get_encrypted_doc(self, doc_id, doc_rev): + """ + Get an encrypted document from the sync db. + + :param doc_id: The id of the document. + :type doc_id: str + :param doc_rev: The revision of the document.
+ :type doc_rev: str + + :return: A deferred that will fire with the encrypted content of the + document or None if the document was not found in the sync + db. + :rtype: twisted.internet.defer.Deferred + """ + logger.debug("Trying to get encrypted doc from sync db: %s" % doc_id) + query = "SELECT content FROM %s WHERE doc_id=? and rev=?" \ + % self.TABLE_NAME + result = yield self._runQuery(query, (doc_id, doc_rev)) + if result: + val = result.pop() + defer.returnValue(val[0]) + defer.returnValue(None) + + def delete_encrypted_doc(self, doc_id, doc_rev): + """ + Delete an encrypted document from the sync db. + + :param doc_id: The id of the document. + :type doc_id: str + :param doc_rev: The revision of the document. + :type doc_rev: str + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + query = "DELETE FROM %s WHERE doc_id=? and rev=?" \ + % self.TABLE_NAME + self._runOperation(query, (doc_id, doc_rev)) + + def close(self): + """ + Close the encrypter pool. + """ + self._stopped = True + self._sync_queue.close() + q = self._sync_queue + del q + self._sync_queue = None def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, @@ -275,9 +399,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): d.addCallback(lambda _: self._empty_db.set()) # start the decryption loop - self._deferred_loop = deferToThread(self._decrypt_and_process_docs) + self._deferred_loop = deferToThread( + self._decrypt_and_process_docs_loop) self._deferred_loop.addCallback( - lambda _: logger.debug("Finished decryptor thread.")) + lambda _: logger.debug("Finished decrypter thread.")) def set_docs_to_process(self, docs_to_process): """ @@ -317,7 +442,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): docstr = json.dumps(content) query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ % self.TABLE_NAME - return self._sync_db.runOperation( + return self._runOperation( query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx)) def insert_received_doc( @@ -348,7 +473,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): content = json.dumps(content) query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ % self.TABLE_NAME - return self._sync_db.runOperation( + return self._runOperation( query, (doc_id, doc_rev, content, gen, trans_id, 0, idx)) def _delete_received_doc(self, doc_id): @@ -364,7 +489,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ query = "DELETE FROM '%s' WHERE doc_id=?" 
\ % self.TABLE_NAME - return self._sync_db.runOperation(query, (doc_id,)) + return self._runOperation(query, (doc_id,)) def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx, workers=True): @@ -474,7 +599,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): if encrypted is not None: query += " WHERE encrypted = %d" % int(encrypted) query += " ORDER BY %s %s" % (order_by, order) - return self._sync_db.runQuery(query) + return self._runQuery(query) @defer.inlineCallbacks def _get_insertable_docs(self): @@ -612,7 +737,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :rtype: twisted.internet.defer.Deferred """ query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) - return self._sync_db.runOperation(query) + return self._runOperation(query) def _raise_if_async_fails(self): """ @@ -627,7 +752,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # re-raise the exception raised by the remote call res.get() - def _decrypt_and_process_docs(self): + def _decrypt_and_process_docs_loop(self): """ Decrypt the documents received from remote replica and insert them into the local one. @@ -668,6 +793,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ Wait for the decrypt-and-process loop to finish. """ + logger.debug("Waiting for asynchronous decryption of incoming documents...") self._finished.wait() + logger.debug("Asynchronous decryption of incoming documents finished.") if self._exception: raise self._exception diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 39d5dd0e..16241621 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -42,7 +42,6 @@ SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases handled by Soledad should be created by SQLCipher >= 2.0. """ import logging -import multiprocessing import os import threading import json @@ -286,10 +285,9 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): :rtype: str """ doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc) - - # TODO XXX move to API XXX if self.defer_encryption: - self.sync_queue.put_nowait(doc) + # TODO move to api? + self._sync_enc_pool.enqueue_doc_for_encryption(doc) return doc_rev # @@ -428,13 +426,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): """ LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db' - """ - A dictionary that hold locks which avoid multiple sync attempts from the - same database replica. - """ - # XXX We do not need the lock here now. Remove. - encrypting_lock = threading.Lock() - """ Period or recurrence of the Looping Call that will do the encryption to the syncdb (in seconds). @@ -458,7 +449,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self._sync_db_key = opts.sync_db_key self._sync_db = None self._sync_enc_pool = None - self.sync_queue = None # we store syncers in a dictionary indexed by the target URL. 
We also # store a hash of the auth info in case auth info expires and we need @@ -468,7 +458,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # self._syncers = {'': ('', syncer), ...} self._syncers = {} - self.sync_queue = multiprocessing.Queue() self.running = False self._sync_threadpool = None @@ -486,24 +475,10 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self._initialize_sync_db(opts) if defer_encryption: - # initialize syncing queue encryption pool self._sync_enc_pool = encdecpool.SyncEncrypterPool( self._crypto, self._sync_db) - # ----------------------------------------------------------------- - # From the documentation: If f returns a deferred, rescheduling - # will not take place until the deferred has fired. The result - # value is ignored. - - # TODO use this to avoid multiple sync attempts if the sync has not - # finished! - # ----------------------------------------------------------------- - - # XXX this was called sync_watcher --- trace any remnants - self._sync_loop = LoopingCall(self._encrypt_syncing_docs) - self._sync_loop.start(self.ENCRYPT_LOOP_PERIOD) - self.shutdownID = None @property @@ -703,7 +678,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self._replica_uid, creds=creds, crypto=self._crypto, - sync_db=self._sync_db)) + sync_db=self._sync_db, + sync_enc_pool=self._sync_enc_pool)) self._syncers[url] = (h, syncer) # in order to reuse the same synchronizer multiple times we have to # reset its state (i.e. the number of documents received from target @@ -715,33 +691,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # Symmetric encryption of syncing docs # - def _encrypt_syncing_docs(self): - """ - Process the syncing queue and send the documents there - to be encrypted in the sync db. They will be read by the - SoledadSyncTarget during the sync_exchange. - - Called periodically from the LoopingCall self._sync_loop. - """ - # TODO should return a deferred that would firewhen the encryption is - # done. 
See note on __init__ - - lock = self.encrypting_lock - # optional wait flag used to avoid blocking - if not lock.acquire(False): - return - else: - queue = self.sync_queue - try: - while not queue.empty(): - doc = queue.get_nowait() - self._sync_enc_pool.encrypt_doc(doc) - - except Exception as exc: - logger.error("Error while encrypting docs to sync") - logger.exception(exc) - finally: - lock.release() def get_generation(self): # FIXME @@ -779,11 +728,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): if self._sync_db is not None: self._sync_db.close() self._sync_db = None - # close the sync queue - if self.sync_queue is not None: - self.sync_queue.close() - del self.sync_queue - self.sync_queue = None class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase): diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index d3f106da..d4ca4258 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -195,10 +195,6 @@ class SoledadSynchronizer(Synchronizer): "my_gen": my_gen } self._syncing_info = info - if defer_decryption and not sync_target.has_syncdb(): - logger.debug("Sync target has no valid sync db, " - "aborting defer_decryption") - defer_decryption = False self.complete_sync() except Exception as e: logger.error("Soledad sync error: %s" % str(e)) diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index f2415218..667aab15 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -28,6 +28,7 @@ import threading from collections import defaultdict from time import sleep from uuid import uuid4 +from functools import partial import simplejson as json @@ -38,12 +39,12 @@ from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase from zope.proxy import ProxyBase from zope.proxy import setProxiedObject +from twisted.internet import defer from leap.soledad.common.document import SoledadDocument from leap.soledad.client.auth import TokenBasedAuth from leap.soledad.client.crypto import is_symmetrically_encrypted from leap.soledad.client.crypto import encrypt_doc, decrypt_doc -from leap.soledad.client.encdecpool import SyncEncrypterPool from leap.soledad.client.encdecpool import SyncDecrypterPool from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS @@ -111,6 +112,7 @@ class DocumentSyncerThread(threading.Thread): self._exception = None self._result = None self._success = False + self.started = threading.Event() # a lock so we can signal when we're finished self._request_lock = threading.Lock() self._request_lock.acquire() @@ -128,6 +130,8 @@ class DocumentSyncerThread(threading.Thread): finish before actually performing the request. It also traps any exception and register any failure with the request. """ + self.started.set() + with self._stop_lock: if self._stopped is None: self._stopped = False @@ -756,7 +760,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # def __init__(self, url, source_replica_uid=None, creds=None, crypto=None, - sync_db=None): + sync_db=None, sync_enc_pool=None): """ Initialize the SoledadSyncTarget. 
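With the encrypter pool injected through this constructor, the target no longer queries the sync db directly for outgoing documents: it asks the pool, whose lookup already returns a Deferred. Roughly, the calling convention is (a sketch under those assumptions, not code from the patch):

    from leap.soledad.client.crypto import encrypt_doc


    def fetch_or_encrypt(target, doc):
        # prefer the copy encrypted in the background; fall back to
        # inline encryption if the worker has not gotten to it yet
        d = target._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev)
        d.addCallback(lambda enc: enc if enc is not None
                      else encrypt_doc(target._crypto, doc))
        return d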
@@ -787,8 +791,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self.source_replica_uid = source_replica_uid self._syncer_pool = None - # deferred decryption attributes + # asynchronous encryption/decryption attributes self._sync_db = sync_db + self._sync_enc_pool = sync_enc_pool self._decryption_callback = None self._sync_decr_pool = None @@ -796,7 +801,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ Set up the SyncDecrypterPool for deferred decryption. """ - if self._sync_decr_pool is None: + if self._sync_decr_pool is None and self._sync_db is not None: # initialize syncing queue decryption pool self._sync_decr_pool = SyncDecrypterPool( self._crypto, @@ -1018,7 +1023,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): t.join() if t.success: number_of_changes, _, _ = t.result - if defer_decryption and number_of_changes: + if defer_decryption: self._sync_decr_pool.set_docs_to_process( number_of_changes) else: @@ -1060,6 +1065,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): return new_generation, new_transaction_id + @property + def _defer_encryption(self): + return self._sync_enc_pool is not None + + @property + def _defer_decryption(self): + return self._sync_decr_pool is not None + def sync_exchange(self, docs_by_generations, source_replica_uid, last_known_generation, last_known_trans_id, return_doc_cb, @@ -1126,17 +1139,19 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): cur_target_gen = last_known_generation cur_target_trans_id = last_known_trans_id - # send docs + # ------------------------------------------------------------------- + # start of send documents to target + # ------------------------------------------------------------------- msg = "%d/%d" % (0, len(docs_by_generations)) signal(SOLEDAD_SYNC_SEND_STATUS, msg) logger.debug("Soledad sync send status: %s" % msg) - defer_encryption = self._sync_db is not None self._syncer_pool = DocumentSyncerPool( self._raw_url, self._raw_creds, url, headers, ensure_callback, self.stop_syncer) threads = [] last_callback_lock = None + sent = 0 total = len(docs_by_generations) @@ -1156,66 +1171,78 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # ------------------------------------------------------------- # symmetric encryption of document's contents # ------------------------------------------------------------- - doc_json = doc.get_json() - if not doc.is_tombstone(): - if not defer_encryption: - # fallback case, for tests - doc_json = encrypt_doc(self._crypto, doc) - else: - try: - doc_json = self.get_encrypted_doc_from_db( - doc.doc_id, doc.rev) - except Exception as exc: - logger.error("Error while getting " - "encrypted doc from db") - logger.exception(exc) - continue + + # the following var will hold a deferred because we may try to + # fetch the encrypted document from the sync db + d = None + + if doc.is_tombstone(): + d = defer.succeed(None) + elif not self._defer_encryption: + # fallback case, for tests + d = defer.succeed(encrypt_doc(self._crypto, doc)) + else: + + def _maybe_encrypt_doc_inline(doc_json): if doc_json is None: - # Not marked as tombstone, but we got nothing - # from the sync db. As it is not encrypted yet, we - # force inline encryption. + # the document is not marked as tombstone, but we got + # nothing from the sync db. As it is not encrypted + # yet, we force inline encryption. # TODO: implement a queue to deal with these cases. 
- doc_json = encrypt_doc(self._crypto, doc) + return encrypt_doc(self._crypto, doc) + return doc_json + + d = self.get_encrypted_doc_from_db(doc.doc_id, doc.rev) + d.addCallback(_maybe_encrypt_doc_inline) # ------------------------------------------------------------- # end of symmetric encryption # ------------------------------------------------------------- + t = self._syncer_pool.new_syncer_thread( sent + 1, total, last_request_lock=last_request_lock, last_callback_lock=last_callback_lock) - # bail out if any thread failed + # bail out if creation of any thread failed if t is None: self.stop(fail=True) break - # set the request method - t.doc_syncer.set_request_method( - 'put', sync_id, cur_target_gen, cur_target_trans_id, - id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, - trans_id=trans_id, number_of_docs=number_of_docs, - doc_idx=sent + 1) - # set the success calback + # the following callback will be called when the document's + # encrypted content is available, either because it was found on + # the sync db or because it has been encrypted inline. - def _success_callback(idx, total, response): - _success_msg = "Soledad sync send status: %d/%d" \ - % (idx, total) - signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg) - logger.debug(_success_msg) + def _configure_and_start_thread(t, doc_json): + # set the request method + t.doc_syncer.set_request_method( + 'put', sync_id, cur_target_gen, cur_target_trans_id, + id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, + trans_id=trans_id, number_of_docs=number_of_docs, + doc_idx=sent + 1) + # set the success calback - t.doc_syncer.set_success_callback(_success_callback) + def _success_callback(idx, total, response): + _success_msg = "Soledad sync send status: %d/%d" \ + % (idx, total) + signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg) + logger.debug(_success_msg) - # set the failure callback - def _failure_callback(idx, total, exception): - _failure_msg = "Soledad sync: error while sending document " \ - "%d/%d: %s" % (idx, total, exception) - logger.warning("%s" % _failure_msg) - logger.warning("Soledad sync: failing gracefully, will " - "recover on next sync.") + t.doc_syncer.set_success_callback(_success_callback) - t.doc_syncer.set_failure_callback(_failure_callback) + # set the failure callback + def _failure_callback(idx, total, exception): + _failure_msg = "Soledad sync: error while sending document " \ + "%d/%d: %s" % (idx, total, exception) + logger.warning("%s" % _failure_msg) + logger.warning("Soledad sync: failing gracefully, will " + "recover on next sync.") + + t.doc_syncer.set_failure_callback(_failure_callback) + + # save thread and append + t.start() + + d.addCallback(partial(_configure_and_start_thread, t)) - # save thread and append - t.start() threads.append((t, doc)) # update lock references so they can be used in next call to @@ -1230,6 +1257,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): while threads: # check if there are failures t, doc = threads.pop(0) + t.started.wait() t.join() if t.success: synced.append((doc.doc_id, doc.rev)) @@ -1238,8 +1266,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): raise t.exception # delete documents from the sync database - if defer_encryption: - self.delete_encrypted_docs_from_db(synced) + if self._defer_encryption: + self._delete_encrypted_docs_from_db(synced) # get target gen and trans_id after docs gen_after_send = None @@ -1248,16 +1276,23 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): response_dict = 
json.loads(last_successful_thread.response[0])[0] gen_after_send = response_dict['new_generation'] trans_id_after_send = response_dict['new_transaction_id'] - - # get docs from target - if self._sync_db is None: - defer_decryption = False + # ------------------------------------------------------------------- + # end of send documents to target + # ------------------------------------------------------------------- + + # ------------------------------------------------------------------- + # start of fetch documents from target + # ------------------------------------------------------------------- + defer_decryption = defer_decryption and self._defer_decryption if self.stopped is False: cur_target_gen, cur_target_trans_id = self._get_remote_docs( url, last_known_generation, last_known_trans_id, headers, return_doc_cb, ensure_callback, sync_id, defer_decryption=defer_decryption) + # ------------------------------------------------------------------- + # end of fetch documents from target + # ------------------------------------------------------------------- self._syncer_pool.cleanup() @@ -1308,6 +1343,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): with self._stop_lock: return self._stopped is True + # + # Symmetric encryption of syncing docs + # + def get_encrypted_doc_from_db(self, doc_id, doc_rev): """ Retrieve encrypted document from the database of encrypted docs for @@ -1318,33 +1357,31 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :param doc_rev: The document revision :type doc_rev: str + + :return: A deferred which is fired with the document's encrypted + content or None if the document was not found on the sync db. + :rtype: twisted.internet.defer.Deferred """ - encr = SyncEncrypterPool - sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % ( - encr.TABLE_NAME,)) - res = self._fetchall(sql, (doc_id, doc_rev)) - if res: - val = res.pop() - return val[0] - else: - # no doc found - return None + logger.debug("Looking for encrypted document on sync db: %s" % doc_id) + return self._sync_enc_pool.get_encrypted_doc(doc_id, doc_rev) - def delete_encrypted_docs_from_db(self, docs_ids): + def _delete_encrypted_docs_from_db(self, docs): """ Delete several encrypted documents from the database of symmetrically encrypted docs to sync. - :param docs_ids: an iterable with (doc_id, doc_rev) for all documents - to be deleted. - :type docs_ids: any iterable of tuples of str + :param docs: an iterable with (doc_id, doc_rev) for all documents + to be deleted. + :type docs: any iterable of tuples of str """ - if docs_ids: - encr = SyncEncrypterPool - for doc_id, doc_rev in docs_ids: - sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % ( - encr.TABLE_NAME,)) - self._sync_db.execute(sql, (doc_id, doc_rev)) + for doc_id, doc_rev in docs: + logger.debug("Removing encrypted document on sync db: %s" + % doc_id) + return self._sync_enc_pool.delete_encrypted_doc(doc_id, doc_rev) + + # + # Symmetric decryption of syncing docs + # def _enqueue_encrypted_received_doc(self, doc, gen, trans_id, idx, total): """ @@ -1357,15 +1394,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :param gen: The generation. :type gen: str :param trans_id: Transacion id. - :type gen: str + :param idx: The index count of the current operation. :type idx: int :param total: The total number of operations. :type total: int """ - logger.debug( - "Enqueueing doc for decryption: %d/%d." - % (idx + 1, total)) + logger.debug("Enqueueing doc for decryption: %d/%d." 
+                     % (idx + 1, total))
         self._sync_decr_pool.insert_encrypted_received_doc(
             doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1)

@@ -1384,16 +1420,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
         :param total: The total number of operations.
         :type total: int
         """
-        logger.debug(
-            "Enqueueing doc, no decryption needed: %d/%d."
-            % (idx + 1, total))
+        logger.debug("Enqueueing doc, no decryption needed: %d/%d."
+                     % (idx + 1, total))
         self._sync_decr_pool.insert_received_doc(
             doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1)

-    #
-    # Symmetric decryption of syncing docs
-    #
-
     def set_decryption_callback(self, cb):
         """
         Set callback to be called when the decryption finishes.
@@ -1410,11 +1441,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
         """
         return self._decryption_callback is not None

-    def has_syncdb(self):
-        """
-        Return True if we have an initialized syncdb.
-        """
-        return self._sync_db is not None
+    #
+    # Authentication methods
+    #

     def _sign_request(self, method, url_query, params):
         """
@@ -1442,9 +1471,3 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
         :type token: str
         """
         TokenBasedAuth.set_token_credentials(self, uuid, token)
-
-    def _fetchall(self, *args, **kwargs):
-        with self._sync_db:
-            c = self._sync_db.cursor()
-            c.execute(*args, **kwargs)
-            return c.fetchall()
--
cgit v1.2.3


From e62dafeba8f08c1f7588e37cf9cd3fb28e79a020 Mon Sep 17 00:00:00 2001
From: drebs
Date: Tue, 19 May 2015 18:46:53 -0300
Subject: [feature] use twisted.web.client in client sync

This change uses twisted deferreds for the whole syncing process and paves
the way to implementing other transport schemes. It removes a lot of
threaded code that used locks and was very difficult to maintain, and lets
twisted do the dirty work. Furthermore, all blocking network i/o is now
handled asynchronously by twisted.

This commit removes the possibility of interrupting a sync, and we should
reimplement it using cancellable deferreds if we need it.
---
 .../feature_use-twisted-web-for-client-sync   |    1 +
 client/src/leap/soledad/client/encdecpool.py  |   11 +-
 client/src/leap/soledad/client/http_target.py |  570 ++++++++
 client/src/leap/soledad/client/sqlcipher.py   |   46 +-
 client/src/leap/soledad/client/sync.py        |   83 +-
 client/src/leap/soledad/client/target.py      | 1473 --------------------
 6 files changed, 635 insertions(+), 1549 deletions(-)
 create mode 100644 client/changes/feature_use-twisted-web-for-client-sync
 create mode 100644 client/src/leap/soledad/client/http_target.py
 delete mode 100644 client/src/leap/soledad/client/target.py

diff --git a/client/changes/feature_use-twisted-web-for-client-sync b/client/changes/feature_use-twisted-web-for-client-sync
new file mode 100644
index 00000000..b4d1d4a4
--- /dev/null
+++ b/client/changes/feature_use-twisted-web-for-client-sync
@@ -0,0 +1 @@
+ o Use twisted.web.client for client sync.
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 0c1f92ea..7c21c30e 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -789,12 +789,5 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
             self._exception = e
             self._finished.set()

-    def wait(self):
-        """
-        Wait for the decrypt-and-process loop to finish.
- """ - logger.debug("Waiting for asynchronous decryption of incoming documents...") - self._finished.wait() - logger.debug("Asynchronous decryption of incoming documents finished.") - if self._exception: - raise self._exception + def has_finished(self): + return self._finished.is_set() diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py new file mode 100644 index 00000000..041180e6 --- /dev/null +++ b/client/src/leap/soledad/client/http_target.py @@ -0,0 +1,570 @@ +# -*- coding: utf-8 -*- +# target.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +A U1DB backend for encrypting data before sending to server and decrypting +after receiving. +""" + +import json +import base64 +import logging + +from zope.proxy import setProxiedObject +from zope.proxy import ProxyBase +from uuid import uuid4 +from functools import partial +from collections import defaultdict + +from twisted.internet import defer +from twisted.internet import reactor +from twisted.web.client import getPage + +from u1db import errors +from u1db import SyncTarget +from u1db.remote import utils + +from leap.soledad.common.document import SoledadDocument + +from leap.soledad.client.crypto import is_symmetrically_encrypted +from leap.soledad.client.crypto import encrypt_doc +from leap.soledad.client.crypto import decrypt_doc +from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS +from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS +from leap.soledad.client.events import signal +from leap.soledad.client.encdecpool import SyncDecrypterPool + + +logger = logging.getLogger(__name__) + + +class SoledadHTTPSyncTarget(SyncTarget): + """ + A SyncTarget that encrypts data before sending and decrypts data after + receiving. + + Normally encryption will have been written to the sync database upon + document modification. The sync database is also used to write temporarily + the parsed documents that the remote send us, before being decrypted and + written to the main database. + """ + + # will later keep a reference to the insert-doc callback + # passed to sync_exchange + _insert_doc_cb = defaultdict(lambda: ProxyBase(None)) + + def __init__(self, url, source_replica_uid, creds, crypto, + sync_db=None, sync_enc_pool=None): + """ + Initialize the sync target. + + :param url: The server sync url. + :type url: str + :param source_replica_uid: The source replica uid which we use when + deferring decryption. + :type source_replica_uid: str + :param url: The url of the target replica to sync with. + :type url: str + :param creds: A dictionary containing the uuid and token. + :type creds: creds + :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt + document contents when syncing. + :type crypto: soledad.crypto.SoledadCrypto + :param sync_db: Optional. handler for the db with the symmetric + encryption of the syncing documents. 
If + None, encryption will be done in-place, + instead of retreiving it from the dedicated + database. + :type sync_db: Sqlite handler + """ + if url.endswith("/"): + url = url[:-1] + self._url = str(url) + "/sync-from/" + source_replica_uid + self.source_replica_uid = source_replica_uid + self._auth_header = None + self.set_creds(creds) + self._crypto = crypto + self._sync_db = sync_db + self._sync_enc_pool = sync_enc_pool + # asynchronous encryption/decryption attributes + self._decryption_callback = None + self._sync_decr_pool = None + + def set_creds(self, creds): + """ + Update credentials. + + :param creds: A dictionary containing the uuid and token. + :type creds: dict + """ + uuid = creds['token']['uuid'] + token = creds['token']['token'] + auth = '%s:%s' % (uuid, token) + b64_token = base64.b64encode(auth) + self._auth_header = {'Authorization': 'Token %s' % b64_token} + + @property + def _defer_encryption(self): + return self._sync_enc_pool is not None + + # + # SyncTarget API + # + + @defer.inlineCallbacks + def get_sync_info(self, source_replica_uid): + """ + Return information about known state of remote database. + + Return the replica_uid and the current database generation of the + remote database, and its last-seen database generation for the client + replica. + + :param source_replica_uid: The client-size replica uid. + :type source_replica_uid: str + + :return: A deferred which fires with (target_replica_uid, + target_replica_generation, target_trans_id, + source_replica_last_known_generation, + source_replica_last_known_transaction_id) + :rtype: twisted.internet.defer.Deferred + """ + raw = yield getPage(self._url, headers=self._auth_header) + res = json.loads(raw) + defer.returnValue([ + res['target_replica_uid'], + res['target_replica_generation'], + res['target_replica_transaction_id'], + res['source_replica_generation'], + res['source_transaction_id'] + ]) + + def record_sync_info( + self, source_replica_uid, source_replica_generation, + source_replica_transaction_id): + """ + Record tip information for another replica. + + After sync_exchange has been processed, the caller will have + received new content from this replica. This call allows the + source replica instigating the sync to inform us what their + generation became after applying the documents we returned. + + This is used to allow future sync operations to not need to repeat data + that we just talked about. It also means that if this is called at the + wrong time, there can be database records that will never be + synchronized. + + :param source_replica_uid: The identifier for the source replica. + :type source_replica_uid: str + :param source_replica_generation: The database generation for the + source replica. + :type source_replica_generation: int + :param source_replica_transaction_id: The transaction id associated + with the source replica + generation. + :type source_replica_transaction_id: str + + :return: A deferred which fires with the result of the query. 
+ :rtype: twisted.internet.defer.Deferred + """ + data = json.dumps({ + 'generation': source_replica_generation, + 'transaction_id': source_replica_transaction_id + }) + headers = self._auth_header.copy() + headers.update({'content-type': 'application/json'}) + return getPage( + self._url, + method='PUT', + headers=headers, + postdata=data) + + @defer.inlineCallbacks + def sync_exchange(self, docs_by_generation, source_replica_uid, + last_known_generation, last_known_trans_id, + return_doc_cb, ensure_callback=None, + defer_decryption=True, sync_id=None): + """ + Find out which documents the remote database does not know about, + encrypt and send them. After that, receive documents from the remote + database. + + :param docs_by_generations: A list of (doc_id, generation, trans_id) + of local documents that were changed since + the last local generation the remote + replica knows about. + :type docs_by_generations: list of tuples + + :param source_replica_uid: The uid of the source replica. + :type source_replica_uid: str + + :param last_known_generation: Target's last known generation. + :type last_known_generation: int + + :param last_known_trans_id: Target's last known transaction id. + :type last_known_trans_id: str + + :param return_doc_cb: A callback for inserting received documents from + target. If not overriden, this will call u1db + insert_doc_from_target in synchronizer, which + implements the TAKE OTHER semantics. + :type return_doc_cb: function + + :param ensure_callback: A callback that ensures we know the target + replica uid if the target replica was just + created. + :type ensure_callback: function + + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool + + :return: A deferred which fires with the new generation and + transaction id of the target replica. + :rtype: twisted.internet.defer.Deferred + """ + + self._ensure_callback = ensure_callback + + if sync_id is None: + sync_id = str(uuid4()) + self.source_replica_uid = source_replica_uid + + # let the decrypter pool access the passed callback to insert docs + setProxiedObject(self._insert_doc_cb[source_replica_uid], + return_doc_cb) + + gen_after_send, trans_id_after_send = yield self._send_docs( + docs_by_generation, + last_known_generation, + last_known_trans_id, + sync_id) + + cur_target_gen, cur_target_trans_id = yield self._receive_docs( + last_known_generation, last_known_trans_id, + return_doc_cb, ensure_callback, sync_id, + defer_decryption=defer_decryption) + + # update gen and trans id info in case we just sent and did not + # receive docs. 
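After the changes in this patch, sync_exchange is plain inlineCallbacks
sequencing: send, then receive, then reconcile the target generation seen
by each phase. Schematically (a sketch of the control flow, not the actual
method):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def exchange(send_docs, receive_docs):
        # both callables are assumed to return deferreds firing with a
        # (generation, transaction_id) pair; send_docs may fire with
        # (None, None) when there was nothing to send
        sent_gen, sent_trans = yield send_docs()
        recv_gen, recv_trans = yield receive_docs()
        # keep whichever generation is newest on the target
        if sent_gen is not None and sent_gen > recv_gen:
            defer.returnValue((sent_gen, sent_trans))
        defer.returnValue((recv_gen, recv_trans))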
+ if gen_after_send is not None and gen_after_send > cur_target_gen: + cur_target_gen = gen_after_send + cur_target_trans_id = trans_id_after_send + + defer.returnValue([cur_target_gen, cur_target_trans_id]) + + # + # methods to send docs + # + + def _prepare(self, comma, entries, **dic): + entry = comma + '\r\n' + json.dumps(dic) + entries.append(entry) + return len(entry) + + @defer.inlineCallbacks + def _send_docs(self, docs_by_generation, last_known_generation, + last_known_trans_id, sync_id): + + if not docs_by_generation: + defer.returnValue([None, None]) + + headers = self._auth_header.copy() + headers.update({'content-type': 'application/x-soledad-sync-put'}) + # add remote replica metadata to the request + first_entries = ['['] + self._prepare( + '', first_entries, + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + sync_id=sync_id, + ensure=self._ensure_callback is not None) + idx = 0 + total = len(docs_by_generation) + for doc, gen, trans_id in docs_by_generation: + idx += 1 + result = yield self._send_one_doc( + headers, first_entries, doc, + gen, trans_id, total, idx) + if self._defer_encryption: + self._sync_enc_pool.delete_encrypted_doc( + doc.doc_id, doc.rev) + signal(SOLEDAD_SYNC_SEND_STATUS, + "Soledad sync send status: %d/%d" + % (idx, total)) + response_dict = json.loads(result)[0] + gen_after_send = response_dict['new_generation'] + trans_id_after_send = response_dict['new_transaction_id'] + defer.returnValue([gen_after_send, trans_id_after_send]) + + @defer.inlineCallbacks + def _send_one_doc(self, headers, first_entries, doc, gen, trans_id, + number_of_docs, doc_idx): + entries = first_entries[:] + # add the document to the request + content = yield self._encrypt_doc(doc) + self._prepare( + ',', entries, + id=doc.doc_id, rev=doc.rev, content=content, gen=gen, + trans_id=trans_id, number_of_docs=number_of_docs, + doc_idx=doc_idx) + entries.append('\r\n]') + data = ''.join(entries) + result = yield getPage( + self._url, + method='POST', + headers=headers, + postdata=data) + defer.returnValue(result) + + def _encrypt_doc(self, doc): + d = None + if doc.is_tombstone(): + d = defer.succeed(None) + elif not self._defer_encryption: + # fallback case, for tests + d = defer.succeed(encrypt_doc(self._crypto, doc)) + else: + + def _maybe_encrypt_doc_inline(doc_json): + if doc_json is None: + # the document is not marked as tombstone, but we got + # nothing from the sync db. As it is not encrypted + # yet, we force inline encryption. 
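+                    # (the deferred built below then fires with freshly
+                    # encrypted JSON instead of None)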
+                    return encrypt_doc(self._crypto, doc)
+                return doc_json
+
+            d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev)
+            d.addCallback(_maybe_encrypt_doc_inline)
+        return d
+
+    #
+    # methods to receive doc
+    #
+
+    @defer.inlineCallbacks
+    def _receive_docs(self, last_known_generation, last_known_trans_id,
+                      return_doc_cb, ensure_callback, sync_id,
+                      defer_decryption):
+        # we keep a reference to the callback in case we defer the decryption
+        self._return_doc_cb = return_doc_cb
+        self._queue_for_decrypt = defer_decryption \
+            and self._sync_db is not None
+
+        new_generation = last_known_generation
+        new_transaction_id = last_known_trans_id
+
+        if self._queue_for_decrypt:
+            logger.debug(
+                "Soledad sync: will queue received docs for decrypting.")
+
+        if defer_decryption:
+            self._setup_sync_decr_pool()
+
+        headers = self._auth_header.copy()
+        headers.update({'content-type': 'application/x-soledad-sync-get'})
+
+        # maybe get one doc
+        d = self._receive_one_doc(
+            headers, last_known_generation, last_known_trans_id,
+            sync_id, 0)
+        d.addCallback(partial(self._insert_received_doc, 1, 1))
+        number_of_changes, ngen, ntrans = yield d
+
+        if defer_decryption:
+            self._sync_decr_pool.set_docs_to_process(
+                number_of_changes)
+        idx = 1
+
+        # maybe get more documents
+        deferreds = []
+        while idx < number_of_changes:
+            d = self._receive_one_doc(
+                headers, last_known_generation,
+                last_known_trans_id, sync_id, idx)
+            d.addCallback(
+                partial(
+                    self._insert_received_doc,
+                    idx + 1,
+                    number_of_changes))
+            deferreds.append(d)
+            idx += 1
+        results = yield defer.gatherResults(deferreds)
+
+        # get generation and transaction id of target after insertions
+        if deferreds:
+            _, new_generation, new_transaction_id = results.pop()
+
+        # get current target gen and trans id in case no documents were
+        # received
+        def _shutdown_and_finish(res):
+            self._sync_decr_pool.close()
+            return new_generation, new_transaction_id
+
+        d = defer.Deferred()
+        d.addCallback(_shutdown_and_finish)
+
+        def _wait_or_finish():
+            if not self._sync_decr_pool.has_finished():
+                reactor.callLater(
+                    SyncDecrypterPool.DECRYPT_LOOP_PERIOD,
+                    _wait_or_finish)
+            else:
+                d.callback(None)
+
+        # decrypt docs in case of deferred decryption
+        if defer_decryption:
+            _wait_or_finish()
+        else:
+            d.callback(None)
+
+        new_generation, new_transaction_id = yield d
+        defer.returnValue([new_generation, new_transaction_id])
+
+    def _receive_one_doc(self, headers, last_known_generation,
+                         last_known_trans_id, sync_id, received):
+        entries = ['[']
+        # add remote replica metadata to the request
+        self._prepare(
+            '', entries,
+            last_known_generation=last_known_generation,
+            last_known_trans_id=last_known_trans_id,
+            sync_id=sync_id,
+            ensure=self._ensure_callback is not None)
+        # inform server of how many documents have already been received
+        self._prepare(
+            ',', entries, received=received)
+        entries.append('\r\n]')
+        # send the request
+        return getPage(
+            self._url,
+            method='POST',
+            headers=headers,
+            postdata=''.join(entries))
+
+    def _insert_received_doc(self, idx, total, response):
+        """
+        Insert a received document into the local replica.
+
+        :param idx: The index count of the current operation.
+        :type idx: int
+        :param total: The total number of operations.
+        :type total: int
+        :param response: The body of the response.
+        :type response: str
+        """
+        new_generation, new_transaction_id, number_of_changes, doc_id, \
+            rev, content, gen, trans_id = \
+            self._parse_received_doc_response(response)
+        if doc_id is not None:
+            # decrypt incoming document and insert into local database
+            # -------------------------------------------------------------
+            # symmetric decryption of document's contents
+            # -------------------------------------------------------------
+            # If arriving content was symmetrically encrypted, we decrypt it.
+            # We do it inline if defer_decryption flag is False or no sync_db
+            # was defined, otherwise we defer it, writing it to the received
+            # docs table.
+            doc = SoledadDocument(doc_id, rev, content)
+            if is_symmetrically_encrypted(doc):
+                if self._queue_for_decrypt:
+                    self._sync_decr_pool.insert_encrypted_received_doc(
+                        doc.doc_id, doc.rev, doc.content, gen, trans_id,
+                        idx)
+                else:
+                    # defer_decryption is False or no-sync-db fallback
+                    doc.set_json(decrypt_doc(self._crypto, doc))
+                    self._return_doc_cb(doc, gen, trans_id)
+            else:
+                # not symmetrically encrypted doc, insert it directly
+                # or save it in the decrypted stage.
+                if self._queue_for_decrypt:
+                    self._sync_decr_pool.insert_received_doc(
+                        doc.doc_id, doc.rev, doc.content, gen, trans_id,
+                        idx)
+                else:
+                    self._return_doc_cb(doc, gen, trans_id)
+            # -------------------------------------------------------------
+            # end of symmetric decryption
+            # -------------------------------------------------------------
+        msg = "%d/%d" % (idx, total)
+        signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg)
+        logger.debug("Soledad sync receive status: %s" % msg)
+        return number_of_changes, new_generation, new_transaction_id
+
+    def _parse_received_doc_response(self, response):
+        """
+        Parse the response from the server containing the received document.
+
+        :param response: The body of the response.
+        :type response: str
+
+        :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev,
+                 content, gen, trans_id)
+        :rtype: tuple
+        """
+        # decode incoming stream
+        parts = response.splitlines()
+        if not parts or parts[0] != '[' or parts[-1] != ']':
+            raise errors.BrokenSyncStream
+        data = parts[1:-1]
+        # decode metadata
+        line, comma = utils.check_and_strip_comma(data[0])
+        metadata = None
+        try:
+            metadata = json.loads(line)
+            new_generation = metadata['new_generation']
+            new_transaction_id = metadata['new_transaction_id']
+            number_of_changes = metadata['number_of_changes']
+        except (json.JSONDecodeError, KeyError):
+            raise errors.BrokenSyncStream
+        # make sure we have replica_uid from fresh new dbs
+        if self._ensure_callback and 'replica_uid' in metadata:
+            self._ensure_callback(metadata['replica_uid'])
+        # parse incoming document info
+        doc_id = None
+        rev = None
+        content = None
+        gen = None
+        trans_id = None
+        if number_of_changes > 0:
+            try:
+                entry = json.loads(data[1])
+                doc_id = entry['id']
+                rev = entry['rev']
+                content = entry['content']
+                gen = entry['gen']
+                trans_id = entry['trans_id']
+            except (IndexError, KeyError):
+                raise errors.BrokenSyncStream
+        return new_generation, new_transaction_id, number_of_changes, \
+            doc_id, rev, content, gen, trans_id
+
+    def _setup_sync_decr_pool(self):
+        """
+        Set up the SyncDecrypterPool for deferred decryption.
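+
+        While receiving, the pool is used roughly as follows (method names
+        as they appear elsewhere in this file)::
+
+            self._setup_sync_decr_pool()
+            self._sync_decr_pool.set_docs_to_process(number_of_changes)
+            # insert_encrypted_received_doc() / insert_received_doc()
+            # for each incoming document, then poll has_finished() from
+            # the reactor and finally close()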
+ """ + if self._sync_decr_pool is None and self._sync_db is not None: + # initialize syncing queue decryption pool + self._sync_decr_pool = SyncDecrypterPool( + self._crypto, + self._sync_db, + insert_doc_cb=self._insert_doc_cb, + source_replica_uid=self.source_replica_uid) diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 16241621..53afbda8 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -53,20 +53,17 @@ from u1db.backends import sqlite_backend from hashlib import sha256 from contextlib import contextmanager from collections import defaultdict -from httplib import CannotSendRequest from functools import partial from pysqlcipher import dbapi2 as sqlcipher_dbapi2 from twisted.internet import reactor -from twisted.internet.task import LoopingCall from twisted.internet.threads import deferToThreadPool from twisted.python.threadpool import ThreadPool -from twisted.python import log from twisted.enterprise import adbapi from leap.soledad.client import encdecpool -from leap.soledad.client.target import SoledadSyncTarget +from leap.soledad.client.http_target import SoledadHTTPSyncTarget from leap.soledad.client.sync import SoledadSynchronizer from leap.soledad.client import pragmas @@ -590,33 +587,13 @@ class SQLCipherU1DBSync(SQLCipherDatabase): before the synchronisation was performed. :rtype: Deferred """ - kwargs = {'creds': creds, 'autocreate': autocreate, - 'defer_decryption': defer_decryption} - return self._defer_to_sync_threadpool(self._sync, url, **kwargs) - - def _sync(self, url, creds=None, autocreate=True, defer_decryption=True): - res = None - # the following context manager blocks until the syncing lock can be # acquired. - # TODO review, I think this is no longer needed with a 1-thread - # threadpool. - - log.msg("in _sync") - self.__url = url with self._syncer(url, creds=creds) as syncer: # XXX could mark the critical section here... - try: - log.msg('syncer sync...') - res = syncer.sync(autocreate=autocreate, - defer_decryption=defer_decryption) - except CannotSendRequest: - logger.warning("Connection with sync target couldn't be " - "established. Resetting connection...") - # closing the connection it will be recreated in the next try - syncer.sync_target.close() - return - return res + return syncer.sync( + autocreate=autocreate, + defer_decryption=defer_decryption) def stop_sync(self): """ @@ -673,13 +650,14 @@ class SQLCipherU1DBSync(SQLCipherDatabase): if syncer is None or h != cur_h: syncer = SoledadSynchronizer( self, - SoledadSyncTarget(url, - # XXX is the replica_uid ready? - self._replica_uid, - creds=creds, - crypto=self._crypto, - sync_db=self._sync_db, - sync_enc_pool=self._sync_enc_pool)) + SoledadHTTPSyncTarget( + url, + # XXX is the replica_uid ready? + self._replica_uid, + creds=creds, + crypto=self._crypto, + sync_db=self._sync_db, + sync_enc_pool=self._sync_enc_pool)) self._syncers[url] = (h, syncer) # in order to reuse the same synchronizer multiple times we have to # reset its state (i.e. the number of documents received from target diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index d4ca4258..f8f74ce7 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -25,9 +25,10 @@ Extend u1db Synchronizer with the ability to: * Be interrupted and recovered. 
""" import logging -import traceback from threading import Lock +from twisted.internet import defer + from u1db import errors from u1db.sync import Synchronizer @@ -90,6 +91,7 @@ class SoledadSynchronizer(Synchronizer): # Synchronizer may be reused later. self.release_syncing_lock() + @defer.inlineCallbacks def _sync(self, autocreate=False, defer_decryption=True): """ Helper function, called from the main `sync` method. @@ -102,7 +104,7 @@ class SoledadSynchronizer(Synchronizer): ensure_callback = None try: (self.target_replica_uid, target_gen, target_trans_id, - target_my_gen, target_my_trans_id) = \ + target_my_gen, target_my_trans_id) = yield \ sync_target.get_sync_info(self.source._replica_uid) except errors.DatabaseDoesNotExist: if not autocreate: @@ -151,15 +153,15 @@ class SoledadSynchronizer(Synchronizer): self.target_replica_uid) logger.debug( "Soledad source sync info:\n" - " source target gen: %d\n" - " source target trans_id: %s" + " last target gen known to source: %d\n" + " last target trans_id known to source: %s" % (target_last_known_gen, target_last_known_trans_id)) # validate transaction ids if not changes and target_last_known_gen == target_gen: if target_trans_id != target_last_known_trans_id: raise errors.InvalidTransactionId - return my_gen + defer.returnValue(my_gen) # prepare to send all the changed docs changed_doc_ids = [doc_id for doc_id, _, _ in changes] @@ -177,33 +179,26 @@ class SoledadSynchronizer(Synchronizer): # # The sync_exchange method may be interrupted, in which case it will # return a tuple of Nones. - try: - new_gen, new_trans_id = sync_target.sync_exchange( - docs_by_generation, self.source._replica_uid, - target_last_known_gen, target_last_known_trans_id, - self._insert_doc_from_target, ensure_callback=ensure_callback, - defer_decryption=defer_decryption) - logger.debug( - "Soledad source sync info after sync exchange:\n" - " source target gen: %d\n" - " source target trans_id: %s" - % (new_gen, new_trans_id)) - info = { - "target_replica_uid": self.target_replica_uid, - "new_gen": new_gen, - "new_trans_id": new_trans_id, - "my_gen": my_gen - } - self._syncing_info = info - self.complete_sync() - except Exception as e: - logger.error("Soledad sync error: %s" % str(e)) - logger.error(traceback.format_exc()) - sync_target.stop() - finally: - sync_target.close() - - return my_gen + new_gen, new_trans_id = yield sync_target.sync_exchange( + docs_by_generation, self.source._replica_uid, + target_last_known_gen, target_last_known_trans_id, + self._insert_doc_from_target, ensure_callback=ensure_callback, + defer_decryption=defer_decryption) + logger.debug( + "Soledad source sync info after sync exchange:\n" + " source known target gen: %d\n" + " source known target trans_id: %s" + % (new_gen, new_trans_id)) + info = { + "target_replica_uid": self.target_replica_uid, + "new_gen": new_gen, + "new_trans_id": new_trans_id, + "my_gen": my_gen + } + self._syncing_info = info + yield self.complete_sync() + + defer.returnValue(my_gen) def complete_sync(self): """ @@ -211,6 +206,9 @@ class SoledadSynchronizer(Synchronizer): (a) record last known generation and transaction uid for the remote replica, and (b) make target aware of our current reached generation. + + :return: A deferred which will fire when the sync has been completed. 
+ :rtype: twisted.internet.defer.Deferred """ logger.debug("Completing deferred last step in SYNC...") @@ -221,7 +219,26 @@ class SoledadSynchronizer(Synchronizer): info["target_replica_uid"], info["new_gen"], info["new_trans_id"]) # if gapless record current reached generation with target - self._record_sync_info_with_the_target(info["my_gen"]) + return self._record_sync_info_with_the_target(info["my_gen"]) + + def _record_sync_info_with_the_target(self, start_generation): + """ + Store local replica metadata in server. + + :param start_generation: The local generation when the sync was + started. + :type start_generation: int + + :return: A deferred which will fire when the operation has been + completed. + :rtype: twisted.internet.defer.Deferred + """ + cur_gen, trans_id = self.source._get_generation_info() + if (cur_gen == start_generation + self.num_inserted + and self.num_inserted > 0): + return self.sync_target.record_sync_info( + self.source._replica_uid, cur_gen, trans_id) + return defer.succeed(None) @property def syncing(self): diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py deleted file mode 100644 index 667aab15..00000000 --- a/client/src/leap/soledad/client/target.py +++ /dev/null @@ -1,1473 +0,0 @@ -# -*- coding: utf-8 -*- -# target.py -# Copyright (C) 2013, 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -A U1DB backend for encrypting data before sending to server and decrypting -after receiving. -""" -import cStringIO -import gzip -import logging -import re -import urllib -import threading - -from collections import defaultdict -from time import sleep -from uuid import uuid4 -from functools import partial - -import simplejson as json - -from u1db import errors -from u1db.remote import utils, http_errors -from u1db.remote.http_target import HTTPSyncTarget -from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase -from zope.proxy import ProxyBase -from zope.proxy import setProxiedObject - -from twisted.internet import defer - -from leap.soledad.common.document import SoledadDocument -from leap.soledad.client.auth import TokenBasedAuth -from leap.soledad.client.crypto import is_symmetrically_encrypted -from leap.soledad.client.crypto import encrypt_doc, decrypt_doc -from leap.soledad.client.encdecpool import SyncDecrypterPool -from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS -from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS -from leap.soledad.client.events import signal - - -logger = logging.getLogger(__name__) - - -def _gunzip(data): - """ - Uncompress data that is gzipped. 
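For reference, the helper being removed here just inflates a gzipped HTTP
body; a self-contained equivalent (error handling simplified to returning
the input unchanged):

    import cStringIO
    import gzip

    def gunzip(data):
        # inflate a gzipped byte string; fall back to the raw input if
        # the stream turns out not to be valid gzip
        buf = cStringIO.StringIO(data)
        try:
            return gzip.GzipFile(mode='r', fileobj=buf).read()
        except IOError:
            return data
        finally:
            buf.close()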
- - :param data: gzipped data - :type data: basestring - """ - buffer = cStringIO.StringIO() - buffer.write(data) - buffer.seek(0) - try: - data = gzip.GzipFile(mode='r', fileobj=buffer).read() - except Exception: - logger.warning("Error while decrypting gzipped data") - buffer.close() - return data - - -class DocumentSyncerThread(threading.Thread): - """ - A thread that knowns how to either send or receive a document during the - sync process. - """ - - def __init__(self, doc_syncer, release_method, failed_method, - idx, total, last_request_lock=None, last_callback_lock=None): - """ - Initialize a new syncer thread. - - :param doc_syncer: A document syncer. - :type doc_syncer: HTTPDocumentSyncer - :param release_method: A method to be called when finished running. - :type release_method: callable(DocumentSyncerThread) - :param failed_method: A method to be called when we failed. - :type failed_method: callable(DocumentSyncerThread) - :param idx: The index count of the current operation. - :type idx: int - :param total: The total number of operations. - :type total: int - :param last_request_lock: A lock to wait for before actually performing - the request. - :type last_request_lock: threading.Lock - :param last_callback_lock: A lock to wait for before actually running - the success callback. - :type last_callback_lock: threading.Lock - """ - threading.Thread.__init__(self) - self._doc_syncer = doc_syncer - self._release_method = release_method - self._failed_method = failed_method - self._idx = idx - self._total = total - self._last_request_lock = last_request_lock - self._last_callback_lock = last_callback_lock - self._response = None - self._exception = None - self._result = None - self._success = False - self.started = threading.Event() - # a lock so we can signal when we're finished - self._request_lock = threading.Lock() - self._request_lock.acquire() - self._callback_lock = threading.Lock() - self._callback_lock.acquire() - # make thread interruptable - self._stopped = None - self._stop_lock = threading.Lock() - - def run(self): - """ - Run the HTTP request and store results. - - This method will block and wait for an eventual previous operation to - finish before actually performing the request. It also traps any - exception and register any failure with the request. 
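The serialization trick used throughout this removed class: each thread
owns a pre-acquired lock that it only releases once its own work is done,
and the next thread blocks on the previous thread's lock before starting.
A minimal sketch of the idiom (two stages only; names are illustrative):

    import threading

    def make_stage(work, prev_done=None):
        # 'done' starts acquired and is only released after this stage's
        # work has run, so a later stage can block on it
        done = threading.Lock()
        done.acquire()

        def run():
            if prev_done is not None:
                prev_done.acquire()  # wait for the previous stage
            work()
            done.release()  # unblock the next stage

        return run, done

    first, first_done = make_stage(lambda: None)
    second, _ = make_stage(lambda: None, prev_done=first_done)
    threading.Thread(target=second).start()
    threading.Thread(target=first).start()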
- """ - self.started.set() - - with self._stop_lock: - if self._stopped is None: - self._stopped = False - else: - return - - # eventually wait for the previous thread to finish - if self._last_request_lock is not None: - self._last_request_lock.acquire() - - # bail out in case we've been interrupted - if self.stopped is True: - return - - try: - self._response = self._doc_syncer.do_request() - self._request_lock.release() - - # run success callback - if self._doc_syncer.success_callback is not None: - - # eventually wait for callback lock release - if self._last_callback_lock is not None: - self._last_callback_lock.acquire() - - # bail out in case we've been interrupted - if self._stopped is True: - return - - self._result = self._doc_syncer.success_callback( - self._idx, self._total, self._response) - self._success = True - doc_syncer = self._doc_syncer - self._release_method(self, doc_syncer) - self._doc_syncer = None - # let next thread executed its callback - self._callback_lock.release() - - # trap any exception and signal failure - except Exception as e: - self._exception = e - self._success = False - # run failure callback - if self._doc_syncer.failure_callback is not None: - - # eventually wait for callback lock release - if self._last_callback_lock is not None: - self._last_callback_lock.acquire() - - # bail out in case we've been interrupted - if self.stopped is True: - return - - self._doc_syncer.failure_callback( - self._idx, self._total, self._exception) - - self._failed_method() - # we do not release the callback lock here because we - # failed and so we don't want other threads to succeed. - - @property - def doc_syncer(self): - return self._doc_syncer - - @property - def response(self): - return self._response - - @property - def exception(self): - return self._exception - - @property - def callback_lock(self): - return self._callback_lock - - @property - def request_lock(self): - return self._request_lock - - @property - def success(self): - return self._success - - def stop(self): - with self._stop_lock: - self._stopped = True - - @property - def stopped(self): - with self._stop_lock: - return self._stopped - - @property - def result(self): - return self._result - - -class DocumentSyncerPool(object): - """ - A pool of reusable document syncers. - """ - - POOL_SIZE = 10 - """ - The maximum amount of syncer threads running at the same time. - """ - - def __init__(self, raw_url, raw_creds, query_string, headers, - ensure_callback, stop_method): - """ - Initialize the document syncer pool. - - :param raw_url: The complete raw URL for the HTTP request. - :type raw_url: str - :param raw_creds: The credentials for the HTTP request. - :type raw_creds: dict - :param query_string: The query string for the HTTP request. - :type query_string: str - :param headers: The headers for the HTTP request. - :type headers: dict - :param ensure_callback: A callback to ensure we have the correct - target_replica_uid, if it was just created. 
- :type ensure_callback: callable - - """ - # save syncer params - self._raw_url = raw_url - self._raw_creds = raw_creds - self._query_string = query_string - self._headers = headers - self._ensure_callback = ensure_callback - self._stop_method = stop_method - # pool attributes - self._failures = False - self._semaphore_pool = threading.BoundedSemaphore( - DocumentSyncerPool.POOL_SIZE) - self._pool_access_lock = threading.Lock() - self._doc_syncers = [] - self._threads = [] - - def new_syncer_thread(self, idx, total, last_request_lock=None, - last_callback_lock=None): - """ - Yield a new document syncer thread. - - :param idx: The index count of the current operation. - :type idx: int - :param total: The total number of operations. - :type total: int - :param last_request_lock: A lock to wait for before actually performing - the request. - :type last_request_lock: threading.Lock - :param last_callback_lock: A lock to wait for before actually running - the success callback. - :type last_callback_lock: threading.Lock - """ - t = None - # wait for available threads - self._semaphore_pool.acquire() - with self._pool_access_lock: - if self._failures is True: - return None - # get a syncer - doc_syncer = self._get_syncer() - # we rely on DocumentSyncerThread.run() to release the lock using - # self.release_syncer so we can launch a new thread. - t = DocumentSyncerThread( - doc_syncer, self.release_syncer, self.stop_threads, - idx, total, - last_request_lock=last_request_lock, - last_callback_lock=last_callback_lock) - self._threads.append(t) - return t - - def _failed(self): - with self._pool_access_lock: - self._failures = True - - @property - def failures(self): - return self._failures - - def _get_syncer(self): - """ - Get a document syncer from the pool. - - This method will create a new syncer whenever there is no syncer - available in the pool. - - :return: A syncer. - :rtype: HTTPDocumentSyncer - """ - syncer = None - # get an available syncer or create a new one - try: - syncer = self._doc_syncers.pop() - except IndexError: - syncer = HTTPDocumentSyncer( - self._raw_url, self._raw_creds, self._query_string, - self._headers, self._ensure_callback) - return syncer - - def release_syncer(self, syncer_thread, doc_syncer): - """ - Return a syncer to the pool after use and check for any failures. - - :param syncer: The syncer to be returned to the pool. - :type syncer: HTTPDocumentSyncer - """ - with self._pool_access_lock: - self._doc_syncers.append(doc_syncer) - if syncer_thread.success is True: - self._threads.remove(syncer_thread) - self._semaphore_pool.release() - - def stop_threads(self, fail=True): - """ - Stop all threads in the pool. - - :param fail: Whether we are stopping because of a failure. 
- :type fail: bool - """ - # stop sync - self._stop_method() - stopped = [] - # stop all threads - with self._pool_access_lock: - if fail: - self._failures = True - logger.error("sync failed: cancelling sync threads...") - while self._threads: - t = self._threads.pop(0) - t.stop() - self._doc_syncers.append(t.doc_syncer) - stopped.append(t) - # release locks and join - while stopped: - t = stopped.pop(0) - t.request_lock.acquire(False) # just in case - t.request_lock.release() - t.callback_lock.acquire(False) # just in case - t.callback_lock.release() - # release any blocking semaphores - for i in xrange(DocumentSyncerPool.POOL_SIZE): - try: - self._semaphore_pool.release() - except ValueError: - break - if fail: - logger.error("Soledad sync: cancelled sync threads.") - - def cleanup(self): - """ - Close and remove any syncers from the pool. - """ - with self._pool_access_lock: - while self._doc_syncers: - syncer = self._doc_syncers.pop() - syncer.close() - del syncer - - -class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth): - - def __init__(self, raw_url, creds, query_string, headers, ensure_callback): - """ - Initialize the client. - - :param raw_url: The raw URL of the target HTTP server. - :type raw_url: str - :param creds: Authentication credentials. - :type creds: dict - :param query_string: The query string for the HTTP request. - :type query_string: str - :param headers: The headers for the HTTP request. - :type headers: dict - :param ensure_callback: A callback to ensure we have the correct - target_replica_uid, if it was just created. - :type ensure_callback: callable - """ - HTTPClientBase.__init__(self, raw_url, creds=creds) - # info needed to perform the request - self._query_string = query_string - self._headers = headers - self._ensure_callback = ensure_callback - # the actual request method - self._request_method = None - self._success_callback = None - self._failure_callback = None - - def _reset(self): - """ - Reset this document syncer so we can reuse it. - """ - self._request_method = None - self._success_callback = None - self._failure_callback = None - self._request_method = None - - def set_request_method(self, method, *args, **kwargs): - """ - Set the actual method to perform the request. - - :param method: Either 'get' or 'put'. - :type method: str - :param args: Arguments for the request method. - :type args: list - :param kwargs: Keyworded arguments for the request method. - :type kwargs: dict - """ - self._reset() - # resolve request method - if method is 'get': - self._request_method = self._get_doc - elif method is 'put': - self._request_method = self._put_doc - else: - raise Exception - # store request method args - self._args = args - self._kwargs = kwargs - - def set_success_callback(self, callback): - self._success_callback = callback - - def set_failure_callback(self, callback): - self._failure_callback = callback - - @property - def success_callback(self): - return self._success_callback - - @property - def failure_callback(self): - return self._failure_callback - - def do_request(self): - """ - Actually perform the request. - - :return: The body and headers of the response. - :rtype: tuple - """ - self._ensure_connection() - args = self._args - kwargs = self._kwargs - return self._request_method(*args, **kwargs) - - def _request(self, method, url_parts, params=None, body=None, - content_type=None): - """ - Perform an HTTP request. - - :param method: The HTTP request method. 
- :type method: str - :param url_parts: A list representing the request path. - :type url_parts: list - :param params: Parameters for the URL query string. - :type params: dict - :param body: The body of the request. - :type body: str - :param content-type: The content-type of the request. - :type content-type: str - - :return: The body and headers of the response. - :rtype: tuple - - :raise errors.Unavailable: Raised after a number of unsuccesful - request attempts. - :raise Exception: Raised for any other exception ocurring during the - request. - """ - - self._ensure_connection() - unquoted_url = url_query = self._url.path - if url_parts: - if not url_query.endswith('/'): - url_query += '/' - unquoted_url = url_query - url_query += '/'.join(urllib.quote(part, safe='') - for part in url_parts) - # oauth performs its own quoting - unquoted_url += '/'.join(url_parts) - encoded_params = {} - if params: - for key, value in params.items(): - key = unicode(key).encode('utf-8') - encoded_params[key] = _encode_query_parameter(value) - url_query += ('?' + urllib.urlencode(encoded_params)) - if body is not None and not isinstance(body, basestring): - body = json.dumps(body) - content_type = 'application/json' - headers = {} - if content_type: - headers['content-type'] = content_type - - # Patched: We would like to receive gzip pretty please - # ---------------------------------------------------- - headers['accept-encoding'] = "gzip" - # ---------------------------------------------------- - - headers.update( - self._sign_request(method, unquoted_url, encoded_params)) - - for delay in self._delays: - try: - self._conn.request(method, url_query, body, headers) - return self._response() - except errors.Unavailable, e: - sleep(delay) - raise e - - def _response(self): - """ - Return the response of the (possibly gzipped) HTTP request. - - :return: The body and headers of the response. - :rtype: tuple - """ - resp = self._conn.getresponse() - body = resp.read() - headers = dict(resp.getheaders()) - - # Patched: We would like to decode gzip - # ---------------------------------------------------- - encoding = headers.get('content-encoding', '') - if "gzip" in encoding: - body = _gunzip(body) - # ---------------------------------------------------- - - if resp.status in (200, 201): - return body, headers - elif resp.status in http_errors.ERROR_STATUSES: - try: - respdic = json.loads(body) - except ValueError: - pass - else: - self._error(respdic) - # special case - if resp.status == 503: - raise errors.Unavailable(body, headers) - raise errors.HTTPError(resp.status, body, headers) - - def _prepare(self, comma, entries, **dic): - """ - Prepare an entry to be sent through a syncing POST request. - - :param comma: A string to be prepended to the current entry. - :type comma: str - :param entries: A list of entries accumulated to be sent on the - request. - :type entries: list - :param dic: The data to be included in this entry. - :type dic: dict - - :return: The size of the prepared entry. - :rtype: int - """ - entry = comma + '\r\n' + json.dumps(dic) - entries.append(entry) - return len(entry) - - def _init_post_request(self, action, content_length): - """ - Initiate a syncing POST request. - - :param url: The syncing URL. - :type url: str - :param action: The syncing action, either 'get' or 'receive'. - :type action: str - :param headers: The initial headers to be sent on this request. - :type headers: dict - :param content_length: The content-length of the request. 
- :type content_length: int - """ - self._conn.putrequest('POST', self._query_string) - self._conn.putheader( - 'content-type', 'application/x-soledad-sync-%s' % action) - for header_name, header_value in self._headers: - self._conn.putheader(header_name, header_value) - self._conn.putheader('accept-encoding', 'gzip') - self._conn.putheader('content-length', str(content_length)) - self._conn.endheaders() - - def _get_doc(self, received, sync_id, last_known_generation, - last_known_trans_id): - """ - Get a sync document from server by means of a POST request. - - :param received: The number of documents already received in the - current sync session. - :type received: int - :param sync_id: The id for the current sync session. - :type sync_id: str - :param last_known_generation: Target's last known generation. - :type last_known_generation: int - :param last_known_trans_id: Target's last known transaction id. - :type last_known_trans_id: str - - :return: The body and headers of the response. - :rtype: tuple - """ - entries = ['['] - size = 1 - # add remote replica metadata to the request - size += self._prepare( - '', entries, - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - sync_id=sync_id, - ensure=self._ensure_callback is not None) - # inform server of how many documents have already been received - size += self._prepare( - ',', entries, received=received) - entries.append('\r\n]') - size += len(entries[-1]) - # send headers - self._init_post_request('get', size) - # get document - for entry in entries: - self._conn.send(entry) - return self._response() - - def _put_doc(self, sync_id, last_known_generation, last_known_trans_id, - id, rev, content, gen, trans_id, number_of_docs, doc_idx): - """ - Put a sync document on server by means of a POST request. - - :param sync_id: The id for the current sync session. - :type sync_id: str - :param last_known_generation: Target's last known generation. - :type last_known_generation: int - :param last_known_trans_id: Target's last known transaction id. - :type last_known_trans_id: str - :param id: The document id. - :type id: str - :param rev: The document revision. - :type rev: str - :param content: The serialized document content. - :type content: str - :param gen: The generation of the modification of the document. - :type gen: int - :param trans_id: The transaction id of the modification of the - document. - :type trans_id: str - :param number_of_docs: The total amount of documents sent on this sync - session. - :type number_of_docs: int - :param doc_idx: The index of the current document being sent. - :type doc_idx: int - - :return: The body and headers of the response. - :rtype: tuple - """ - # prepare to send the document - entries = ['['] - size = 1 - # add remote replica metadata to the request - size += self._prepare( - '', entries, - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - sync_id=sync_id, - ensure=self._ensure_callback is not None) - # add the document to the request - size += self._prepare( - ',', entries, - id=id, rev=rev, content=content, gen=gen, trans_id=trans_id, - number_of_docs=number_of_docs, doc_idx=doc_idx) - entries.append('\r\n]') - size += len(entries[-1]) - # send headers - self._init_post_request('put', size) - # send document - for entry in entries: - self._conn.send(entry) - return self._response() - - def _sign_request(self, method, url_query, params): - """ - Return an authorization header to be included in the HTTP request. 
- - :param method: The HTTP method. - :type method: str - :param url_query: The URL query string. - :type url_query: str - :param params: A list with encoded query parameters. - :type param: list - - :return: The Authorization header. - :rtype: list of tuple - """ - return TokenBasedAuth._sign_request(self, method, url_query, params) - - def set_token_credentials(self, uuid, token): - """ - Store given credentials so we can sign the request later. - - :param uuid: The user's uuid. - :type uuid: str - :param token: The authentication token. - :type token: str - """ - TokenBasedAuth.set_token_credentials(self, uuid, token) - - -class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): - """ - A SyncTarget that encrypts data before sending and decrypts data after - receiving. - - Normally encryption will have been written to the sync database upon - document modification. The sync database is also used to write temporarily - the parsed documents that the remote send us, before being decrypted and - written to the main database. - """ - - # will later keep a reference to the insert-doc callback - # passed to sync_exchange - _insert_doc_cb = defaultdict(lambda: ProxyBase(None)) - - # - # Modified HTTPSyncTarget methods. - # - - def __init__(self, url, source_replica_uid=None, creds=None, crypto=None, - sync_db=None, sync_enc_pool=None): - """ - Initialize the SoledadSyncTarget. - - :param source_replica_uid: The source replica uid which we use when - deferring decryption. - :type source_replica_uid: str - :param url: The url of the target replica to sync with. - :type url: str - :param creds: Optional dictionary giving credentials. - to authorize the operation with the server. - :type creds: dict - :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt - document contents when syncing. - :type crypto: soledad.crypto.SoledadCrypto - :param sync_db: Optional. handler for the db with the symmetric - encryption of the syncing documents. If - None, encryption will be done in-place, - instead of retreiving it from the dedicated - database. - :type sync_db: Sqlite handler - """ - HTTPSyncTarget.__init__(self, url, creds) - self._raw_url = url - self._raw_creds = creds - self._crypto = crypto - self._stopped = True - self._stop_lock = threading.Lock() - self.source_replica_uid = source_replica_uid - self._syncer_pool = None - - # asynchronous encryption/decryption attributes - self._sync_db = sync_db - self._sync_enc_pool = sync_enc_pool - self._decryption_callback = None - self._sync_decr_pool = None - - def _setup_sync_decr_pool(self): - """ - Set up the SyncDecrypterPool for deferred decryption. - """ - if self._sync_decr_pool is None and self._sync_db is not None: - # initialize syncing queue decryption pool - self._sync_decr_pool = SyncDecrypterPool( - self._crypto, - self._sync_db, - insert_doc_cb=self._insert_doc_cb, - source_replica_uid=self.source_replica_uid) - - def _teardown_sync_decr_pool(self): - """ - Tear down the SyncDecrypterPool. - """ - self._sync_decr_pool.close() - self._sync_decr_pool = None - - def _get_replica_uid(self, url): - """ - Return replica uid from the url, or None. 
- - :param url: the replica url - :type url: str - """ - replica_uid_match = re.findall("user-([0-9a-fA-F]+)", url) - return replica_uid_match[0] if len(replica_uid_match) > 0 else None - - @staticmethod - def connect(url, source_replica_uid=None, crypto=None): - return SoledadSyncTarget( - url, source_replica_uid=source_replica_uid, crypto=crypto) - - def _parse_received_doc_response(self, response): - """ - Parse the response from the server containing the received document. - - :param response: The body and headers of the response. - :type response: tuple(str, dict) - """ - data, _ = response - # decode incoming stream - parts = data.splitlines() - if not parts or parts[0] != '[' or parts[-1] != ']': - raise errors.BrokenSyncStream - data = parts[1:-1] - # decode metadata - line, comma = utils.check_and_strip_comma(data[0]) - metadata = None - try: - metadata = json.loads(line) - new_generation = metadata['new_generation'] - new_transaction_id = metadata['new_transaction_id'] - number_of_changes = metadata['number_of_changes'] - except (json.JSONDecodeError, KeyError): - raise errors.BrokenSyncStream - # make sure we have replica_uid from fresh new dbs - if self._ensure_callback and 'replica_uid' in metadata: - self._ensure_callback(metadata['replica_uid']) - # parse incoming document info - doc_id = None - rev = None - content = None - gen = None - trans_id = None - if number_of_changes > 0: - try: - entry = json.loads(data[1]) - doc_id = entry['id'] - rev = entry['rev'] - content = entry['content'] - gen = entry['gen'] - trans_id = entry['trans_id'] - except (IndexError, KeyError): - raise errors.BrokenSyncStream - return new_generation, new_transaction_id, number_of_changes, \ - doc_id, rev, content, gen, trans_id - - def _insert_received_doc(self, idx, total, response): - """ - Insert a received document into the local replica. - - :param idx: The index count of the current operation. - :type idx: int - :param total: The total number of operations. - :type total: int - :param response: The body and headers of the response. - :type response: tuple(str, dict) - """ - new_generation, new_transaction_id, number_of_changes, doc_id, \ - rev, content, gen, trans_id = \ - self._parse_received_doc_response(response) - if doc_id is not None: - # decrypt incoming document and insert into local database - # ------------------------------------------------------------- - # symmetric decryption of document's contents - # ------------------------------------------------------------- - # If arriving content was symmetrically encrypted, we decrypt it. - # We do it inline if defer_decryption flag is False or no sync_db - # was defined, otherwise we defer it writing it to the received - # docs table. - doc = SoledadDocument(doc_id, rev, content) - if is_symmetrically_encrypted(doc): - if self._queue_for_decrypt: - self._enqueue_encrypted_received_doc( - doc, gen, trans_id, idx, total) - else: - # defer_decryption is False or no-sync-db fallback - doc.set_json(decrypt_doc(self._crypto, doc)) - self._return_doc_cb(doc, gen, trans_id) - else: - # not symmetrically encrypted doc, insert it directly - # or save it in the decrypted stage. 
- if self._queue_for_decrypt: - self._enqueue_received_doc(doc, gen, trans_id, idx, total) - else: - self._return_doc_cb(doc, gen, trans_id) - # ------------------------------------------------------------- - # end of symmetric decryption - # ------------------------------------------------------------- - msg = "%d/%d" % (idx + 1, total) - signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg) - logger.debug("Soledad sync receive status: %s" % msg) - return number_of_changes, new_generation, new_transaction_id - - def _get_remote_docs(self, url, last_known_generation, last_known_trans_id, - headers, return_doc_cb, ensure_callback, sync_id, - defer_decryption=False): - """ - Fetch sync documents from the remote database and insert them in the - local database. - - If an incoming document's encryption scheme is equal to - EncryptionSchemes.SYMKEY, then this method will decrypt it with - Soledad's symmetric key. - - :param url: The syncing URL. - :type url: str - :param last_known_generation: Target's last known generation. - :type last_known_generation: int - :param last_known_trans_id: Target's last known transaction id. - :type last_known_trans_id: str - :param headers: The headers of the HTTP request. - :type headers: dict - :param return_doc_cb: A callback to insert docs from target. - :type return_doc_cb: callable - :param ensure_callback: A callback to ensure we have the correct - target_replica_uid, if it was just created. - :type ensure_callback: callable - :param sync_id: The id for the current sync session. - :type sync_id: str - :param defer_decryption: Whether to defer the decryption process using - the intermediate database. If False, - decryption will be done inline. - :type defer_decryption: bool - - :raise BrokenSyncStream: If `data` is malformed. - - :return: A dictionary representing the first line of the response got - from remote replica. 
- :rtype: dict - """ - # we keep a reference to the callback in case we defer the decryption - self._return_doc_cb = return_doc_cb - self._queue_for_decrypt = defer_decryption \ - and self._sync_db is not None - - new_generation = last_known_generation - new_transaction_id = last_known_trans_id - - if self._queue_for_decrypt: - logger.debug( - "Soledad sync: will queue received docs for decrypting.") - - idx = 0 - number_of_changes = 1 - - first_request = True - last_callback_lock = None - threads = [] - - # get incoming documents - while idx < number_of_changes: - # bail out if sync process was interrupted - if self.stopped is True: - break - - # launch a thread to fetch one document from target - t = self._syncer_pool.new_syncer_thread( - idx, number_of_changes, - last_callback_lock=last_callback_lock) - - # bail out if any thread failed - if t is None: - self.stop(fail=True) - break - - if defer_decryption: - self._setup_sync_decr_pool() - - t.doc_syncer.set_request_method( - 'get', idx, sync_id, last_known_generation, - last_known_trans_id) - t.doc_syncer.set_success_callback(self._insert_received_doc) - - def _failure_callback(idx, total, exception): - _failure_msg = "Soledad sync: error while getting document " \ - "%d/%d: %s" \ - % (idx + 1, total, exception) - logger.warning("%s" % _failure_msg) - logger.warning("Soledad sync: failing gracefully, will " - "recover on next sync.") - - t.doc_syncer.set_failure_callback(_failure_callback) - threads.append(t) - t.start() - last_callback_lock = t.callback_lock - idx += 1 - - # if this is the first request, wait to update the number of - # changes - if first_request is True: - t.join() - if t.success: - number_of_changes, _, _ = t.result - if defer_decryption: - self._sync_decr_pool.set_docs_to_process( - number_of_changes) - else: - raise t.exception - first_request = False - - # make sure all threads finished and we have up-to-date info - last_successful_thread = None - while threads: - # check if there are failures - t = threads.pop(0) - t.join() - if t.success: - last_successful_thread = t - else: - raise t.exception - - # get information about last successful thread - if last_successful_thread is not None: - body, _ = last_successful_thread.response - parsed_body = json.loads(body) - # get current target gen and trans id in case no documents were - # transferred - if len(parsed_body) == 1: - metadata = parsed_body[0] - new_generation = metadata['new_generation'] - new_transaction_id = metadata['new_transaction_id'] - # get current target gen and trans id from last transferred - # document - else: - doc_data = parsed_body[1] - new_generation = doc_data['gen'] - new_transaction_id = doc_data['trans_id'] - - # decrypt docs in case of deferred decryption - if defer_decryption: - self._sync_decr_pool.wait() - self._teardown_sync_decr_pool() - - return new_generation, new_transaction_id - - @property - def _defer_encryption(self): - return self._sync_enc_pool is not None - - @property - def _defer_decryption(self): - return self._sync_decr_pool is not None - - def sync_exchange(self, docs_by_generations, - source_replica_uid, last_known_generation, - last_known_trans_id, return_doc_cb, - ensure_callback=None, defer_decryption=True, - sync_id=None): - """ - Find out which documents the remote database does not know about, - encrypt and send them. - - This does the same as the parent's method but encrypts content before - syncing. 
- - :param docs_by_generations: A list of (doc_id, generation, trans_id) - of local documents that were changed since - the last local generation the remote - replica knows about. - :type docs_by_generations: list of tuples - - :param source_replica_uid: The uid of the source replica. - :type source_replica_uid: str - - :param last_known_generation: Target's last known generation. - :type last_known_generation: int - - :param last_known_trans_id: Target's last known transaction id. - :type last_known_trans_id: str - - :param return_doc_cb: A callback for inserting received documents from - target. If not overriden, this will call u1db - insert_doc_from_target in synchronizer, which - implements the TAKE OTHER semantics. - :type return_doc_cb: function - - :param ensure_callback: A callback that ensures we know the target - replica uid if the target replica was just - created. - :type ensure_callback: function - - :param defer_decryption: Whether to defer the decryption process using - the intermediate database. If False, - decryption will be done inline. - :type defer_decryption: bool - - :return: The new generation and transaction id of the target replica. - :rtype: tuple - """ - self._ensure_callback = ensure_callback - - self.start() - - if sync_id is None: - sync_id = str(uuid4()) - self.source_replica_uid = source_replica_uid - # let the decrypter pool access the passed callback to insert docs - setProxiedObject(self._insert_doc_cb[source_replica_uid], - return_doc_cb) - - self._ensure_connection() - if self._trace_hook: # for tests - self._trace_hook('sync_exchange') - url = '%s/sync-from/%s' % (self._url.path, source_replica_uid) - headers = self._sign_request('POST', url, {}) - - cur_target_gen = last_known_generation - cur_target_trans_id = last_known_trans_id - - # ------------------------------------------------------------------- - # start of send documents to target - # ------------------------------------------------------------------- - msg = "%d/%d" % (0, len(docs_by_generations)) - signal(SOLEDAD_SYNC_SEND_STATUS, msg) - logger.debug("Soledad sync send status: %s" % msg) - - self._syncer_pool = DocumentSyncerPool( - self._raw_url, self._raw_creds, url, headers, ensure_callback, - self.stop_syncer) - threads = [] - last_callback_lock = None - - sent = 0 - total = len(docs_by_generations) - - synced = [] - number_of_docs = len(docs_by_generations) - - last_request_lock = None - for doc, gen, trans_id in docs_by_generations: - # allow for interrupting the sync process - if self.stopped is True: - break - - # skip non-syncable docs - if isinstance(doc, SoledadDocument) and not doc.syncable: - continue - - # ------------------------------------------------------------- - # symmetric encryption of document's contents - # ------------------------------------------------------------- - - # the following var will hold a deferred because we may try to - # fetch the encrypted document from the sync db - d = None - - if doc.is_tombstone(): - d = defer.succeed(None) - elif not self._defer_encryption: - # fallback case, for tests - d = defer.succeed(encrypt_doc(self._crypto, doc)) - else: - - def _maybe_encrypt_doc_inline(doc_json): - if doc_json is None: - # the document is not marked as tombstone, but we got - # nothing from the sync db. As it is not encrypted - # yet, we force inline encryption. - # TODO: implement a queue to deal with these cases. 
- return encrypt_doc(self._crypto, doc) - return doc_json - - d = self.get_encrypted_doc_from_db(doc.doc_id, doc.rev) - d.addCallback(_maybe_encrypt_doc_inline) - # ------------------------------------------------------------- - # end of symmetric encryption - # ------------------------------------------------------------- - - t = self._syncer_pool.new_syncer_thread( - sent + 1, total, last_request_lock=last_request_lock, - last_callback_lock=last_callback_lock) - - # bail out if creation of any thread failed - if t is None: - self.stop(fail=True) - break - - # the following callback will be called when the document's - # encrypted content is available, either because it was found on - # the sync db or because it has been encrypted inline. - - def _configure_and_start_thread(t, doc_json): - # set the request method - t.doc_syncer.set_request_method( - 'put', sync_id, cur_target_gen, cur_target_trans_id, - id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, - trans_id=trans_id, number_of_docs=number_of_docs, - doc_idx=sent + 1) - # set the success calback - - def _success_callback(idx, total, response): - _success_msg = "Soledad sync send status: %d/%d" \ - % (idx, total) - signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg) - logger.debug(_success_msg) - - t.doc_syncer.set_success_callback(_success_callback) - - # set the failure callback - def _failure_callback(idx, total, exception): - _failure_msg = "Soledad sync: error while sending document " \ - "%d/%d: %s" % (idx, total, exception) - logger.warning("%s" % _failure_msg) - logger.warning("Soledad sync: failing gracefully, will " - "recover on next sync.") - - t.doc_syncer.set_failure_callback(_failure_callback) - - # save thread and append - t.start() - - d.addCallback(partial(_configure_and_start_thread, t)) - - threads.append((t, doc)) - - # update lock references so they can be used in next call to - # syncer_pool.new_syncer_thread() above - last_callback_lock = t.callback_lock - last_request_lock = t.request_lock - - sent += 1 - - # make sure all threads finished and we have up-to-date info - last_successful_thread = None - while threads: - # check if there are failures - t, doc = threads.pop(0) - t.started.wait() - t.join() - if t.success: - synced.append((doc.doc_id, doc.rev)) - last_successful_thread = t - else: - raise t.exception - - # delete documents from the sync database - if self._defer_encryption: - self._delete_encrypted_docs_from_db(synced) - - # get target gen and trans_id after docs - gen_after_send = None - trans_id_after_send = None - if last_successful_thread is not None: - response_dict = json.loads(last_successful_thread.response[0])[0] - gen_after_send = response_dict['new_generation'] - trans_id_after_send = response_dict['new_transaction_id'] - # ------------------------------------------------------------------- - # end of send documents to target - # ------------------------------------------------------------------- - - # ------------------------------------------------------------------- - # start of fetch documents from target - # ------------------------------------------------------------------- - defer_decryption = defer_decryption and self._defer_decryption - if self.stopped is False: - cur_target_gen, cur_target_trans_id = self._get_remote_docs( - url, - last_known_generation, last_known_trans_id, headers, - return_doc_cb, ensure_callback, sync_id, - defer_decryption=defer_decryption) - # ------------------------------------------------------------------- - # end of fetch documents from target - # 
------------------------------------------------------------------- - - self._syncer_pool.cleanup() - - # update gen and trans id info in case we just sent and did not - # receive docs. - if gen_after_send is not None and gen_after_send > cur_target_gen: - cur_target_gen = gen_after_send - cur_target_trans_id = trans_id_after_send - - self.stop(fail=False) - self._syncer_pool = None - return cur_target_gen, cur_target_trans_id - - def start(self): - """ - Mark current sync session as running. - """ - with self._stop_lock: - self._stopped = False - - def stop_syncer(self): - with self._stop_lock: - self._stopped = True - - def stop(self, fail=False): - """ - Mark current sync session as stopped. - - This will eventually interrupt the sync_exchange() method and return - enough information to the synchronizer so the sync session can be - recovered afterwards. - - :param fail: Whether we are stopping because of a failure. - :type fail: bool - """ - self.stop_syncer() - if self._syncer_pool: - self._syncer_pool.stop_threads(fail=fail) - - @property - def stopped(self): - """ - Return whether this sync session is stopped. - - :return: Whether this sync session is stopped. - :rtype: bool - """ - with self._stop_lock: - return self._stopped is True - - # - # Symmetric encryption of syncing docs - # - - def get_encrypted_doc_from_db(self, doc_id, doc_rev): - """ - Retrieve encrypted document from the database of encrypted docs for - sync. - - :param doc_id: The Document id. - :type doc_id: str - - :param doc_rev: The document revision - :type doc_rev: str - - :return: A deferred which is fired with the document's encrypted - content or None if the document was not found on the sync db. - :rtype: twisted.internet.defer.Deferred - """ - logger.debug("Looking for encrypted document on sync db: %s" % doc_id) - return self._sync_enc_pool.get_encrypted_doc(doc_id, doc_rev) - - def _delete_encrypted_docs_from_db(self, docs): - """ - Delete several encrypted documents from the database of symmetrically - encrypted docs to sync. - - :param docs: an iterable with (doc_id, doc_rev) for all documents - to be deleted. - :type docs: any iterable of tuples of str - """ - for doc_id, doc_rev in docs: - logger.debug("Removing encrypted document on sync db: %s" - % doc_id) - return self._sync_enc_pool.delete_encrypted_doc(doc_id, doc_rev) - - # - # Symmetric decryption of syncing docs - # - - def _enqueue_encrypted_received_doc(self, doc, gen, trans_id, idx, total): - """ - Save a symmetrically encrypted incoming document into the received - docs table in the sync db. A decryption task will pick it up - from here in turn. - - :param doc: The document to save. - :type doc: SoledadDocument - :param gen: The generation. - :type gen: str - :param trans_id: Transacion id. - - :param idx: The index count of the current operation. - :type idx: int - :param total: The total number of operations. - :type total: int - """ - logger.debug("Enqueueing doc for decryption: %d/%d." - % (idx + 1, total)) - self._sync_decr_pool.insert_encrypted_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1) - - def _enqueue_received_doc(self, doc, gen, trans_id, idx, total): - """ - Save any incoming document into the received docs table in the sync db. - - :param doc: The document to save. - :type doc: SoledadDocument - :param gen: The generation. - :type gen: str - :param trans_id: Transacion id. - :type gen: str - :param idx: The index count of the current operation. 
- :type idx: int - :param total: The total number of operations. - :type total: int - """ - logger.debug("Enqueueing doc, no decryption needed: %d/%d." - % (idx + 1, total)) - self._sync_decr_pool.insert_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1) - - def set_decryption_callback(self, cb): - """ - Set callback to be called when the decryption finishes. - - :param cb: The callback to be set. - :type cb: callable - """ - self._decryption_callback = cb - - def has_decryption_callback(self): - """ - Return True if there is a decryption callback set. - :rtype: bool - """ - return self._decryption_callback is not None - - # - # Authentication methods - # - - def _sign_request(self, method, url_query, params): - """ - Return an authorization header to be included in the HTTP request. - - :param method: The HTTP method. - :type method: str - :param url_query: The URL query string. - :type url_query: str - :param params: A list with encoded query parameters. - :type param: list - - :return: The Authorization header. - :rtype: list of tuple - """ - return TokenBasedAuth._sign_request(self, method, url_query, params) - - def set_token_credentials(self, uuid, token): - """ - Store given credentials so we can sign the request later. - - :param uuid: The user's uuid. - :type uuid: str - :param token: The authentication token. - :type token: str - """ - TokenBasedAuth.set_token_credentials(self, uuid, token) -- cgit v1.2.3 From 5376b0eb9ff906fc755b18b39c87ffdc36849d1c Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 19 May 2015 19:04:26 -0300 Subject: [refactor] cleanup sync, remove unused stuff This commit does the following: * Remove the autocreate parameter from the sync() method. * Remove the syncing lock from the sync module because it did the same job as the lock in the sqlcipher module. * Remove the close/stop methods from sync module as they don't make sense after we started to use twisted in client-side sync. --- client/src/leap/soledad/client/api.py | 5 +- client/src/leap/soledad/client/sqlcipher.py | 25 +------- client/src/leap/soledad/client/sync.py | 96 +++-------------------------- 3 files changed, 10 insertions(+), 116 deletions(-) diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index 0f29503f..7b45dd7f 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -650,7 +650,7 @@ class Soledad(object): sync_url = urlparse.urljoin(self._server_url, 'user-%s' % self.uuid) d = self._dbsyncer.sync( sync_url, - creds=self._creds, autocreate=False, + creds=self._creds, defer_decryption=defer_decryption) def _sync_callback(local_gen): @@ -670,9 +670,6 @@ class Soledad(object): d.addCallbacks(_sync_callback, _sync_errback) return d - def stop_sync(self): - self._dbsyncer.stop_sync() - @property def syncing(self): """ diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 53afbda8..7fde9a7c 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -414,7 +414,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): Soledad syncer implementation. 
""" - _sync_loop = None _sync_enc_pool = None """ @@ -557,7 +556,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase): decr.TABLE_NAME, decr.FIELD_NAMES)) return (sql_encr_table_query, sql_decr_table_query) - def sync(self, url, creds=None, autocreate=True, defer_decryption=True): + def sync(self, url, creds=None, defer_decryption=True): """ Synchronize documents with remote replica exposed at url. @@ -575,8 +574,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): optional dictionary giving credentials. to authorize the operation with the server. :type creds: dict - :param autocreate: Ask the target to create the db if non-existent. - :type autocreate: bool :param defer_decryption: Whether to defer the decryption process using the intermediate database. If False, decryption will be done inline. @@ -591,20 +588,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # acquired. with self._syncer(url, creds=creds) as syncer: # XXX could mark the critical section here... - return syncer.sync( - autocreate=autocreate, - defer_decryption=defer_decryption) - - def stop_sync(self): - """ - Interrupt all ongoing syncs. - """ - self._stop_sync() - - def _stop_sync(self): - for url in self._syncers: - _, syncer = self._syncers[url] - syncer.stop() + return syncer.sync(defer_decryption=defer_decryption) @contextmanager def _syncer(self, url, creds=None): @@ -687,11 +671,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): """ Close the syncer and syncdb orderly """ - # stop the sync loop for deferred encryption - if self._sync_loop is not None: - self._sync_loop.reset() - self._sync_loop.stop() - self._sync_loop = None # close all open syncers for url in self._syncers: _, syncer = self._syncers[url] diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index f8f74ce7..53172f31 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -16,16 +16,8 @@ # along with this program. If not, see . """ Soledad synchronization utilities. - -Extend u1db Synchronizer with the ability to: - - * Postpone the update of the known replica uid until all the decryption of - the incoming messages has been processed. - - * Be interrupted and recovered. """ import logging -from threading import Lock from twisted.internet import defer @@ -48,17 +40,8 @@ class SoledadSynchronizer(Synchronizer): Also modified to allow for interrupting the synchronization process. """ - # TODO can delegate the syncing to the api object, living in the reactor - # thread, and use a simple flag. - syncing_lock = Lock() - - def stop(self): - """ - Stop the current sync in progress. - """ - self.sync_target.stop() - - def sync(self, autocreate=False, defer_decryption=True): + @defer.inlineCallbacks + def sync(self, defer_decryption=True): """ Synchronize documents between source and target. @@ -70,49 +53,22 @@ class SoledadSynchronizer(Synchronizer): This is done to allow the ongoing parallel decryption of the incoming docs to proceed without `InvalidGeneration` conflicts. - :param autocreate: Whether the target replica should be created or not. - :type autocreate: bool :param defer_decryption: Whether to defer the decryption process using the intermediate database. If False, decryption will be done inline. 
:type defer_decryption: bool - """ - self.syncing_lock.acquire() - try: - return self._sync(autocreate=autocreate, - defer_decryption=defer_decryption) - except Exception: - # we want this exception to reach either SQLCipherU1DBSync.sync or - # the Solead api object itself, so it is poperly handled and/or - # logged... - raise - finally: - # ... but we also want to release the syncing lock so this - # Synchronizer may be reused later. - self.release_syncing_lock() - @defer.inlineCallbacks - def _sync(self, autocreate=False, defer_decryption=True): - """ - Helper function, called from the main `sync` method. - See `sync` docstring. + :return: A deferred which will fire after the sync has finished. + :rtype: twisted.internet.defer.Deferred """ sync_target = self.sync_target # get target identifier, its current generation, # and its last-seen database generation for this source ensure_callback = None - try: - (self.target_replica_uid, target_gen, target_trans_id, - target_my_gen, target_my_trans_id) = yield \ - sync_target.get_sync_info(self.source._replica_uid) - except errors.DatabaseDoesNotExist: - if not autocreate: - raise - # will try to ask sync_exchange() to create the db - self.target_replica_uid = None - target_gen, target_trans_id = (0, '') - target_my_gen, target_my_trans_id = (0, '') + (self.target_replica_uid, target_gen, target_trans_id, + target_my_gen, target_my_trans_id) = yield \ + sync_target.get_sync_info(self.source._replica_uid) logger.debug( "Soledad target sync info:\n" @@ -176,9 +132,6 @@ class SoledadSynchronizer(Synchronizer): # exchange documents and try to insert the returned ones with # the target, return target synced-up-to gen. - # - # The sync_exchange method may be interrupted, in which case it will - # return a tuple of Nones. new_gen, new_trans_id = yield sync_target.sync_exchange( docs_by_generation, self.source._replica_uid, target_last_known_gen, target_last_known_trans_id, @@ -239,38 +192,3 @@ class SoledadSynchronizer(Synchronizer): return self.sync_target.record_sync_info( self.source._replica_uid, cur_gen, trans_id) return defer.succeed(None) - - @property - def syncing(self): - """ - Return True if a sync is ongoing, False otherwise. - :rtype: bool - """ - # XXX FIXME we need some mechanism for timeout: should cleanup and - # release if something in the syncdb-decrypt goes wrong. we could keep - # track of the release date and cleanup unrealistic sync entries after - # some time. - - # TODO use cancellable deferreds instead - locked = self.syncing_lock.locked() - return locked - - def release_syncing_lock(self): - """ - Release syncing lock if it's locked. - """ - if self.syncing_lock.locked(): - self.syncing_lock.release() - - def close(self): - """ - Close sync target pool of workers. - """ - self.release_syncing_lock() - self.sync_target.close() - - def __del__(self): - """ - Cleanup: release lock. 
- """ - self.release_syncing_lock() -- cgit v1.2.3 From 0c23b1c767d98b5a63bb4b94d56b1fe69ce71c43 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 19 May 2015 18:53:02 -0300 Subject: [bug] ensure sync failures are not ignored --- client/src/leap/soledad/client/api.py | 1 + 1 file changed, 1 insertion(+) diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index 7b45dd7f..cd06fba1 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -666,6 +666,7 @@ class Soledad(object): failure.printDetailedTraceback(file=s) msg = "Soledad exception when syncing!\n" + s.getvalue() logger.error(msg) + return failure d.addCallbacks(_sync_callback, _sync_errback) return d -- cgit v1.2.3 From ec55459fa697f5d8676e16e5fee8a0c0f75c8c2c Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 20 May 2015 10:57:25 -0300 Subject: [bug] wrap unauth errors as invalid token errors --- client/src/leap/soledad/client/api.py | 3 -- client/src/leap/soledad/client/http_target.py | 40 ++++++++++++++++++--------- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index cd06fba1..ffd95f6c 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -658,9 +658,6 @@ class Soledad(object): soledad_events.SOLEDAD_DONE_DATA_SYNC, self.uuid) return local_gen - # prevent sync failures from crashing the app by adding an errback - # that logs the failure and does not propagate it down the callback - # chain def _sync_errback(failure): s = StringIO() failure.printDetailedTraceback(file=s) diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py index 041180e6..5f18e4a9 100644 --- a/client/src/leap/soledad/client/http_target.py +++ b/client/src/leap/soledad/client/http_target.py @@ -34,12 +34,14 @@ from collections import defaultdict from twisted.internet import defer from twisted.internet import reactor from twisted.web.client import getPage +from twisted.web.error import Error from u1db import errors from u1db import SyncTarget from u1db.remote import utils from leap.soledad.common.document import SoledadDocument +from leap.soledad.common.errors import InvalidAuthTokenError from leap.soledad.client.crypto import is_symmetrically_encrypted from leap.soledad.client.crypto import encrypt_doc @@ -53,6 +55,19 @@ from leap.soledad.client.encdecpool import SyncDecrypterPool logger = logging.getLogger(__name__) +def _unauth_to_invalid_token_error(failure): + failure.trap(Error) + if failure.getErrorMessage() == "401 Unauthorized": + raise InvalidAuthTokenError + return failure + + +def getSoledadPage(*args, **kwargs): + d = getPage(*args, **kwargs) + d.addErrback(_unauth_to_invalid_token_error) + return d + + class SoledadHTTPSyncTarget(SyncTarget): """ A SyncTarget that encrypts data before sending and decrypts data after @@ -69,7 +84,7 @@ class SoledadHTTPSyncTarget(SyncTarget): _insert_doc_cb = defaultdict(lambda: ProxyBase(None)) def __init__(self, url, source_replica_uid, creds, crypto, - sync_db=None, sync_enc_pool=None): + sync_db=None, sync_enc_pool=None): """ Initialize the sync target. 
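
(A minimal sketch of the error-wrapping pattern added above; not part of
the patch. It assumes the old-style twisted.web getPage() behavior of
failing with a twisted.web.error.Error whose message reads
"401 Unauthorized", which is exactly what the trap above matches on; the
demo_unauth_wrapping() helper is hypothetical.)

    from twisted.internet import defer
    from twisted.web.error import Error

    from leap.soledad.common.errors import InvalidAuthTokenError

    def demo_unauth_wrapping():
        # simulate getPage() failing with an HTTP 401 response
        d = defer.fail(Error(401, "Unauthorized"))
        # the errback converts the raw HTTP error into a domain error
        d.addErrback(_unauth_to_invalid_token_error)
        # downstream errbacks can now trap the domain-specific type
        d.addErrback(lambda f: f.trap(InvalidAuthTokenError))
        return d

This is what lets upper layers distinguish an invalid token from a
generic transport error.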
@@ -124,7 +139,7 @@ class SoledadHTTPSyncTarget(SyncTarget): # # SyncTarget API - # + # @defer.inlineCallbacks def get_sync_info(self, source_replica_uid): @@ -144,7 +159,7 @@ class SoledadHTTPSyncTarget(SyncTarget): source_replica_last_known_transaction_id) :rtype: twisted.internet.defer.Deferred """ - raw = yield getPage(self._url, headers=self._auth_header) + raw = yield getSoledadPage(self._url, headers=self._auth_header) res = json.loads(raw) defer.returnValue([ res['target_replica_uid'], @@ -189,7 +204,7 @@ class SoledadHTTPSyncTarget(SyncTarget): }) headers = self._auth_header.copy() headers.update({'content-type': 'application/json'}) - return getPage( + return getSoledadPage( self._url, method='PUT', headers=headers, @@ -281,7 +296,7 @@ class SoledadHTTPSyncTarget(SyncTarget): @defer.inlineCallbacks def _send_docs(self, docs_by_generation, last_known_generation, - last_known_trans_id, sync_id): + last_known_trans_id, sync_id): if not docs_by_generation: defer.returnValue([None, None]) @@ -307,8 +322,8 @@ class SoledadHTTPSyncTarget(SyncTarget): self._sync_enc_pool.delete_encrypted_doc( doc.doc_id, doc.rev) signal(SOLEDAD_SYNC_SEND_STATUS, - "Soledad sync send status: %d/%d" - % (idx, total)) + "Soledad sync send status: %d/%d" + % (idx, total)) response_dict = json.loads(result)[0] gen_after_send = response_dict['new_generation'] trans_id_after_send = response_dict['new_transaction_id'] @@ -316,7 +331,7 @@ class SoledadHTTPSyncTarget(SyncTarget): @defer.inlineCallbacks def _send_one_doc(self, headers, first_entries, doc, gen, trans_id, - number_of_docs, doc_idx): + number_of_docs, doc_idx): entries = first_entries[:] # add the document to the request content = yield self._encrypt_doc(doc) @@ -327,7 +342,7 @@ class SoledadHTTPSyncTarget(SyncTarget): doc_idx=doc_idx) entries.append('\r\n]') data = ''.join(entries) - result = yield getPage( + result = yield getSoledadPage( self._url, method='POST', headers=headers, @@ -354,7 +369,7 @@ class SoledadHTTPSyncTarget(SyncTarget): d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev) d.addCallback(_maybe_encrypt_doc_inline) return d - + # # methods to receive doc # @@ -438,7 +453,7 @@ class SoledadHTTPSyncTarget(SyncTarget): defer.returnValue([new_generation, new_transaction_id]) def _receive_one_doc(self, headers, last_known_generation, - last_known_trans_id, sync_id, received): + last_known_trans_id, sync_id, received): entries = ['['] # add remote replica metadata to the request self._prepare( @@ -452,7 +467,7 @@ class SoledadHTTPSyncTarget(SyncTarget): ',', entries, received=received) entries.append('\r\n]') # send headers - return getPage( + return getSoledadPage( self._url, method='POST', headers=headers, @@ -473,7 +488,6 @@ class SoledadHTTPSyncTarget(SyncTarget): rev, content, gen, trans_id = \ self._parse_received_doc_response(response) if doc_id is not None: - print doc_id # decrypt incoming document and insert into local database # ------------------------------------------------------------- # symmetric decryption of document's contents -- cgit v1.2.3 From d59ac3b5ce713787cd7a46e181f2381de3a8fde2 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 20 May 2015 10:58:16 -0300 Subject: [feature] ensure reactor stops on client db script --- client/src/leap/soledad/client/encdecpool.py | 9 +++++---- client/src/leap/soledad/client/sqlcipher.py | 1 - scripts/db_access/client_side_db.py | 30 ++++++++++++++++------------ 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/client/src/leap/soledad/client/encdecpool.py 
b/client/src/leap/soledad/client/encdecpool.py
index 7c21c30e..02eeb590 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -766,11 +766,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
            # wait until we know how many documents we need to process
            while self._docs_to_process is None:
                time.sleep(self.DECRYPT_LOOP_PERIOD)
-            # because all database operations are asynchronous, we use an event to
-            # make sure we don't start the next loop before the current one has
-            # finished.
+            # because all database operations are asynchronous, we use an
+            # event to make sure we don't start the next loop before the
+            # current one has finished.
            event = threading.Event()
-            # loop until we have processes as many docs as the number of changes
+            # loop until we have processes as many docs as the number of
+            # changes
            while self._processed_docs < self._docs_to_process:
                if sameProxiedObjects(
                    self._insert_doc_cb.get(self.source_replica_uid),
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 7fde9a7c..96732325 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -653,7 +653,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
    # Symmetric encryption of syncing docs
    #
 
-
    def get_generation(self):
        # FIXME
        # XXX this SHOULD BE a callback
diff --git a/scripts/db_access/client_side_db.py b/scripts/db_access/client_side_db.py
index 5dd2bd95..1d8d32e2 100644
--- a/scripts/db_access/client_side_db.py
+++ b/scripts/db_access/client_side_db.py
@@ -214,19 +214,23 @@ def _get_all_docs(soledad):
 
 @inlineCallbacks
 def _main(soledad, km, args):
-    if args.create_doc:
-        yield soledad.create_doc({'content': args.create_doc})
-    if args.sync:
-        yield soledad.sync()
-    if args.get_all_docs:
-        yield _get_all_docs(soledad)
-    if args.export_private_key:
-        yield _export_key(args, km, args.export_private_key, private=True)
-    if args.export_public_key:
-        yield _export_key(args, km, args.expoert_public_key, private=False)
-    if args.export_incoming_messages:
-        yield _export_incoming_messages(soledad, args.export_incoming_messages)
-    reactor.stop()
+    try:
+        if args.create_doc:
+            yield soledad.create_doc({'content': args.create_doc})
+        if args.sync:
+            yield soledad.sync()
+        if args.get_all_docs:
+            yield _get_all_docs(soledad)
+        if args.export_private_key:
+            yield _export_key(args, km, args.export_private_key, private=True)
+        if args.export_public_key:
+            yield _export_key(args, km, args.export_public_key, private=False)
+        if args.export_incoming_messages:
+            yield _export_incoming_messages(soledad, args.export_incoming_messages)
+    except:
+        pass
+    finally:
+        reactor.stop()
 
 
 if __name__ == '__main__':
-- cgit v1.2.3


From ce161f9623a1dea6eda9fc2350c60073dbcdce06 Mon Sep 17 00:00:00 2001
From: drebs
Date: Wed, 20 May 2015 17:28:27 -0300
Subject: [bug] ensure async decryption failures are logged

We have to make sure any failure in the asynchronous decryption code is
grabbed and properly transmitted up the deferred chain so it can be
logged. This commit adds errbacks in the decryption pool that grab any
failure, and a check on the http target that raises the failure if there
is one.
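
In miniature, the grab-and-store pattern is sketched below (illustrative
only, assuming Twisted deferreds; the PoolSketch class is hypothetical
and not part of this patch):

    from twisted.internet import defer

    class PoolSketch(object):
        """Minimal model of the failure-grabbing behavior."""

        def __init__(self):
            self._failure = None

        def _set_failure(self, failure):
            # keep the failure around instead of letting it vanish in a
            # dangling deferred
            self._failure = failure

        def succeeded(self):
            return self._failure is None

        def run(self, work):
            # any exception raised by work() ends up stored in _failure
            d = defer.maybeDeferred(work)
            d.addErrback(self._set_failure)
            return d

The http target can then poll succeeded() and, when it returns False,
errback its own deferred with the stored failure, so the error reaches
the top of the sync deferred chain and gets logged.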
--- client/src/leap/soledad/client/encdecpool.py | 89 +++++++++++++++++---------- client/src/leap/soledad/client/http_target.py | 51 +++++++++++---- 2 files changed, 97 insertions(+), 43 deletions(-) diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 02eeb590..d9f3d28c 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -32,6 +32,7 @@ from zope.proxy import sameProxiedObjects from twisted.internet import defer from twisted.internet.threads import deferToThread +from twisted.python.failure import Failure from leap.soledad.common.document import SoledadDocument from leap.soledad.common import soledad_assert @@ -390,7 +391,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._processed_docs = 0 self._async_results = [] - self._exception = None + self._failure = None self._finished = threading.Event() # clear the database before starting the sync @@ -399,10 +400,26 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): d.addCallback(lambda _: self._empty_db.set()) # start the decryption loop + def _maybe_store_failure_and_finish(result): + if isinstance(result, Failure): + self._set_failure(result) + self._finished.set() + logger.debug("Finished decrypter thread.") + self._deferred_loop = deferToThread( self._decrypt_and_process_docs_loop) - self._deferred_loop.addCallback( - lambda _: logger.debug("Finished decrypter thread.")) + self._deferred_loop.addBoth( + _maybe_store_failure_and_finish) + + @property + def failure(self): + return self._failure + + def _set_failure(self, failure): + self._failure = failure + + def succeeded(self): + return self._failure is None def set_docs_to_process(self, docs_to_process): """ @@ -760,35 +777,43 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): This method runs in its own thread, so sleeping will not interfere with the main thread. """ - try: - # wait for database to be emptied - self._empty_db.wait() - # wait until we know how many documents we need to process - while self._docs_to_process is None: - time.sleep(self.DECRYPT_LOOP_PERIOD) - # because all database operations are asynchronous, we use an - # event to make sure we don't start the next loop before the - # current one has finished. - event = threading.Event() - # loop until we have processes as many docs as the number of - # changes - while self._processed_docs < self._docs_to_process: - if sameProxiedObjects( - self._insert_doc_cb.get(self.source_replica_uid), - None): - continue - event.clear() - d = self._decrypt_received_docs() - d.addCallback(lambda _: self._raise_if_async_fails()) - d.addCallback(lambda _: self._process_decrypted()) - d.addCallback(self._delete_processed_docs) - d.addCallback(lambda _: event.set()) - event.wait() - # sleep a bit to give time for some decryption work - time.sleep(self.DECRYPT_LOOP_PERIOD) - except Exception as e: - self._exception = e - self._finished.set() + # wait for database to be emptied + self._empty_db.wait() + + # wait until we know how many documents we need to process + while self._docs_to_process is None: + time.sleep(self.DECRYPT_LOOP_PERIOD) + + # because all database operations are asynchronous, we use an + # event to make sure we don't start the next loop before the + # current one has finished. 
+            event = threading.Event()
+
+            # loop until we have processed as many docs as the number of
+            # changes
+            while self._processed_docs < self._docs_to_process:
+
+                if sameProxiedObjects(
+                        self._insert_doc_cb.get(self.source_replica_uid),
+                        None):
+                    continue
+
+                event.clear()
+
+                d = self._decrypt_received_docs()
+                d.addCallback(lambda _: self._raise_if_async_fails())
+                d.addCallback(lambda _: self._process_decrypted())
+                d.addCallback(lambda r: self._delete_processed_docs(r))
+                d.addErrback(self._set_failure)  # grab failure and continue
+                d.addCallback(lambda _: event.set())
+
+                event.wait()
+
+                if not self.succeeded():
+                    break
+
+                # sleep a bit to give time for some decryption work
+                time.sleep(self.DECRYPT_LOOP_PERIOD)
 
     def has_finished(self):
         return self._finished.is_set()
diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py
index 5f18e4a9..bf397cfe 100644
--- a/client/src/leap/soledad/client/http_target.py
+++ b/client/src/leap/soledad/client/http_target.py
@@ -396,7 +396,14 @@ class SoledadHTTPSyncTarget(SyncTarget):
 
         headers = self._auth_header.copy()
         headers.update({'content-type': 'application/x-soledad-sync-get'})
 
-        # maybe get one doc
+        #---------------------------------------------------------------------
+        # maybe receive the first document
+        #---------------------------------------------------------------------
+
+        # we fetch the first document before fetching the rest because we need
+        # to know the total number of documents to be received, and this
+        # information comes as metadata to each request.
+
         d = self._receive_one_doc(
             headers, last_known_generation, last_known_trans_id,
             sync_id, 0)
@@ -406,28 +413,48 @@ class SoledadHTTPSyncTarget(SyncTarget):
             if defer_decryption:
                 self._sync_decr_pool.set_docs_to_process(
                     number_of_changes)
-        idx = 1
 
-        # maybe get more documents
+        #---------------------------------------------------------------------
+        # maybe receive the rest of the documents
+        #---------------------------------------------------------------------
+
+        # launch many asynchronous fetches and inserts of received documents
+        # in the temporary sync db. Will wait for all results before
+        # continuing.
+
+        received = 1
         deferreds = []
-        while idx < number_of_changes:
+        while received < number_of_changes:
             d = self._receive_one_doc(
                 headers, last_known_generation,
-                last_known_trans_id, sync_id, idx)
+                last_known_trans_id, sync_id, received)
             d.addCallback(
                 partial(
                     self._insert_received_doc,
-                    idx + 1,
+                    received + 1,  # the index of the current received doc
                     number_of_changes))
             deferreds.append(d)
-            idx += 1
+            received += 1
         results = yield defer.gatherResults(deferreds)
 
-        # get genration and transaction id of target after insertions
+        # get generation and transaction id of target after insertions
         if deferreds:
             _, new_generation, new_transaction_id = results.pop()
 
-        # get current target gen and trans id in case no documents were
+        #---------------------------------------------------------------------
+        # wait for async decryption to finish
+        #---------------------------------------------------------------------
+
+        # below we do a trick so we can wait for the SyncDecrypterPool to
+        # finish its work before finally returning the new generation and
+        # transaction id of the remote replica. To achieve that, we create a
+        # Deferred that will return the results of the sync and, if we are
+        # decrypting asynchronously, we use reactor.callLater() to
+        # periodically poll the decrypter and check if it has finished its
+        # work.
When it has finished, we either call the callback or errback + # of that deferred. In case we are not asynchronously decrypting, we + # just fire the deferred. + def _shutdown_and_finish(res): self._sync_decr_pool.close() return new_generation, new_transaction_id @@ -441,9 +468,11 @@ class SoledadHTTPSyncTarget(SyncTarget): SyncDecrypterPool.DECRYPT_LOOP_PERIOD, _wait_or_finish) else: - d.callback(None) + if self._sync_decr_pool.succeeded(): + d.callback(None) + else: + d.errback(self._sync_decr_pool.failure) - # decrypt docs in case of deferred decryption if defer_decryption: _wait_or_finish() else: -- cgit v1.2.3 From 33fa691e1df4d64d10313d5d192b3c064aafadb7 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 20 May 2015 18:19:28 -0300 Subject: [refactor] remove unneeded proxy for insert_doc_cb When we initialized the async decrypter pool in the target's init method we needed a proxy to ensure we could update the insert doc callback with the correct method later on. Now we initialize the decrypter only when we need it, so we don't need this proxy anymore. This commit removes the unneeded proxy. --- client/src/leap/soledad/client/encdecpool.py | 22 ++----------------- client/src/leap/soledad/client/http_target.py | 31 ++++++++++----------------- 2 files changed, 13 insertions(+), 40 deletions(-) diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index d9f3d28c..2f58d06c 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -28,8 +28,6 @@ import time import json import logging -from zope.proxy import sameProxiedObjects - from twisted.internet import defer from twisted.internet.threads import deferToThread from twisted.python.failure import Failure @@ -535,16 +533,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): decrypted and inserted in the sync db. :rtype: twisted.internet.defer.Deferred """ - # insert_doc_cb is a proxy object that gets updated with the right - # insert function only when the sync_target invokes the sync_exchange - # method. so, if we don't still have a non-empty callback, we refuse - # to proceed. - if sameProxiedObjects( - self._insert_doc_cb.get(self.source_replica_uid), - None): - logger.debug("Sync decrypter pool: no insert_doc_cb() yet.") - return - soledad_assert(self._crypto is not None, "need a crypto object") content = json.loads(content) @@ -713,7 +701,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): gen, trans_id, idx): """ Insert the decrypted document into the local sqlcipher database. - Makes use of the passed callback `return_doc_cb` passed to the caller + Makes use of the passed callback `insert_doc_cb` passed to the caller by u1db sync. :param doc_id: The document id. 
@@ -730,7 +718,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type trans_id: str """ # could pass source_replica in params for callback chain - insert_fun = self._insert_doc_cb[self.source_replica_uid] logger.debug("Sync decrypter pool: inserting doc in local db: " "%s:%s %s" % (doc_id, doc_rev, gen)) @@ -739,7 +726,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): content = None doc = SoledadDocument(doc_id, doc_rev, content) gen = int(gen) - insert_fun(doc, gen, trans_id) + self._insert_doc_cb(doc, gen, trans_id) # store info about processed docs self._last_inserted_idx = idx @@ -793,11 +780,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # changes while self._processed_docs < self._docs_to_process: - if sameProxiedObjects( - self._insert_doc_cb.get(self.source_replica_uid), - None): - continue - event.clear() d = self._decrypt_received_docs() diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py index bf397cfe..bf563b34 100644 --- a/client/src/leap/soledad/client/http_target.py +++ b/client/src/leap/soledad/client/http_target.py @@ -25,11 +25,8 @@ import json import base64 import logging -from zope.proxy import setProxiedObject -from zope.proxy import ProxyBase from uuid import uuid4 from functools import partial -from collections import defaultdict from twisted.internet import defer from twisted.internet import reactor @@ -79,10 +76,6 @@ class SoledadHTTPSyncTarget(SyncTarget): written to the main database. """ - # will later keep a reference to the insert-doc callback - # passed to sync_exchange - _insert_doc_cb = defaultdict(lambda: ProxyBase(None)) - def __init__(self, url, source_replica_uid, creds, crypto, sync_db=None, sync_enc_pool=None): """ @@ -116,6 +109,7 @@ class SoledadHTTPSyncTarget(SyncTarget): self._crypto = crypto self._sync_db = sync_db self._sync_enc_pool = sync_enc_pool + self._insert_doc_cb = None # asynchronous encryption/decryption attributes self._decryption_callback = None self._sync_decr_pool = None @@ -213,7 +207,7 @@ class SoledadHTTPSyncTarget(SyncTarget): @defer.inlineCallbacks def sync_exchange(self, docs_by_generation, source_replica_uid, last_known_generation, last_known_trans_id, - return_doc_cb, ensure_callback=None, + insert_doc_cb, ensure_callback=None, defer_decryption=True, sync_id=None): """ Find out which documents the remote database does not know about, @@ -235,11 +229,11 @@ class SoledadHTTPSyncTarget(SyncTarget): :param last_known_trans_id: Target's last known transaction id. :type last_known_trans_id: str - :param return_doc_cb: A callback for inserting received documents from + :param insert_doc_cb: A callback for inserting received documents from target. If not overriden, this will call u1db insert_doc_from_target in synchronizer, which implements the TAKE OTHER semantics. 
- :type return_doc_cb: function + :type insert_doc_cb: function :param ensure_callback: A callback that ensures we know the target replica uid if the target replica was just @@ -262,9 +256,8 @@ class SoledadHTTPSyncTarget(SyncTarget): sync_id = str(uuid4()) self.source_replica_uid = source_replica_uid - # let the decrypter pool access the passed callback to insert docs - setProxiedObject(self._insert_doc_cb[source_replica_uid], - return_doc_cb) + # save a reference to the callback so we can use it after decrypting + self._insert_doc_cb = insert_doc_cb gen_after_send, trans_id_after_send = yield self._send_docs( docs_by_generation, @@ -274,7 +267,7 @@ class SoledadHTTPSyncTarget(SyncTarget): cur_target_gen, cur_target_trans_id = yield self._receive_docs( last_known_generation, last_known_trans_id, - return_doc_cb, ensure_callback, sync_id, + ensure_callback, sync_id, defer_decryption=defer_decryption) # update gen and trans id info in case we just sent and did not @@ -376,10 +369,8 @@ class SoledadHTTPSyncTarget(SyncTarget): @defer.inlineCallbacks def _receive_docs(self, last_known_generation, last_known_trans_id, - return_doc_cb, ensure_callback, sync_id, - defer_decryption): - # we keep a reference to the callback in case we defer the decryption - self._return_doc_cb = return_doc_cb + ensure_callback, sync_id, defer_decryption): + self._queue_for_decrypt = defer_decryption \ and self._sync_db is not None @@ -534,7 +525,7 @@ class SoledadHTTPSyncTarget(SyncTarget): else: # defer_decryption is False or no-sync-db fallback doc.set_json(decrypt_doc(self._crypto, doc)) - self._return_doc_cb(doc, gen, trans_id) + self._insert_doc_cb(doc, gen, trans_id) else: # not symmetrically encrypted doc, insert it directly # or save it in the decrypted stage. @@ -543,7 +534,7 @@ class SoledadHTTPSyncTarget(SyncTarget): doc.doc_id, doc.rev, doc.content, gen, trans_id, idx) else: - self._return_doc_cb(doc, gen, trans_id) + self._insert_doc_cb(doc, gen, trans_id) # ------------------------------------------------------------- # end of symmetric decryption # ------------------------------------------------------------- -- cgit v1.2.3 From 478dd0eba5129e2e68c85b7b93561bf9f9de2f19 Mon Sep 17 00:00:00 2001 From: drebs Date: Fri, 22 May 2015 16:59:58 -0300 Subject: [refactor] remove inline enc/dec from client pool The whole idea of the encrypter/decrypter pool is to be able to use multiple cores to allow parallel encryption/decryption. Previous to this commit, the encryptor/decryptor pools could be configured to not use workers and instead do encryption/decryption inline. That was meant for testing purposes and defeated the purpose of the pools. This commit removes the possibility of inline encrypting/decrypting when using the pools. It also refactors the enc/dec pool code so any failures while using the pool are correctly grabbed and raised to the top of the sync deferred chain. --- client/src/leap/soledad/client/encdecpool.py | 220 ++++++++++---------------- client/src/leap/soledad/client/http_target.py | 5 +- 2 files changed, 84 insertions(+), 141 deletions(-) diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 2f58d06c..c0a05d38 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -23,14 +23,12 @@ during synchronization. 
import multiprocessing -import threading -import time import json import logging +from twisted.internet import reactor from twisted.internet import defer from twisted.internet.threads import deferToThread -from twisted.python.failure import Failure from leap.soledad.common.document import SoledadDocument from leap.soledad.common import soledad_assert @@ -50,6 +48,8 @@ class SyncEncryptDecryptPool(object): """ Base class for encrypter/decrypter pools. """ + + # TODO implement throttling to reduce cpu usage?? WORKERS = multiprocessing.cpu_count() def __init__(self, crypto, sync_db): @@ -62,9 +62,9 @@ class SyncEncryptDecryptPool(object): :param sync_db: A database connection handle :type sync_db: pysqlcipher.dbapi2.Connection """ - self._pool = multiprocessing.Pool(self.WORKERS) self._crypto = crypto self._sync_db = sync_db + self._pool = multiprocessing.Pool(self.WORKERS) def close(self): """ @@ -143,8 +143,6 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): Pool of workers that spawn subprocesses to execute the symmetric encryption of documents to be synced. """ - # TODO implement throttling to reduce cpu usage?? - WORKERS = multiprocessing.cpu_count() TABLE_NAME = "docs_tosync" FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" @@ -191,7 +189,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): except multiprocessing.Queue.Empty: pass - def _encrypt_doc(self, doc, workers=True): + def _encrypt_doc(self, doc): """ Symmetrically encrypt a document. @@ -207,19 +205,10 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): key = self._crypto.doc_passphrase(doc.doc_id) secret = self._crypto.secret args = doc.doc_id, doc.rev, docstr, key, secret - - if workers: - # encrypt asynchronously - self._pool.apply_async( - encrypt_doc_task, args, - callback=self._encrypt_doc_cb) - else: - # encrypt inline - try: - res = encrypt_doc_task(*args) - self._encrypt_doc_cb(res) - except Exception as exc: - logger.exception(exc) + # encrypt asynchronously + self._pool.apply_async( + encrypt_doc_task, args, + callback=self._encrypt_doc_cb) def _encrypt_doc_cb(self, result): """ @@ -390,24 +379,22 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._async_results = [] self._failure = None - self._finished = threading.Event() + self._finished = False - # clear the database before starting the sync - self._empty_db = threading.Event() - d = self._empty() - d.addCallback(lambda _: self._empty_db.set()) + # XXX we want to empty the database before starting, but this is an + # asynchronous call, so we have to somehow make sure that it is + # executed before any other call to the database, without + # blocking. 
+ self._empty() - # start the decryption loop - def _maybe_store_failure_and_finish(result): - if isinstance(result, Failure): - self._set_failure(result) - self._finished.set() - logger.debug("Finished decrypter thread.") + def _launch_decrypt_and_process(self): + d = self._decrypt_and_process_docs() + d.addErrback(lambda f: self._set_failure(f)) - self._deferred_loop = deferToThread( - self._decrypt_and_process_docs_loop) - self._deferred_loop.addBoth( - _maybe_store_failure_and_finish) + def _schedule_decrypt_and_process(self): + reactor.callLater( + self.DECRYPT_LOOP_PERIOD, + self._launch_decrypt_and_process) @property def failure(self): @@ -415,11 +402,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): def _set_failure(self, failure): self._failure = failure + self._finished = True - def succeeded(self): - return self._failure is None + def failed(self): + return bool(self._failure) - def set_docs_to_process(self, docs_to_process): + def start(self, docs_to_process): """ Set the number of documents we expect to process. @@ -430,6 +418,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type docs_to_process: int """ self._docs_to_process = docs_to_process + self._schedule_decrypt_and_process() def insert_encrypted_received_doc( self, doc_id, doc_rev, content, gen, trans_id, idx): @@ -506,10 +495,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): % self.TABLE_NAME return self._runOperation(query, (doc_id,)) - def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx, - workers=True): + def _async_decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx): """ - Symmetrically decrypt a document and store in the sync db. + Dispatch an asynchronous document decrypting routine and save the + result object. :param doc_id: The ID for the document with contents to be encrypted. :type doc: str @@ -525,9 +514,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type trans_id: str :param idx: The index of this document in the current sync process. :type idx: int - :param workers: Whether to defer the decryption to the multiprocess - pool of workers. Useful for debugging purposes. - :type workers: bool :return: A deferred that will fire after the document hasa been decrypted and inserted in the sync db. @@ -539,35 +525,15 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): key = self._crypto.doc_passphrase(doc_id) secret = self._crypto.secret args = doc_id, rev, content, gen, trans_id, key, secret, idx - - if workers: - # when using multiprocessing, we need to wait for all parallel - # processing to finish before continuing with the - # decrypt-and-process loop. We do this by using an extra deferred - # that will be fired by the multiprocessing callback when it has - # finished processing. - d1 = defer.Deferred() - - def _multiprocessing_callback(result): - d2 = self._decrypt_doc_cb(result) - d2.addCallback(lambda defres: d1.callback(defres)) - - # save the async result object so we can inspect it for failures - self._async_results.append( - self._pool.apply_async( - decrypt_doc_task, args, - callback=_multiprocessing_callback)) - - return d1 - else: - # decrypt inline - res = decrypt_doc_task(*args) - return self._decrypt_doc_cb(res) + # decrypt asynchronously + self._async_results.append( + self._pool.apply_async( + decrypt_doc_task, args)) def _decrypt_doc_cb(self, result): """ Store the decryption result in the sync db from where it will later be - picked by _process_decrypted. + picked by _process_decrypted_docs. 
:param result: A tuple containing the document's id, revision, content, generation, transaction id and sync index. @@ -636,7 +602,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): last_idx += 1 defer.returnValue(insertable) - def _decrypt_received_docs(self): + @defer.inlineCallbacks + def _async_decrypt_received_docs(self): """ Get all the encrypted documents from the sync database and dispatch a decrypt worker to decrypt each one of them. @@ -645,37 +612,25 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): decrypted and inserted back in the sync db. :rtype: twisted.internet.defer.Deferred """ + docs = yield self._get_docs(encrypted=True) + for doc_id, rev, content, gen, trans_id, _, idx in docs: + self._async_decrypt_doc( + doc_id, rev, content, gen, trans_id, idx) - def _callback(received_docs): - deferreds = [] - for doc_id, rev, content, gen, trans_id, _, idx in received_docs: - deferreds.append( - self._decrypt_doc( - doc_id, rev, content, gen, trans_id, idx)) - return defer.gatherResults(deferreds) - - d = self._get_docs(encrypted=True) - d.addCallback(_callback) - return d - - def _process_decrypted(self): + @defer.inlineCallbacks + def _process_decrypted_docs(self): """ Fetch as many decrypted documents as can be taken from the expected - order and insert them in the database. + order and insert them in the local replica. :return: A deferred that will fire with the list of inserted documents. :rtype: twisted.internet.defer.Deferred """ - - def _callback(insertable): - for doc_fields in insertable: - self._insert_decrypted_local_doc(*doc_fields) - return insertable - - d = self._get_insertable_docs() - d.addCallback(_callback) - return d + insertable = yield self._get_insertable_docs() + for doc_fields in insertable: + self._insert_decrypted_local_doc(*doc_fields) + defer.returnValue(insertable) def _delete_processed_docs(self, inserted): """ @@ -700,8 +655,9 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): def _insert_decrypted_local_doc(self, doc_id, doc_rev, content, gen, trans_id, idx): """ - Insert the decrypted document into the local sqlcipher database. - Makes use of the passed callback `insert_doc_cb` passed to the caller + Insert the decrypted document into the local replica. + + Make use of the passed callback `insert_doc_cb` passed to the caller by u1db sync. :param doc_id: The document id. @@ -743,59 +699,47 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) return self._runOperation(query) - def _raise_if_async_fails(self): + def _collect_async_decryption_results(self): """ - Raise any exception raised by a multiprocessing async decryption - call. + Collect the results of the asynchronous doc decryptions and re-raise + any exception raised by a multiprocessing async decryption call. :raise Exception: Raised if an async call has raised an exception. """ - for res in self._async_results: + async_results = self._async_results[:] + for res in async_results: if res.ready(): - if not res.successful(): - # re-raise the exception raised by the remote call - res.get() + self._decrypt_doc_cb(res.get()) # might raise an exception! + self._async_results.remove(res) - def _decrypt_and_process_docs_loop(self): + @defer.inlineCallbacks + def _decrypt_and_process_docs(self): """ Decrypt the documents received from remote replica and insert them into the local one. - This method runs in its own thread, so sleeping will not interfere - with the main thread. 
- """ - # wait for database to be emptied - self._empty_db.wait() - - # wait until we know how many documents we need to process - while self._docs_to_process is None: - time.sleep(self.DECRYPT_LOOP_PERIOD) - - # because all database operations are asynchronous, we use an - # event to make sure we don't start the next loop before the - # current one has finished. - event = threading.Event() - - # loop until we have processes as many docs as the number of - # changes - while self._processed_docs < self._docs_to_process: + This method implicitelly returns a defferred (see the decorator + above). It should only be called by _launch_decrypt_and_process(). + because this way any exceptions raised here will be stored by the + errback attached to the deferred returned. - event.clear() - - d = self._decrypt_received_docs() - d.addCallback(lambda _: self._raise_if_async_fails()) - d.addCallback(lambda _: self._process_decrypted()) - d.addCallback(lambda r: self._delete_processed_docs(r)) - d.addErrback(self._set_failure) # grab failure and continue - d.addCallback(lambda _: event.set()) - - event.wait() - - if not self.succeeded(): - break - - # sleep a bit to give time for some decryption work - time.sleep(self.DECRYPT_LOOP_PERIOD) + :return: A deferred which will fire after all decrypt, process and + delete operations have been executed. + :rtype: twisted.internet.defer.Deferred + """ + if not self.failed(): + if self._processed_docs < self._docs_to_process: + yield self._async_decrypt_received_docs() + yield self._collect_async_decryption_results() + docs = yield self._process_decrypted_docs() + yield self._delete_processed_docs(docs) + # recurse + self._schedule_decrypt_and_process() + else: + self._finished = True def has_finished(self): - return self._finished.is_set() + """ + Return whether the decrypter has finished its work. + """ + return self._finished diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py index bf563b34..75af9cf7 100644 --- a/client/src/leap/soledad/client/http_target.py +++ b/client/src/leap/soledad/client/http_target.py @@ -402,8 +402,7 @@ class SoledadHTTPSyncTarget(SyncTarget): number_of_changes, ngen, ntrans = yield d if defer_decryption: - self._sync_decr_pool.set_docs_to_process( - number_of_changes) + self._sync_decr_pool.start(number_of_changes) #--------------------------------------------------------------------- # maybe receive the rest of the documents @@ -459,7 +458,7 @@ class SoledadHTTPSyncTarget(SyncTarget): SyncDecrypterPool.DECRYPT_LOOP_PERIOD, _wait_or_finish) else: - if self._sync_decr_pool.succeeded(): + if not self._sync_decr_pool.failed(): d.callback(None) else: d.errback(self._sync_decr_pool.failure) -- cgit v1.2.3 From 31757168f6ad243ec83ba52b2e022298ba08f8d1 Mon Sep 17 00:00:00 2001 From: drebs Date: Mon, 25 May 2015 11:46:24 -0300 Subject: [feature] add pool of http/https connections Instead of opening one TCP connection for each HTTP request, we want to reuse connections. Also, we need to be able to verify SSL certificates. This commit implements both features in the twisted http client sync. 
--- .../feature_add-pool-of-http-https-connections | 2 + client/src/leap/soledad/client/api.py | 4 +- client/src/leap/soledad/client/http_client.py | 194 +++++++++++++++++++++ client/src/leap/soledad/client/http_target.py | 53 +++--- client/src/leap/soledad/client/sqlcipher.py | 13 +- 5 files changed, 232 insertions(+), 34 deletions(-) create mode 100644 client/changes/feature_add-pool-of-http-https-connections create mode 100644 client/src/leap/soledad/client/http_client.py diff --git a/client/changes/feature_add-pool-of-http-https-connections b/client/changes/feature_add-pool-of-http-https-connections new file mode 100644 index 00000000..7ff2a4ee --- /dev/null +++ b/client/changes/feature_add-pool-of-http-https-connections @@ -0,0 +1,2 @@ + o Add a pool of HTTP/HTTPS connections that is able to verify the server + certificate against a given CA certificate. diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index ffd95f6c..91e0a4a0 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -272,7 +272,8 @@ class Soledad(object): replica_uid = self._dbpool.replica_uid self._dbsyncer = SQLCipherU1DBSync( self._sqlcipher_opts, self._crypto, replica_uid, - self._defer_encryption) + SOLEDAD_CERT, + defer_encryption=self._defer_encryption) # # Closing methods @@ -630,6 +631,7 @@ class Soledad(object): Whether to defer decryption of documents, or do it inline while syncing. :type defer_decryption: bool + :return: A deferred whose callback will be invoked with the local generation before the synchronization was performed. :rtype: twisted.internet.defer.Deferred diff --git a/client/src/leap/soledad/client/http_client.py b/client/src/leap/soledad/client/http_client.py new file mode 100644 index 00000000..b08d199e --- /dev/null +++ b/client/src/leap/soledad/client/http_client.py @@ -0,0 +1,194 @@ +# -*- coding: utf-8 -*- +# http_client.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +Twisted HTTP/HTTPS client. 
+""" + +import os + +from zope.interface import implements + +from OpenSSL.crypto import load_certificate +from OpenSSL.crypto import FILETYPE_PEM + +from twisted.internet import reactor +from twisted.internet.ssl import ClientContextFactory +from twisted.internet.ssl import CertificateOptions +from twisted.internet.defer import succeed + +from twisted.web.client import Agent +from twisted.web.client import HTTPConnectionPool +from twisted.web.client import readBody +from twisted.web.http_headers import Headers +from twisted.web.error import Error +from twisted.web.iweb import IBodyProducer + + +from leap.soledad.common.errors import InvalidAuthTokenError + + +# +# Setup a pool of connections +# + +_pool = HTTPConnectionPool(reactor, persistent=True) +_pool.maxPersistentPerHost = 10 +_agent = None + +# if we ever want to trust the system's CAs, we should use an agent like this: +# from twisted.web.client import BrowserLikePolicyForHTTPS +# _agent = Agent(reactor, BrowserLikePolicyForHTTPS(), pool=_pool) + + +# +# SSL/TLS certificate configuration +# + +def configure_certificate(cert_file): + """ + Configure an agent that verifies server certificates against a CA cert + file. + + :param cert_file: The path to the certificate file. + :type cert_file: str + """ + global _agent + cert = _load_cert(cert_file) + _agent = Agent( + reactor, + SoledadClientContextFactory(cert), + pool=_pool) + + +def _load_cert(cert_file): + """ + Load a X509 certificate from a file. + + :param cert_file: The path to the certificate file. + :type cert_file: str + + :return: The X509 certificate. + :rtype: OpenSSL.crypto.X509 + """ + if os.path.exists(cert_file): + with open(cert_file) as f: + data = f.read() + return load_certificate(FILETYPE_PEM, data) + + +class SoledadClientContextFactory(ClientContextFactory): + """ + A context factory that will verify the server's certificate against a + given CA certificate. + """ + + def __init__(self, cacert): + """ + Initialize the context factory. + + :param cacert: The CA certificate. + :type cacert: OpenSSL.crypto.X509 + """ + self._cacert = cacert + + def getContext(self, hostname, port): + opts = CertificateOptions(verify=True, caCerts=[self._cacert]) + return opts.getContext() + + +# +# HTTP request facilities +# + +def _unauth_to_invalid_token_error(failure): + """ + An errback to translate unauthorized errors to our own invalid token + class. + + :param failure: The original failure. + :type failure: twisted.python.failure.Failure + + :return: Either the original failure or an invalid auth token error. + :rtype: twisted.python.failure.Failure + """ + failure.trap(Error) + if failure.getErrorMessage() == "401 Unauthorized": + raise InvalidAuthTokenError + return failure + + +class StringBodyProducer(object): + """ + A producer that writes the body of a request to a consumer. + """ + + implements(IBodyProducer) + + def __init__(self, body): + """ + Initialize the string produer. + + :param body: The body of the request. + :type body: str + """ + self.body = body + self.length = len(body) + + def startProducing(self, consumer): + """ + Write the body to the consumer. + + :param consumer: Any IConsumer provider. + :type consumer: twisted.internet.interfaces.IConsumer + + :return: A successful deferred. + :rtype: twisted.internet.defer.Deferred + """ + consumer.write(self.body) + return succeed(None) + + def pauseProducing(self): + pass + + def stopProducing(self): + pass + + +def httpRequest(url, method='GET', body=None, headers={}): + """ + Perform an HTTP request. 
+ + :param url: The URL for the request. + :type url: str + :param method: The HTTP method of the request. + :type method: str + :param body: The body of the request, if any. + :type body: str + :param headers: The headers of the request. + :type headers: dict + + :return: A deferred that fires with the body of the request. + :rtype: twisted.internet.defer.Deferred + """ + if body: + body = StringBodyProducer(body) + d = _agent.request( + method, url, headers=Headers(headers), bodyProducer=body) + d.addCallbacks(readBody, _unauth_to_invalid_token_error) + return d diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py index 75af9cf7..dc6c0e0a 100644 --- a/client/src/leap/soledad/client/http_target.py +++ b/client/src/leap/soledad/client/http_target.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# target.py +# http_target.py # Copyright (C) 2015 LEAP # # This program is free software: you can redistribute it and/or modify @@ -21,6 +21,7 @@ A U1DB backend for encrypting data before sending to server and decrypting after receiving. """ + import json import base64 import logging @@ -30,15 +31,12 @@ from functools import partial from twisted.internet import defer from twisted.internet import reactor -from twisted.web.client import getPage -from twisted.web.error import Error from u1db import errors from u1db import SyncTarget from u1db.remote import utils from leap.soledad.common.document import SoledadDocument -from leap.soledad.common.errors import InvalidAuthTokenError from leap.soledad.client.crypto import is_symmetrically_encrypted from leap.soledad.client.crypto import encrypt_doc @@ -47,24 +45,13 @@ from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS from leap.soledad.client.events import signal from leap.soledad.client.encdecpool import SyncDecrypterPool +from leap.soledad.client.http_client import httpRequest +from leap.soledad.client.http_client import configure_certificate logger = logging.getLogger(__name__) -def _unauth_to_invalid_token_error(failure): - failure.trap(Error) - if failure.getErrorMessage() == "401 Unauthorized": - raise InvalidAuthTokenError - return failure - - -def getSoledadPage(*args, **kwargs): - d = getPage(*args, **kwargs) - d.addErrback(_unauth_to_invalid_token_error) - return d - - class SoledadHTTPSyncTarget(SyncTarget): """ A SyncTarget that encrypts data before sending and decrypts data after @@ -76,7 +63,7 @@ class SoledadHTTPSyncTarget(SyncTarget): written to the main database. """ - def __init__(self, url, source_replica_uid, creds, crypto, + def __init__(self, url, source_replica_uid, creds, crypto, cert_file, sync_db=None, sync_enc_pool=None): """ Initialize the sync target. @@ -93,12 +80,19 @@ class SoledadHTTPSyncTarget(SyncTarget): :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt document contents when syncing. :type crypto: soledad.crypto.SoledadCrypto + :param cert_file: Path to the certificate of the ca used to validate + the SSL certificate used by the remote soledad + server. + :type cert_file: str :param sync_db: Optional. handler for the db with the symmetric encryption of the syncing documents. If None, encryption will be done in-place, instead of retreiving it from the dedicated database. :type sync_db: Sqlite handler + :param verify_ssl: Whether we should perform SSL server certificate + verification. 
+ :type verify_ssl: bool """ if url.endswith("/"): url = url[:-1] @@ -113,6 +107,7 @@ class SoledadHTTPSyncTarget(SyncTarget): # asynchronous encryption/decryption attributes self._decryption_callback = None self._sync_decr_pool = None + configure_certificate(cert_file) def set_creds(self, creds): """ @@ -125,7 +120,7 @@ class SoledadHTTPSyncTarget(SyncTarget): token = creds['token']['token'] auth = '%s:%s' % (uuid, token) b64_token = base64.b64encode(auth) - self._auth_header = {'Authorization': 'Token %s' % b64_token} + self._auth_header = {'Authorization': ['Token %s' % b64_token]} @property def _defer_encryption(self): @@ -153,7 +148,7 @@ class SoledadHTTPSyncTarget(SyncTarget): source_replica_last_known_transaction_id) :rtype: twisted.internet.defer.Deferred """ - raw = yield getSoledadPage(self._url, headers=self._auth_header) + raw = yield httpRequest(self._url, headers=self._auth_header) res = json.loads(raw) defer.returnValue([ res['target_replica_uid'], @@ -197,12 +192,12 @@ class SoledadHTTPSyncTarget(SyncTarget): 'transaction_id': source_replica_transaction_id }) headers = self._auth_header.copy() - headers.update({'content-type': 'application/json'}) - return getSoledadPage( + headers.update({'content-type': ['application/json']}) + return httpRequest( self._url, method='PUT', headers=headers, - postdata=data) + body=data) @defer.inlineCallbacks def sync_exchange(self, docs_by_generation, source_replica_uid, @@ -295,7 +290,7 @@ class SoledadHTTPSyncTarget(SyncTarget): defer.returnValue([None, None]) headers = self._auth_header.copy() - headers.update({'content-type': 'application/x-soledad-sync-put'}) + headers.update({'content-type': ['application/x-soledad-sync-put']}) # add remote replica metadata to the request first_entries = ['['] self._prepare( @@ -335,11 +330,11 @@ class SoledadHTTPSyncTarget(SyncTarget): doc_idx=doc_idx) entries.append('\r\n]') data = ''.join(entries) - result = yield getSoledadPage( + result = yield httpRequest( self._url, method='POST', headers=headers, - postdata=data) + body=data) defer.returnValue(result) def _encrypt_doc(self, doc): @@ -385,7 +380,7 @@ class SoledadHTTPSyncTarget(SyncTarget): self._setup_sync_decr_pool() headers = self._auth_header.copy() - headers.update({'content-type': 'application/x-soledad-sync-get'}) + headers.update({'content-type': ['application/x-soledad-sync-get']}) #--------------------------------------------------------------------- # maybe receive the first document @@ -486,11 +481,11 @@ class SoledadHTTPSyncTarget(SyncTarget): ',', entries, received=received) entries.append('\r\n]') # send headers - return getSoledadPage( + return httpRequest( self._url, method='POST', headers=headers, - postdata=''.join(entries)) + body=''.join(entries)) def _insert_received_doc(self, idx, total, response): """ diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 96732325..ed9e95dc 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -434,13 +434,14 @@ class SQLCipherU1DBSync(SQLCipherDatabase): """ syncing_lock = defaultdict(threading.Lock) - def __init__(self, opts, soledad_crypto, replica_uid, + def __init__(self, opts, soledad_crypto, replica_uid, cert_file, defer_encryption=False): self._opts = opts self._path = opts.path self._crypto = soledad_crypto self.__replica_uid = replica_uid + self._cert_file = cert_file self._sync_db_key = opts.sync_db_key self._sync_db = None @@ -570,9 +571,8 @@ class 
SQLCipherU1DBSync(SQLCipherDatabase): :param url: The url of the target replica to sync with. :type url: str - :param creds: - optional dictionary giving credentials. - to authorize the operation with the server. + :param creds: optional dictionary giving credentials to authorize the + operation with the server. :type creds: dict :param defer_decryption: Whether to defer the decryption process using the intermediate @@ -599,6 +599,10 @@ class SQLCipherU1DBSync(SQLCipherDatabase): one instance synchronizing the same database replica at the same time. Because of that, this method blocks until the syncing lock can be acquired. + + :param creds: optional dictionary giving credentials to authorize the + operation with the server. + :type creds: dict """ with self.syncing_lock[self._path]: syncer = self._get_syncer(url, creds=creds) @@ -640,6 +644,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self._replica_uid, creds=creds, crypto=self._crypto, + cert_file=self._cert_file, sync_db=self._sync_db, sync_enc_pool=self._sync_enc_pool)) self._syncers[url] = (h, syncer) -- cgit v1.2.3 From 3e6e51649bb6206125f20ac6773f6744ec8bf175 Mon Sep 17 00:00:00 2001 From: drebs Date: Mon, 25 May 2015 13:47:57 -0300 Subject: [bug] remove client syncer call to close method --- client/src/leap/soledad/client/sqlcipher.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index ed9e95dc..8e7d39c2 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -677,9 +677,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase): """ # close all open syncers for url in self._syncers: - _, syncer = self._syncers[url] - syncer.close() - self._syncers = [] + del self._syncers[url] + # stop the encryption pool if self._sync_enc_pool is not None: self._sync_enc_pool.close() -- cgit v1.2.3
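The encdecpool refactor earlier in this series replaces a dedicated
decryption thread that slept between passes with a loop driven entirely by
the reactor: each pass runs its database operations asynchronously and, on
success, schedules the next pass with reactor.callLater. A stripped-down
sketch of that scheduling pattern, with hypothetical work/done callables
standing in for the decrypt, process and delete steps:

    from twisted.internet import defer, reactor

    LOOP_PERIOD = 0.5  # seconds, analogous to DECRYPT_LOOP_PERIOD

    class PollLoop(object):
        """Repeat an asynchronous task without blocking any thread."""

        def __init__(self, work, done):
            self._work = work    # callable returning a deferred
            self._done = done    # predicate: True when we may stop
            self.failure = None  # stored for the caller to inspect

        def start(self):
            self._schedule()

        def _schedule(self):
            reactor.callLater(LOOP_PERIOD, self._run)

        def _run(self):
            d = defer.maybeDeferred(self._work)
            # Store a failure and stop; otherwise recurse into the next
            # pass only after the current deferred chain has finished.
            d.addCallbacks(self._maybe_reschedule, self._store_failure)

        def _maybe_reschedule(self, _):
            if not self._done():
                self._schedule()

        def _store_failure(self, f):
            self.failure = f

Because a pass reschedules itself only from its own callback, passes can
never overlap, which is the property the removed threading.Event dance used
to enforce.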
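Putting the new http_client module to use, a caller first pins the CA
certificate and then issues requests. Note that twisted.web.http_headers
expects each header value to be a list, which is why the sync target's
headers change from plain 'Token %s' strings to one-element lists. A
hypothetical session (the certificate path, URL and token are illustrative):

    from twisted.internet import reactor
    from leap.soledad.client import http_client

    # Verify the server certificate against our CA file.
    http_client.configure_certificate('/etc/leap/cacert.pem')

    d = http_client.httpRequest(
        'https://soledad.example.org:2323/user-db',
        method='GET',
        headers={'Authorization': ['Token c29tZTp0b2tlbg==']})

    def done(body):
        print(len(body))
        reactor.stop()

    d.addCallback(done)
    reactor.run()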
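One caveat in the last hunk: the new cleanup loop deletes entries from
self._syncers while iterating over the dict itself, which CPython rejects
with "RuntimeError: dictionary changed size during iteration" as soon as
there is at least one cached syncer. A safer drain, sketched here as a
hypothetical rewrite rather than part of the patch:

    def _drain_syncers(syncers):
        """Discard all cached syncers without mutating mid-iteration."""
        for url in list(syncers):  # snapshot the keys first
            del syncers[url]

Since the syncers are simply discarded without any per-item teardown,
syncers.clear() would do the same job in one call.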