diff options
author | Kali Kaneko <kali@leap.se> | 2015-05-25 16:17:20 -0400 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2015-05-25 16:17:20 -0400 |
commit | 745ac11c78455e7487bef88da3c3ffeb4fe4351c (patch) | |
tree | cc14a46120a69921ba9eed6e5584012c23fc694f /client | |
parent | 340b0dcfbc0a819738a28f9c803fdbf848754897 (diff) | |
parent | 3e6e51649bb6206125f20ac6773f6744ec8bf175 (diff) |
Merge remote-tracking branch 'leapcode/pr/216' into develop
this branch includes many changes that improve the asynchronous
retrieval of the sync docs, and the parallel decryption of the encrypted
documents.
Diffstat (limited to 'client')
20 files changed, 1769 insertions, 2375 deletions
diff --git a/client/changes/bug_6757_fix-order-of-insertion-when-syncing b/client/changes/bug_6757_fix-order-of-insertion-when-syncing new file mode 100644 index 00000000..c0470f5a --- /dev/null +++ b/client/changes/bug_6757_fix-order-of-insertion-when-syncing @@ -0,0 +1,2 @@ + o Fix the order of insertion of documents when using workers for decrypting + incoming documents during a sync. Closes #6757. diff --git a/client/changes/bug_6892_fix-log-message-for-local-secret b/client/changes/bug_6892_fix-log-message-for-local-secret new file mode 100644 index 00000000..39c13257 --- /dev/null +++ b/client/changes/bug_6892_fix-log-message-for-local-secret @@ -0,0 +1,2 @@ + o Fix the log message when a local secret is not found so it's less + confusing. Closes #6892. diff --git a/client/changes/bug_always-initialize-the-sync-db b/client/changes/bug_always-initialize-the-sync-db new file mode 100644 index 00000000..2b12989a --- /dev/null +++ b/client/changes/bug_always-initialize-the-sync-db @@ -0,0 +1,2 @@ + o Always initialize the sync db to allow for both asynchronous encryption + and asynchronous decryption when syncing. diff --git a/client/changes/bug_fix-async-decrypt b/client/changes/bug_fix-async-decrypt new file mode 100644 index 00000000..eb0ce7b5 --- /dev/null +++ b/client/changes/bug_fix-async-decrypt @@ -0,0 +1,2 @@ + o Refactor asynchronous encryption/decryption code to its own file. + o Fix logging and graceful failing when exceptions are raised during sync. diff --git a/client/changes/bug_improve-log-when-fetching-documents b/client/changes/bug_improve-log-when-fetching-documents new file mode 100644 index 00000000..a67ce028 --- /dev/null +++ b/client/changes/bug_improve-log-when-fetching-documents @@ -0,0 +1 @@ + o Improve log messages when concurrently fetching documents from the server. diff --git a/client/changes/feature_add-pool-of-http-https-connections b/client/changes/feature_add-pool-of-http-https-connections new file mode 100644 index 00000000..7ff2a4ee --- /dev/null +++ b/client/changes/feature_add-pool-of-http-https-connections @@ -0,0 +1,2 @@ + o Add a pool of HTTP/HTTPS connections that is able to verify the server + certificate against a given CA certificate. diff --git a/client/changes/feature_use-twisted-adbapi-for-sync-db b/client/changes/feature_use-twisted-adbapi-for-sync-db new file mode 100644 index 00000000..41e5e6e3 --- /dev/null +++ b/client/changes/feature_use-twisted-adbapi-for-sync-db @@ -0,0 +1 @@ + o Use twisted.enterprise.adbapi for access to the sync database. diff --git a/client/changes/feature_use-twisted-web-for-client-sync b/client/changes/feature_use-twisted-web-for-client-sync new file mode 100644 index 00000000..b4d1d4a4 --- /dev/null +++ b/client/changes/feature_use-twisted-web-for-client-sync @@ -0,0 +1 @@ + o Use twisted.web.client for client sync. diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index 7ad10db5..5b882bbe 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ b/client/src/leap/soledad/client/adbapi.py @@ -32,6 +32,7 @@ from zope.proxy import ProxyBase, setProxiedObject from pysqlcipher.dbapi2 import OperationalError from leap.soledad.client import sqlcipher as soledad_sqlcipher +from leap.soledad.client.pragmas import set_init_pragmas logger = logging.getLogger(name=__name__) @@ -72,7 +73,7 @@ def getConnectionPool(opts, openfun=None, driver="pysqlcipher"): :rtype: U1DBConnectionPool """ if openfun is None and driver == "pysqlcipher": - openfun = partial(soledad_sqlcipher.set_init_pragmas, opts=opts) + openfun = partial(set_init_pragmas, opts=opts) return U1DBConnectionPool( "%s.dbapi2" % driver, database=opts.path, check_same_thread=False, cp_openfun=openfun, diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index 0f29503f..91e0a4a0 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -272,7 +272,8 @@ class Soledad(object): replica_uid = self._dbpool.replica_uid self._dbsyncer = SQLCipherU1DBSync( self._sqlcipher_opts, self._crypto, replica_uid, - self._defer_encryption) + SOLEDAD_CERT, + defer_encryption=self._defer_encryption) # # Closing methods @@ -630,6 +631,7 @@ class Soledad(object): Whether to defer decryption of documents, or do it inline while syncing. :type defer_decryption: bool + :return: A deferred whose callback will be invoked with the local generation before the synchronization was performed. :rtype: twisted.internet.defer.Deferred @@ -650,7 +652,7 @@ class Soledad(object): sync_url = urlparse.urljoin(self._server_url, 'user-%s' % self.uuid) d = self._dbsyncer.sync( sync_url, - creds=self._creds, autocreate=False, + creds=self._creds, defer_decryption=defer_decryption) def _sync_callback(local_gen): @@ -658,21 +660,16 @@ class Soledad(object): soledad_events.SOLEDAD_DONE_DATA_SYNC, self.uuid) return local_gen - # prevent sync failures from crashing the app by adding an errback - # that logs the failure and does not propagate it down the callback - # chain def _sync_errback(failure): s = StringIO() failure.printDetailedTraceback(file=s) msg = "Soledad exception when syncing!\n" + s.getvalue() logger.error(msg) + return failure d.addCallbacks(_sync_callback, _sync_errback) return d - def stop_sync(self): - self._dbsyncer.stop_sync() - @property def syncing(self): """ diff --git a/client/src/leap/soledad/client/auth.py b/client/src/leap/soledad/client/auth.py index 72ab0008..6dfabeb4 100644 --- a/client/src/leap/soledad/client/auth.py +++ b/client/src/leap/soledad/client/auth.py @@ -14,15 +14,13 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - - """ Methods for token-based authentication. These methods have to be included in all classes that extend HTTPClient so they can do token-based auth requests to the Soledad server. """ - +import base64 from u1db import errors @@ -49,7 +47,7 @@ class TokenBasedAuth(object): Return an authorization header to be included in the HTTP request, in the form: - [('Authorization', 'Token <base64 encoded creds')] + [('Authorization', 'Token <(base64 encoded) uuid:token>')] :param method: The HTTP method. :type method: str @@ -64,7 +62,8 @@ class TokenBasedAuth(object): if 'token' in self._creds: uuid, token = self._creds['token'] auth = '%s:%s' % (uuid, token) - return [('Authorization', 'Token %s' % auth.encode('base64')[:-1])] + b64_token = base64.b64encode(auth) + return [('Authorization', 'Token %s' % b64_token)] else: raise errors.UnknownAuthMethod( 'Wrong credentials: %s' % self._creds) diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index 107bf7f1..bdbaa8e0 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -23,17 +23,13 @@ import hmac import hashlib import json import logging -import multiprocessing -import threading from pycryptopp.cipher.aes import AES from pycryptopp.cipher.xsalsa20 import XSalsa20 -from zope.proxy import sameProxiedObjects from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type from leap.soledad.common import crypto -from leap.soledad.common.document import SoledadDocument logger = logging.getLogger(__name__) @@ -227,7 +223,7 @@ class SoledadCrypto(object): # def mac_doc(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, enc_iv, - mac_method, secret): + mac_method, secret): """ Calculate a MAC for C{doc} using C{ciphertext}. @@ -378,7 +374,7 @@ def decrypt_doc(crypto, doc): def _verify_doc_mac(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, - enc_iv, mac_method, secret, doc_mac): + enc_iv, mac_method, secret, doc_mac): """ Verify that C{doc_mac} is a correct MAC for the given document. @@ -511,525 +507,3 @@ def is_symmetrically_encrypted(doc): == crypto.EncryptionSchemes.SYMKEY: return True return False - - -# -# Encrypt/decrypt pools of workers -# - -class SyncEncryptDecryptPool(object): - """ - Base class for encrypter/decrypter pools. - """ - WORKERS = multiprocessing.cpu_count() - - def __init__(self, crypto, sync_db, write_lock): - """ - Initialize the pool of encryption-workers. - - :param crypto: A SoledadCryto instance to perform the encryption. - :type crypto: leap.soledad.crypto.SoledadCrypto - - :param sync_db: A database connection handle - :type sync_db: pysqlcipher.dbapi2.Connection - - :param write_lock: a write lock for controlling concurrent access - to the sync_db - :type write_lock: threading.Lock - """ - self._pool = multiprocessing.Pool(self.WORKERS) - self._crypto = crypto - self._sync_db = sync_db - self._sync_db_write_lock = write_lock - - def close(self): - """ - Cleanly close the pool of workers. - """ - logger.debug("Closing %s" % (self.__class__.__name__,)) - self._pool.close() - try: - self._pool.join() - except Exception: - pass - - def terminate(self): - """ - Terminate the pool of workers. - """ - logger.debug("Terminating %s" % (self.__class__.__name__,)) - self._pool.terminate() - - -def encrypt_doc_task(doc_id, doc_rev, content, key, secret): - """ - Encrypt the content of the given document. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param key: The encryption key. - :type key: str - :param secret: The Soledad storage secret (used for MAC auth). - :type secret: str - - :return: A tuple containing the doc id, revision and encrypted content. - :rtype: tuple(str, str, str) - """ - encrypted_content = encrypt_docstr( - content, doc_id, doc_rev, key, secret) - return doc_id, doc_rev, encrypted_content - - -class SyncEncrypterPool(SyncEncryptDecryptPool): - """ - Pool of workers that spawn subprocesses to execute the symmetric encryption - of documents to be synced. - """ - # TODO implement throttling to reduce cpu usage?? - WORKERS = multiprocessing.cpu_count() - TABLE_NAME = "docs_tosync" - FIELD_NAMES = "doc_id, rev, content" - - def encrypt_doc(self, doc, workers=True): - """ - Symmetrically encrypt a document. - - :param doc: The document with contents to be encrypted. - :type doc: SoledadDocument - - :param workers: Whether to defer the decryption to the multiprocess - pool of workers. Useful for debugging purposes. - :type workers: bool - """ - soledad_assert(self._crypto is not None, "need a crypto object") - docstr = doc.get_json() - key = self._crypto.doc_passphrase(doc.doc_id) - secret = self._crypto.secret - args = doc.doc_id, doc.rev, docstr, key, secret - - try: - if workers: - res = self._pool.apply_async( - encrypt_doc_task, args, - callback=self.encrypt_doc_cb) - else: - # encrypt inline - res = encrypt_doc_task(*args) - self.encrypt_doc_cb(res) - - except Exception as exc: - logger.exception(exc) - - def encrypt_doc_cb(self, result): - """ - Insert results of encryption routine into the local sync database. - - :param result: A tuple containing the doc id, revision and encrypted - content. - :type result: tuple(str, str, str) - """ - doc_id, doc_rev, content = result - self.insert_encrypted_local_doc(doc_id, doc_rev, content) - - def insert_encrypted_local_doc(self, doc_id, doc_rev, content): - """ - Insert the contents of the encrypted doc into the local sync - database. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param content: The encrypted document. - :type content: str - """ - # FIXME --- callback should complete immediately since otherwise the - # thread which handles the results will get blocked - # Right now we're blocking the dispatcher with the writes to sqlite. - sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,) - sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,) - - con = self._sync_db - with self._sync_db_write_lock: - con.execute(sql_del, (doc_id, )) - con.execute(sql_ins, (doc_id, doc_rev, content)) - - -def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): - """ - Decrypt the content of the given document. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The encrypted content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification of - that document. - :type trans_id: str - :param key: The encryption key. - :type key: str - :param secret: The Soledad storage secret (used for MAC auth). - :type secret: str - - :return: A tuple containing the doc id, revision and encrypted content. - :rtype: tuple(str, str, str) - """ - decrypted_content = decrypt_doc_dict( - content, doc_id, doc_rev, key, secret) - return doc_id, doc_rev, decrypted_content, gen, trans_id - - -class SyncDecrypterPool(SyncEncryptDecryptPool): - """ - Pool of workers that spawn subprocesses to execute the symmetric decryption - of documents that were received. - - The decryption of the received documents is done in two steps: - - 1. All the encrypted docs are collected, together with their generation - and transaction-id - 2. The docs are enqueued for decryption. When completed, they are - inserted following the generation order. - """ - # TODO implement throttling to reduce cpu usage?? - TABLE_NAME = "docs_received" - FIELD_NAMES = "doc_id, rev, content, gen, trans_id, encrypted" - - write_encrypted_lock = threading.Lock() - - def __init__(self, *args, **kwargs): - """ - Initialize the decrypter pool, and setup a dict for putting the - results of the decrypted docs until they are picked by the insert - routine that gets them in order. - - :param insert_doc_cb: A callback for inserting received documents from - target. If not overriden, this will call u1db - insert_doc_from_target in synchronizer, which - implements the TAKE OTHER semantics. - :type insert_doc_cb: function - :param last_known_generation: Target's last known generation. - :type last_known_generation: int - """ - self._insert_doc_cb = kwargs.pop("insert_doc_cb") - SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - self.source_replica_uid = None - self._async_results = [] - - def set_source_replica_uid(self, source_replica_uid): - """ - Set the source replica uid for this decrypter pool instance. - - :param source_replica_uid: The uid of the source replica. - :type source_replica_uid: str - """ - self.source_replica_uid = source_replica_uid - - def insert_encrypted_received_doc(self, doc_id, doc_rev, content, - gen, trans_id): - """ - Insert a received message with encrypted content, to be decrypted later - on. - - :param doc_id: The Document ID. - :type doc_id: str - :param doc_rev: The Document Revision - :param doc_rev: str - :param content: the Content of the document - :type content: str - :param gen: the Document Generation - :type gen: int - :param trans_id: Transaction ID - :type trans_id: str - """ - docstr = json.dumps(content) - sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,) - sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( - self.TABLE_NAME,) - - con = self._sync_db - with self._sync_db_write_lock: - con.execute(sql_del, (doc_id, )) - con.execute( - sql_ins, - (doc_id, doc_rev, docstr, gen, trans_id, 1)) - - def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id): - """ - Insert a document that is not symmetrically encrypted. - We store it in the staging area (the decrypted_docs dictionary) to be - picked up in order as the preceding documents are decrypted. - - :param doc_id: The Document ID. - :type doc_id: str - :param doc_rev: The Document Revision - :param doc_rev: str - :param content: the Content of the document - :type content: str - :param gen: the Document Generation - :type gen: int - :param trans_id: Transaction ID - :type trans_id: str - """ - if not isinstance(content, str): - content = json.dumps(content) - sql_del = "DELETE FROM '%s' WHERE doc_id=?" % ( - self.TABLE_NAME,) - sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( - self.TABLE_NAME,) - con = self._sync_db - with self._sync_db_write_lock: - con.execute(sql_del, (doc_id,)) - con.execute( - sql_ins, - (doc_id, doc_rev, content, gen, trans_id, 0)) - - def delete_received_doc(self, doc_id, doc_rev): - """ - Delete a received doc after it was inserted into the local db. - - :param doc_id: Document ID. - :type doc_id: str - :param doc_rev: Document revision. - :type doc_rev: str - """ - sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % ( - self.TABLE_NAME,) - con = self._sync_db - with self._sync_db_write_lock: - con.execute(sql_del, (doc_id, doc_rev)) - - def decrypt_doc(self, doc_id, rev, content, gen, trans_id, - source_replica_uid, workers=True): - """ - Symmetrically decrypt a document. - - :param doc_id: The ID for the document with contents to be encrypted. - :type doc: str - :param rev: The revision of the document. - :type rev: str - :param content: The serialized content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification - of that document. - :type trans_id: str - :param source_replica_uid: - :type source_replica_uid: str - - :param workers: Whether to defer the decryption to the multiprocess - pool of workers. Useful for debugging purposes. - :type workers: bool - """ - self.source_replica_uid = source_replica_uid - - # insert_doc_cb is a proxy object that gets updated with the right - # insert function only when the sync_target invokes the sync_exchange - # method. so, if we don't still have a non-empty callback, we refuse - # to proceed. - if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid), - None): - logger.debug("Sync decrypter pool: no insert_doc_cb() yet.") - return - - soledad_assert(self._crypto is not None, "need a crypto object") - - if len(content) == 0: - # not encrypted payload - return - - content = json.loads(content) - key = self._crypto.doc_passphrase(doc_id) - secret = self._crypto.secret - args = doc_id, rev, content, gen, trans_id, key, secret - - if workers: - # save the async result object so we can inspect it for failures - self._async_results.append(self._pool.apply_async( - decrypt_doc_task, args, - callback=self.decrypt_doc_cb)) - else: - # decrypt inline - res = decrypt_doc_task(*args) - self.decrypt_doc_cb(res) - - def decrypt_doc_cb(self, result): - """ - Store the decryption result in the sync db from where it will later be - picked by process_decrypted. - - :param result: A tuple containing the doc id, revision and encrypted - content. - :type result: tuple(str, str, str) - """ - doc_id, rev, content, gen, trans_id = result - logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s" - % (doc_id, rev, gen, trans_id)) - self.insert_received_doc(doc_id, rev, content, gen, trans_id) - - def get_docs_by_generation(self, encrypted=None): - """ - Get all documents in the received table from the sync db, - ordered by generation. - - :param encrypted: If not None, only return documents with encrypted - field equal to given parameter. - :type encrypted: bool or None - - :return: list of doc_id, rev, generation, gen, trans_id - :rtype: list - """ - sql = "SELECT doc_id, rev, content, gen, trans_id, encrypted FROM %s" \ - % self.TABLE_NAME - if encrypted is not None: - sql += " WHERE encrypted = %d" % int(encrypted) - sql += " ORDER BY gen ASC" - return self._fetchall(sql) - - def get_insertable_docs_by_gen(self): - """ - Return a list of non-encrypted documents ready to be inserted. - """ - # here, we compare the list of all available docs with the list of - # decrypted docs and find the longest common prefix between these two - # lists. Note that the order of lists fetch matters: if instead we - # first fetch the list of decrypted docs and then the list of all - # docs, then some document might have been decrypted between these two - # calls, and if it is just the right doc then it might not be caught - # by the next loop. - all_docs = self.get_docs_by_generation() - decrypted_docs = self.get_docs_by_generation(encrypted=False) - insertable = [] - for doc_id, rev, _, gen, trans_id, encrypted in all_docs: - for next_doc_id, _, next_content, _, _, _ in decrypted_docs: - if doc_id == next_doc_id: - content = next_content - insertable.append((doc_id, rev, content, gen, trans_id)) - else: - break - return insertable - - def count_docs_in_sync_db(self, encrypted=None): - """ - Count how many documents we have in the table for received docs. - - :param encrypted: If not None, return count of documents with - encrypted field equal to given parameter. - :type encrypted: bool or None - - :return: The count of documents. - :rtype: int - """ - if self._sync_db is None: - logger.warning("cannot return count with null sync_db") - return - sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) - if encrypted is not None: - sql += " WHERE encrypted = %d" % int(encrypted) - res = self._fetchall(sql) - if res: - val = res.pop() - return val[0] - else: - return 0 - - def decrypt_received_docs(self): - """ - Get all the encrypted documents from the sync database and dispatch a - decrypt worker to decrypt each one of them. - """ - docs_by_generation = self.get_docs_by_generation(encrypted=True) - for doc_id, rev, content, gen, trans_id, _ \ - in filter(None, docs_by_generation): - self.decrypt_doc( - doc_id, rev, content, gen, trans_id, self.source_replica_uid) - - def process_decrypted(self): - """ - Process the already decrypted documents, and insert as many documents - as can be taken from the expected order without finding a gap. - - :return: Whether we have processed all the pending docs. - :rtype: bool - """ - # Acquire the lock to avoid processing while we're still - # getting data from the syncing stream, to avoid InvalidGeneration - # problems. - with self.write_encrypted_lock: - for doc_fields in self.get_insertable_docs_by_gen(): - self.insert_decrypted_local_doc(*doc_fields) - remaining = self.count_docs_in_sync_db() - return remaining == 0 - - def insert_decrypted_local_doc(self, doc_id, doc_rev, content, - gen, trans_id): - """ - Insert the decrypted document into the local sqlcipher database. - Makes use of the passed callback `return_doc_cb` passed to the caller - by u1db sync. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification - of that document. - :type trans_id: str - """ - # could pass source_replica in params for callback chain - insert_fun = self._insert_doc_cb[self.source_replica_uid] - logger.debug("Sync decrypter pool: inserting doc in local db: " - "%s:%s %s" % (doc_id, doc_rev, gen)) - - # convert deleted documents to avoid error on document creation - if content == 'null': - content = None - doc = SoledadDocument(doc_id, doc_rev, content) - gen = int(gen) - insert_fun(doc, gen, trans_id) - - # If no errors found, remove it from the received database. - self.delete_received_doc(doc_id, doc_rev) - - def empty(self): - """ - Empty the received docs table of the sync database. - """ - sql = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) - self._sync_db.execute(sql) - - def _fetchall(self, *args, **kwargs): - with self._sync_db: - c = self._sync_db.cursor() - c.execute(*args, **kwargs) - return c.fetchall() - - def raise_in_case_of_failed_async_calls(self): - """ - Re-raise any exception raised by an async call. - - :raise Exception: Raised if an async call has raised an exception. - """ - for res in self._async_results: - if res.ready(): - if not res.successful(): - # re-raise the exception raised by the remote call - res.get() diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py new file mode 100644 index 00000000..c0a05d38 --- /dev/null +++ b/client/src/leap/soledad/client/encdecpool.py @@ -0,0 +1,745 @@ +# -*- coding: utf-8 -*- +# encdecpool.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +A pool of encryption/decryption concurrent and parallel workers for using +during synchronization. +""" + + +import multiprocessing +import json +import logging + +from twisted.internet import reactor +from twisted.internet import defer +from twisted.internet.threads import deferToThread + +from leap.soledad.common.document import SoledadDocument +from leap.soledad.common import soledad_assert + +from leap.soledad.client.crypto import encrypt_docstr +from leap.soledad.client.crypto import decrypt_doc_dict + + +logger = logging.getLogger(__name__) + + +# +# Encrypt/decrypt pools of workers +# + +class SyncEncryptDecryptPool(object): + """ + Base class for encrypter/decrypter pools. + """ + + # TODO implement throttling to reduce cpu usage?? + WORKERS = multiprocessing.cpu_count() + + def __init__(self, crypto, sync_db): + """ + Initialize the pool of encryption-workers. + + :param crypto: A SoledadCryto instance to perform the encryption. + :type crypto: leap.soledad.crypto.SoledadCrypto + + :param sync_db: A database connection handle + :type sync_db: pysqlcipher.dbapi2.Connection + """ + self._crypto = crypto + self._sync_db = sync_db + self._pool = multiprocessing.Pool(self.WORKERS) + + def close(self): + """ + Cleanly close the pool of workers. + """ + logger.debug("Closing %s" % (self.__class__.__name__,)) + self._pool.close() + try: + self._pool.join() + except Exception: + pass + + def terminate(self): + """ + Terminate the pool of workers. + """ + logger.debug("Terminating %s" % (self.__class__.__name__,)) + self._pool.terminate() + + def _runOperation(self, query, *args): + """ + Run an operation on the sync db. + + :param query: The query to be executed. + :type query: str + :param args: A list of query arguments. + :type args: list + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + return self._sync_db.runOperation(query, *args) + + def _runQuery(self, query, *args): + """ + Run a query on the sync db. + + :param query: The query to be executed. + :type query: str + :param args: A list of query arguments. + :type args: list + + :return: A deferred that will fire with the results of the database + query. + :rtype: twisted.internet.defer.Deferred + """ + return self._sync_db.runQuery(query, *args) + + +def encrypt_doc_task(doc_id, doc_rev, content, key, secret): + """ + Encrypt the content of the given document. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The serialized content of the document. + :type content: str + :param key: The encryption key. + :type key: str + :param secret: The Soledad storage secret (used for MAC auth). + :type secret: str + + :return: A tuple containing the doc id, revision and encrypted content. + :rtype: tuple(str, str, str) + """ + encrypted_content = encrypt_docstr( + content, doc_id, doc_rev, key, secret) + return doc_id, doc_rev, encrypted_content + + +class SyncEncrypterPool(SyncEncryptDecryptPool): + """ + Pool of workers that spawn subprocesses to execute the symmetric encryption + of documents to be synced. + """ + TABLE_NAME = "docs_tosync" + FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" + + ENCRYPT_LOOP_PERIOD = 0.5 + + def __init__(self, *args, **kwargs): + """ + Initialize the sync encrypter pool. + """ + SyncEncryptDecryptPool.__init__(self, *args, **kwargs) + + self._stopped = False + self._sync_queue = multiprocessing.Queue() + + # start the encryption loop + self._deferred_loop = deferToThread(self._encrypt_docs_loop) + self._deferred_loop.addCallback( + lambda _: logger.debug("Finished encrypter thread.")) + + def enqueue_doc_for_encryption(self, doc): + """ + Enqueue a document for encryption. + + :param doc: The document to be encrypted. + :type doc: SoledadDocument + """ + try: + self.sync_queue.put_nowait(doc) + except multiprocessing.Queue.Full: + # do not asynchronously encrypt this file if the queue is full + pass + + def _encrypt_docs_loop(self): + """ + Process the syncing queue and send the documents there to be encrypted + in the sync db. They will be read by the SoledadSyncTarget during the + sync_exchange. + """ + logger.debug("Starting encrypter thread.") + while not self._stopped: + try: + doc = self._sync_queue.get(True, self.ENCRYPT_LOOP_PERIOD) + self._encrypt_doc(doc) + except multiprocessing.Queue.Empty: + pass + + def _encrypt_doc(self, doc): + """ + Symmetrically encrypt a document. + + :param doc: The document with contents to be encrypted. + :type doc: SoledadDocument + + :param workers: Whether to defer the decryption to the multiprocess + pool of workers. Useful for debugging purposes. + :type workers: bool + """ + soledad_assert(self._crypto is not None, "need a crypto object") + docstr = doc.get_json() + key = self._crypto.doc_passphrase(doc.doc_id) + secret = self._crypto.secret + args = doc.doc_id, doc.rev, docstr, key, secret + # encrypt asynchronously + self._pool.apply_async( + encrypt_doc_task, args, + callback=self._encrypt_doc_cb) + + def _encrypt_doc_cb(self, result): + """ + Insert results of encryption routine into the local sync database. + + :param result: A tuple containing the doc id, revision and encrypted + content. + :type result: tuple(str, str, str) + """ + doc_id, doc_rev, content = result + return self._insert_encrypted_local_doc(doc_id, doc_rev, content) + + def _insert_encrypted_local_doc(self, doc_id, doc_rev, content): + """ + Insert the contents of the encrypted doc into the local sync + database. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The serialized content of the document. + :type content: str + """ + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \ + % (self.TABLE_NAME,) + return self._runOperation(query, (doc_id, doc_rev, content)) + + @defer.inlineCallbacks + def get_encrypted_doc(self, doc_id, doc_rev): + """ + Get an encrypted document from the sync db. + + :param doc_id: The id of the document. + :type doc_id: str + :param doc_rev: The revision of the document. + :type doc_rev: str + + :return: A deferred that will fire with the encrypted content of the + document or None if the document was not found in the sync + db. + :rtype: twisted.internet.defer.Deferred + """ + logger.debug("Trying to get encrypted doc from sync db: %s" % doc_id) + query = "SELECT content FROM %s WHERE doc_id=? and rev=?" \ + % self.TABLE_NAME + result = yield self._runQuery(query, (doc_id, doc_rev)) + if result: + val = result.pop() + defer.returnValue(val[0]) + defer.returnValue(None) + + def delete_encrypted_doc(self, doc_id, doc_rev): + """ + Delete an encrypted document from the sync db. + + :param doc_id: The id of the document. + :type doc_id: str + :param doc_rev: The revision of the document. + :type doc_rev: str + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + query = "DELETE FROM %s WHERE doc_id=? and rev=?" \ + % self.TABLE_NAME + self._runOperation(query, (doc_id, doc_rev)) + + def close(self): + """ + Close the encrypter pool. + """ + self._stopped = True + self._sync_queue.close() + q = self._sync_queue + del q + self._sync_queue = None + + +def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, + idx): + """ + Decrypt the content of the given document. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The encrypted content of the document. + :type content: str + :param gen: The generation corresponding to the modification of that + document. + :type gen: int + :param trans_id: The transaction id corresponding to the modification of + that document. + :type trans_id: str + :param key: The encryption key. + :type key: str + :param secret: The Soledad storage secret (used for MAC auth). + :type secret: str + :param idx: The index of this document in the current sync process. + :type idx: int + + :return: A tuple containing the doc id, revision and encrypted content. + :rtype: tuple(str, str, str) + """ + decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret) + return doc_id, doc_rev, decrypted_content, gen, trans_id, idx + + +class SyncDecrypterPool(SyncEncryptDecryptPool): + """ + Pool of workers that spawn subprocesses to execute the symmetric decryption + of documents that were received. + + The decryption of the received documents is done in two steps: + + 1. Encrypted documents are stored in the sync db by the actual soledad + sync loop. + 2. The soledad sync loop tells us how many documents we should expect + to process. + 3. We start a decrypt-and-process loop: + + a. Encrypted documents are fetched. + b. Encrypted documents are decrypted. + c. The longest possible list of decrypted documents are inserted + in the soledad db (this depends on which documents have already + arrived and which documents have already been decrypte, because + the order of insertion in the local soledad db matters). + d. Processed documents are deleted from the database. + + 4. When we have processed as many documents as we should, the loop + finishes. + """ + # TODO implement throttling to reduce cpu usage?? + TABLE_NAME = "docs_received" + FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \ + "trans_id, encrypted, idx" + + """ + Period of recurrence of the periodic decrypting task, in seconds. + """ + DECRYPT_LOOP_PERIOD = 0.5 + + def __init__(self, *args, **kwargs): + """ + Initialize the decrypter pool, and setup a dict for putting the + results of the decrypted docs until they are picked by the insert + routine that gets them in order. + + :param insert_doc_cb: A callback for inserting received documents from + target. If not overriden, this will call u1db + insert_doc_from_target in synchronizer, which + implements the TAKE OTHER semantics. + :type insert_doc_cb: function + :param source_replica_uid: The source replica uid, used to find the + correct callback for inserting documents. + :type source_replica_uid: str + """ + self._insert_doc_cb = kwargs.pop("insert_doc_cb") + self.source_replica_uid = kwargs.pop("source_replica_uid") + SyncEncryptDecryptPool.__init__(self, *args, **kwargs) + + self._last_inserted_idx = 0 + self._docs_to_process = None + self._processed_docs = 0 + + self._async_results = [] + self._failure = None + self._finished = False + + # XXX we want to empty the database before starting, but this is an + # asynchronous call, so we have to somehow make sure that it is + # executed before any other call to the database, without + # blocking. + self._empty() + + def _launch_decrypt_and_process(self): + d = self._decrypt_and_process_docs() + d.addErrback(lambda f: self._set_failure(f)) + + def _schedule_decrypt_and_process(self): + reactor.callLater( + self.DECRYPT_LOOP_PERIOD, + self._launch_decrypt_and_process) + + @property + def failure(self): + return self._failure + + def _set_failure(self, failure): + self._failure = failure + self._finished = True + + def failed(self): + return bool(self._failure) + + def start(self, docs_to_process): + """ + Set the number of documents we expect to process. + + This should be called by the during the sync exchange process as soon + as we know how many documents are arriving from the server. + + :param docs_to_process: The number of documents to process. + :type docs_to_process: int + """ + self._docs_to_process = docs_to_process + self._schedule_decrypt_and_process() + + def insert_encrypted_received_doc( + self, doc_id, doc_rev, content, gen, trans_id, idx): + """ + Insert a received message with encrypted content, to be decrypted later + on. + + :param doc_id: The Document ID. + :type doc_id: str + :param doc_rev: The Document Revision + :param doc_rev: str + :param content: the Content of the document + :type content: str + :param gen: the Document Generation + :type gen: int + :param trans_id: Transaction ID + :type trans_id: str + :param idx: The index of this document in the current sync process. + :type idx: int + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + docstr = json.dumps(content) + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ + % self.TABLE_NAME + return self._runOperation( + query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx)) + + def insert_received_doc( + self, doc_id, doc_rev, content, gen, trans_id, idx): + """ + Insert a document that is not symmetrically encrypted. + We store it in the staging area (the decrypted_docs dictionary) to be + picked up in order as the preceding documents are decrypted. + + :param doc_id: The Document ID. + :type doc_id: str + :param doc_rev: The Document Revision + :param doc_rev: str + :param content: the Content of the document + :type content: str + :param gen: the Document Generation + :type gen: int + :param trans_id: Transaction ID + :type trans_id: str + :param idx: The index of this document in the current sync process. + :type idx: int + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + if not isinstance(content, str): + content = json.dumps(content) + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ + % self.TABLE_NAME + return self._runOperation( + query, (doc_id, doc_rev, content, gen, trans_id, 0, idx)) + + def _delete_received_doc(self, doc_id): + """ + Delete a received doc after it was inserted into the local db. + + :param doc_id: Document ID. + :type doc_id: str + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + query = "DELETE FROM '%s' WHERE doc_id=?" \ + % self.TABLE_NAME + return self._runOperation(query, (doc_id,)) + + def _async_decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx): + """ + Dispatch an asynchronous document decrypting routine and save the + result object. + + :param doc_id: The ID for the document with contents to be encrypted. + :type doc: str + :param rev: The revision of the document. + :type rev: str + :param content: The serialized content of the document. + :type content: str + :param gen: The generation corresponding to the modification of that + document. + :type gen: int + :param trans_id: The transaction id corresponding to the modification + of that document. + :type trans_id: str + :param idx: The index of this document in the current sync process. + :type idx: int + + :return: A deferred that will fire after the document hasa been + decrypted and inserted in the sync db. + :rtype: twisted.internet.defer.Deferred + """ + soledad_assert(self._crypto is not None, "need a crypto object") + + content = json.loads(content) + key = self._crypto.doc_passphrase(doc_id) + secret = self._crypto.secret + args = doc_id, rev, content, gen, trans_id, key, secret, idx + # decrypt asynchronously + self._async_results.append( + self._pool.apply_async( + decrypt_doc_task, args)) + + def _decrypt_doc_cb(self, result): + """ + Store the decryption result in the sync db from where it will later be + picked by _process_decrypted_docs. + + :param result: A tuple containing the document's id, revision, + content, generation, transaction id and sync index. + :type result: tuple(str, str, str, int, str, int) + + :return: A deferred that will fire after the document has been + inserted in the sync db. + :rtype: twisted.internet.defer.Deferred + """ + doc_id, rev, content, gen, trans_id, idx = result + logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s" + % (doc_id, rev, gen, trans_id)) + return self.insert_received_doc( + doc_id, rev, content, gen, trans_id, idx) + + def _get_docs(self, encrypted=None, order_by='idx', order='ASC'): + """ + Get documents from the received docs table in the sync db. + + :param encrypted: If not None, only return documents with encrypted + field equal to given parameter. + :type encrypted: bool or None + :param order_by: The name of the field to order results. + :type order_by: str + :param order: Whether the order should be ASC or DESC. + :type order: str + + :return: A deferred that will fire with the results of the database + query. + :rtype: twisted.internet.defer.Deferred + """ + query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \ + "idx FROM %s" % self.TABLE_NAME + if encrypted is not None: + query += " WHERE encrypted = %d" % int(encrypted) + query += " ORDER BY %s %s" % (order_by, order) + return self._runQuery(query) + + @defer.inlineCallbacks + def _get_insertable_docs(self): + """ + Return a list of non-encrypted documents ready to be inserted. + + :return: A deferred that will fire with the list of insertable + documents. + :rtype: twisted.internet.defer.Deferred + """ + # here, we fetch the list of decrypted documents and compare with the + # index of the last succesfully processed document. + decrypted_docs = yield self._get_docs(encrypted=False) + insertable = [] + last_idx = self._last_inserted_idx + for doc_id, rev, content, gen, trans_id, encrypted, idx in \ + decrypted_docs: + # XXX for some reason, a document might not have been deleted from + # the database. This is a bug. In this point, already + # processed documents should have been removed from the sync + # database and we should not have to skip them here. We need + # to find out why this is happening, fix, and remove the + # skipping below. + if (idx < last_idx + 1): + continue + if (idx != last_idx + 1): + break + insertable.append((doc_id, rev, content, gen, trans_id, idx)) + last_idx += 1 + defer.returnValue(insertable) + + @defer.inlineCallbacks + def _async_decrypt_received_docs(self): + """ + Get all the encrypted documents from the sync database and dispatch a + decrypt worker to decrypt each one of them. + + :return: A deferred that will fire after all documents have been + decrypted and inserted back in the sync db. + :rtype: twisted.internet.defer.Deferred + """ + docs = yield self._get_docs(encrypted=True) + for doc_id, rev, content, gen, trans_id, _, idx in docs: + self._async_decrypt_doc( + doc_id, rev, content, gen, trans_id, idx) + + @defer.inlineCallbacks + def _process_decrypted_docs(self): + """ + Fetch as many decrypted documents as can be taken from the expected + order and insert them in the local replica. + + :return: A deferred that will fire with the list of inserted + documents. + :rtype: twisted.internet.defer.Deferred + """ + insertable = yield self._get_insertable_docs() + for doc_fields in insertable: + self._insert_decrypted_local_doc(*doc_fields) + defer.returnValue(insertable) + + def _delete_processed_docs(self, inserted): + """ + Delete from the sync db documents that have been processed. + + :param inserted: List of documents inserted in the previous process + step. + :type inserted: list + + :return: A list of deferreds that will fire when each operation in the + database has finished. + :rtype: twisted.internet.defer.DeferredList + """ + deferreds = [] + for doc_id, doc_rev, _, _, _, _ in inserted: + deferreds.append( + self._delete_received_doc(doc_id)) + if not deferreds: + return defer.succeed(None) + return defer.gatherResults(deferreds) + + def _insert_decrypted_local_doc(self, doc_id, doc_rev, content, + gen, trans_id, idx): + """ + Insert the decrypted document into the local replica. + + Make use of the passed callback `insert_doc_cb` passed to the caller + by u1db sync. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The serialized content of the document. + :type content: str + :param gen: The generation corresponding to the modification of that + document. + :type gen: int + :param trans_id: The transaction id corresponding to the modification + of that document. + :type trans_id: str + """ + # could pass source_replica in params for callback chain + logger.debug("Sync decrypter pool: inserting doc in local db: " + "%s:%s %s" % (doc_id, doc_rev, gen)) + + # convert deleted documents to avoid error on document creation + if content == 'null': + content = None + doc = SoledadDocument(doc_id, doc_rev, content) + gen = int(gen) + self._insert_doc_cb(doc, gen, trans_id) + + # store info about processed docs + self._last_inserted_idx = idx + self._processed_docs += 1 + + def _empty(self): + """ + Empty the received docs table of the sync database. + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) + return self._runOperation(query) + + def _collect_async_decryption_results(self): + """ + Collect the results of the asynchronous doc decryptions and re-raise + any exception raised by a multiprocessing async decryption call. + + :raise Exception: Raised if an async call has raised an exception. + """ + async_results = self._async_results[:] + for res in async_results: + if res.ready(): + self._decrypt_doc_cb(res.get()) # might raise an exception! + self._async_results.remove(res) + + @defer.inlineCallbacks + def _decrypt_and_process_docs(self): + """ + Decrypt the documents received from remote replica and insert them + into the local one. + + This method implicitelly returns a defferred (see the decorator + above). It should only be called by _launch_decrypt_and_process(). + because this way any exceptions raised here will be stored by the + errback attached to the deferred returned. + + :return: A deferred which will fire after all decrypt, process and + delete operations have been executed. + :rtype: twisted.internet.defer.Deferred + """ + if not self.failed(): + if self._processed_docs < self._docs_to_process: + yield self._async_decrypt_received_docs() + yield self._collect_async_decryption_results() + docs = yield self._process_decrypted_docs() + yield self._delete_processed_docs(docs) + # recurse + self._schedule_decrypt_and_process() + else: + self._finished = True + + def has_finished(self): + """ + Return whether the decrypter has finished its work. + """ + return self._finished diff --git a/client/src/leap/soledad/client/http_client.py b/client/src/leap/soledad/client/http_client.py new file mode 100644 index 00000000..b08d199e --- /dev/null +++ b/client/src/leap/soledad/client/http_client.py @@ -0,0 +1,194 @@ +# -*- coding: utf-8 -*- +# http_client.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Twisted HTTP/HTTPS client. +""" + +import os + +from zope.interface import implements + +from OpenSSL.crypto import load_certificate +from OpenSSL.crypto import FILETYPE_PEM + +from twisted.internet import reactor +from twisted.internet.ssl import ClientContextFactory +from twisted.internet.ssl import CertificateOptions +from twisted.internet.defer import succeed + +from twisted.web.client import Agent +from twisted.web.client import HTTPConnectionPool +from twisted.web.client import readBody +from twisted.web.http_headers import Headers +from twisted.web.error import Error +from twisted.web.iweb import IBodyProducer + + +from leap.soledad.common.errors import InvalidAuthTokenError + + +# +# Setup a pool of connections +# + +_pool = HTTPConnectionPool(reactor, persistent=True) +_pool.maxPersistentPerHost = 10 +_agent = None + +# if we ever want to trust the system's CAs, we should use an agent like this: +# from twisted.web.client import BrowserLikePolicyForHTTPS +# _agent = Agent(reactor, BrowserLikePolicyForHTTPS(), pool=_pool) + + +# +# SSL/TLS certificate configuration +# + +def configure_certificate(cert_file): + """ + Configure an agent that verifies server certificates against a CA cert + file. + + :param cert_file: The path to the certificate file. + :type cert_file: str + """ + global _agent + cert = _load_cert(cert_file) + _agent = Agent( + reactor, + SoledadClientContextFactory(cert), + pool=_pool) + + +def _load_cert(cert_file): + """ + Load a X509 certificate from a file. + + :param cert_file: The path to the certificate file. + :type cert_file: str + + :return: The X509 certificate. + :rtype: OpenSSL.crypto.X509 + """ + if os.path.exists(cert_file): + with open(cert_file) as f: + data = f.read() + return load_certificate(FILETYPE_PEM, data) + + +class SoledadClientContextFactory(ClientContextFactory): + """ + A context factory that will verify the server's certificate against a + given CA certificate. + """ + + def __init__(self, cacert): + """ + Initialize the context factory. + + :param cacert: The CA certificate. + :type cacert: OpenSSL.crypto.X509 + """ + self._cacert = cacert + + def getContext(self, hostname, port): + opts = CertificateOptions(verify=True, caCerts=[self._cacert]) + return opts.getContext() + + +# +# HTTP request facilities +# + +def _unauth_to_invalid_token_error(failure): + """ + An errback to translate unauthorized errors to our own invalid token + class. + + :param failure: The original failure. + :type failure: twisted.python.failure.Failure + + :return: Either the original failure or an invalid auth token error. + :rtype: twisted.python.failure.Failure + """ + failure.trap(Error) + if failure.getErrorMessage() == "401 Unauthorized": + raise InvalidAuthTokenError + return failure + + +class StringBodyProducer(object): + """ + A producer that writes the body of a request to a consumer. + """ + + implements(IBodyProducer) + + def __init__(self, body): + """ + Initialize the string produer. + + :param body: The body of the request. + :type body: str + """ + self.body = body + self.length = len(body) + + def startProducing(self, consumer): + """ + Write the body to the consumer. + + :param consumer: Any IConsumer provider. + :type consumer: twisted.internet.interfaces.IConsumer + + :return: A successful deferred. + :rtype: twisted.internet.defer.Deferred + """ + consumer.write(self.body) + return succeed(None) + + def pauseProducing(self): + pass + + def stopProducing(self): + pass + + +def httpRequest(url, method='GET', body=None, headers={}): + """ + Perform an HTTP request. + + :param url: The URL for the request. + :type url: str + :param method: The HTTP method of the request. + :type method: str + :param body: The body of the request, if any. + :type body: str + :param headers: The headers of the request. + :type headers: dict + + :return: A deferred that fires with the body of the request. + :rtype: twisted.internet.defer.Deferred + """ + if body: + body = StringBodyProducer(body) + d = _agent.request( + method, url, headers=Headers(headers), bodyProducer=body) + d.addCallbacks(readBody, _unauth_to_invalid_token_error) + return d diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py new file mode 100644 index 00000000..dc6c0e0a --- /dev/null +++ b/client/src/leap/soledad/client/http_target.py @@ -0,0 +1,598 @@ +# -*- coding: utf-8 -*- +# http_target.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +A U1DB backend for encrypting data before sending to server and decrypting +after receiving. +""" + + +import json +import base64 +import logging + +from uuid import uuid4 +from functools import partial + +from twisted.internet import defer +from twisted.internet import reactor + +from u1db import errors +from u1db import SyncTarget +from u1db.remote import utils + +from leap.soledad.common.document import SoledadDocument + +from leap.soledad.client.crypto import is_symmetrically_encrypted +from leap.soledad.client.crypto import encrypt_doc +from leap.soledad.client.crypto import decrypt_doc +from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS +from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS +from leap.soledad.client.events import signal +from leap.soledad.client.encdecpool import SyncDecrypterPool +from leap.soledad.client.http_client import httpRequest +from leap.soledad.client.http_client import configure_certificate + + +logger = logging.getLogger(__name__) + + +class SoledadHTTPSyncTarget(SyncTarget): + """ + A SyncTarget that encrypts data before sending and decrypts data after + receiving. + + Normally encryption will have been written to the sync database upon + document modification. The sync database is also used to write temporarily + the parsed documents that the remote send us, before being decrypted and + written to the main database. + """ + + def __init__(self, url, source_replica_uid, creds, crypto, cert_file, + sync_db=None, sync_enc_pool=None): + """ + Initialize the sync target. + + :param url: The server sync url. + :type url: str + :param source_replica_uid: The source replica uid which we use when + deferring decryption. + :type source_replica_uid: str + :param url: The url of the target replica to sync with. + :type url: str + :param creds: A dictionary containing the uuid and token. + :type creds: creds + :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt + document contents when syncing. + :type crypto: soledad.crypto.SoledadCrypto + :param cert_file: Path to the certificate of the ca used to validate + the SSL certificate used by the remote soledad + server. + :type cert_file: str + :param sync_db: Optional. handler for the db with the symmetric + encryption of the syncing documents. If + None, encryption will be done in-place, + instead of retreiving it from the dedicated + database. + :type sync_db: Sqlite handler + :param verify_ssl: Whether we should perform SSL server certificate + verification. + :type verify_ssl: bool + """ + if url.endswith("/"): + url = url[:-1] + self._url = str(url) + "/sync-from/" + source_replica_uid + self.source_replica_uid = source_replica_uid + self._auth_header = None + self.set_creds(creds) + self._crypto = crypto + self._sync_db = sync_db + self._sync_enc_pool = sync_enc_pool + self._insert_doc_cb = None + # asynchronous encryption/decryption attributes + self._decryption_callback = None + self._sync_decr_pool = None + configure_certificate(cert_file) + + def set_creds(self, creds): + """ + Update credentials. + + :param creds: A dictionary containing the uuid and token. + :type creds: dict + """ + uuid = creds['token']['uuid'] + token = creds['token']['token'] + auth = '%s:%s' % (uuid, token) + b64_token = base64.b64encode(auth) + self._auth_header = {'Authorization': ['Token %s' % b64_token]} + + @property + def _defer_encryption(self): + return self._sync_enc_pool is not None + + # + # SyncTarget API + # + + @defer.inlineCallbacks + def get_sync_info(self, source_replica_uid): + """ + Return information about known state of remote database. + + Return the replica_uid and the current database generation of the + remote database, and its last-seen database generation for the client + replica. + + :param source_replica_uid: The client-size replica uid. + :type source_replica_uid: str + + :return: A deferred which fires with (target_replica_uid, + target_replica_generation, target_trans_id, + source_replica_last_known_generation, + source_replica_last_known_transaction_id) + :rtype: twisted.internet.defer.Deferred + """ + raw = yield httpRequest(self._url, headers=self._auth_header) + res = json.loads(raw) + defer.returnValue([ + res['target_replica_uid'], + res['target_replica_generation'], + res['target_replica_transaction_id'], + res['source_replica_generation'], + res['source_transaction_id'] + ]) + + def record_sync_info( + self, source_replica_uid, source_replica_generation, + source_replica_transaction_id): + """ + Record tip information for another replica. + + After sync_exchange has been processed, the caller will have + received new content from this replica. This call allows the + source replica instigating the sync to inform us what their + generation became after applying the documents we returned. + + This is used to allow future sync operations to not need to repeat data + that we just talked about. It also means that if this is called at the + wrong time, there can be database records that will never be + synchronized. + + :param source_replica_uid: The identifier for the source replica. + :type source_replica_uid: str + :param source_replica_generation: The database generation for the + source replica. + :type source_replica_generation: int + :param source_replica_transaction_id: The transaction id associated + with the source replica + generation. + :type source_replica_transaction_id: str + + :return: A deferred which fires with the result of the query. + :rtype: twisted.internet.defer.Deferred + """ + data = json.dumps({ + 'generation': source_replica_generation, + 'transaction_id': source_replica_transaction_id + }) + headers = self._auth_header.copy() + headers.update({'content-type': ['application/json']}) + return httpRequest( + self._url, + method='PUT', + headers=headers, + body=data) + + @defer.inlineCallbacks + def sync_exchange(self, docs_by_generation, source_replica_uid, + last_known_generation, last_known_trans_id, + insert_doc_cb, ensure_callback=None, + defer_decryption=True, sync_id=None): + """ + Find out which documents the remote database does not know about, + encrypt and send them. After that, receive documents from the remote + database. + + :param docs_by_generations: A list of (doc_id, generation, trans_id) + of local documents that were changed since + the last local generation the remote + replica knows about. + :type docs_by_generations: list of tuples + + :param source_replica_uid: The uid of the source replica. + :type source_replica_uid: str + + :param last_known_generation: Target's last known generation. + :type last_known_generation: int + + :param last_known_trans_id: Target's last known transaction id. + :type last_known_trans_id: str + + :param insert_doc_cb: A callback for inserting received documents from + target. If not overriden, this will call u1db + insert_doc_from_target in synchronizer, which + implements the TAKE OTHER semantics. + :type insert_doc_cb: function + + :param ensure_callback: A callback that ensures we know the target + replica uid if the target replica was just + created. + :type ensure_callback: function + + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool + + :return: A deferred which fires with the new generation and + transaction id of the target replica. + :rtype: twisted.internet.defer.Deferred + """ + + self._ensure_callback = ensure_callback + + if sync_id is None: + sync_id = str(uuid4()) + self.source_replica_uid = source_replica_uid + + # save a reference to the callback so we can use it after decrypting + self._insert_doc_cb = insert_doc_cb + + gen_after_send, trans_id_after_send = yield self._send_docs( + docs_by_generation, + last_known_generation, + last_known_trans_id, + sync_id) + + cur_target_gen, cur_target_trans_id = yield self._receive_docs( + last_known_generation, last_known_trans_id, + ensure_callback, sync_id, + defer_decryption=defer_decryption) + + # update gen and trans id info in case we just sent and did not + # receive docs. + if gen_after_send is not None and gen_after_send > cur_target_gen: + cur_target_gen = gen_after_send + cur_target_trans_id = trans_id_after_send + + defer.returnValue([cur_target_gen, cur_target_trans_id]) + + # + # methods to send docs + # + + def _prepare(self, comma, entries, **dic): + entry = comma + '\r\n' + json.dumps(dic) + entries.append(entry) + return len(entry) + + @defer.inlineCallbacks + def _send_docs(self, docs_by_generation, last_known_generation, + last_known_trans_id, sync_id): + + if not docs_by_generation: + defer.returnValue([None, None]) + + headers = self._auth_header.copy() + headers.update({'content-type': ['application/x-soledad-sync-put']}) + # add remote replica metadata to the request + first_entries = ['['] + self._prepare( + '', first_entries, + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + sync_id=sync_id, + ensure=self._ensure_callback is not None) + idx = 0 + total = len(docs_by_generation) + for doc, gen, trans_id in docs_by_generation: + idx += 1 + result = yield self._send_one_doc( + headers, first_entries, doc, + gen, trans_id, total, idx) + if self._defer_encryption: + self._sync_enc_pool.delete_encrypted_doc( + doc.doc_id, doc.rev) + signal(SOLEDAD_SYNC_SEND_STATUS, + "Soledad sync send status: %d/%d" + % (idx, total)) + response_dict = json.loads(result)[0] + gen_after_send = response_dict['new_generation'] + trans_id_after_send = response_dict['new_transaction_id'] + defer.returnValue([gen_after_send, trans_id_after_send]) + + @defer.inlineCallbacks + def _send_one_doc(self, headers, first_entries, doc, gen, trans_id, + number_of_docs, doc_idx): + entries = first_entries[:] + # add the document to the request + content = yield self._encrypt_doc(doc) + self._prepare( + ',', entries, + id=doc.doc_id, rev=doc.rev, content=content, gen=gen, + trans_id=trans_id, number_of_docs=number_of_docs, + doc_idx=doc_idx) + entries.append('\r\n]') + data = ''.join(entries) + result = yield httpRequest( + self._url, + method='POST', + headers=headers, + body=data) + defer.returnValue(result) + + def _encrypt_doc(self, doc): + d = None + if doc.is_tombstone(): + d = defer.succeed(None) + elif not self._defer_encryption: + # fallback case, for tests + d = defer.succeed(encrypt_doc(self._crypto, doc)) + else: + + def _maybe_encrypt_doc_inline(doc_json): + if doc_json is None: + # the document is not marked as tombstone, but we got + # nothing from the sync db. As it is not encrypted + # yet, we force inline encryption. + return encrypt_doc(self._crypto, doc) + return doc_json + + d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev) + d.addCallback(_maybe_encrypt_doc_inline) + return d + + # + # methods to receive doc + # + + @defer.inlineCallbacks + def _receive_docs(self, last_known_generation, last_known_trans_id, + ensure_callback, sync_id, defer_decryption): + + self._queue_for_decrypt = defer_decryption \ + and self._sync_db is not None + + new_generation = last_known_generation + new_transaction_id = last_known_trans_id + + if self._queue_for_decrypt: + logger.debug( + "Soledad sync: will queue received docs for decrypting.") + + if defer_decryption: + self._setup_sync_decr_pool() + + headers = self._auth_header.copy() + headers.update({'content-type': ['application/x-soledad-sync-get']}) + + #--------------------------------------------------------------------- + # maybe receive the first document + #--------------------------------------------------------------------- + + # we fetch the first document before fetching the rest because we need + # to know the total number of documents to be received, and this + # information comes as metadata to each request. + + d = self._receive_one_doc( + headers, last_known_generation, last_known_trans_id, + sync_id, 0) + d.addCallback(partial(self._insert_received_doc, 1, 1)) + number_of_changes, ngen, ntrans = yield d + + if defer_decryption: + self._sync_decr_pool.start(number_of_changes) + + #--------------------------------------------------------------------- + # maybe receive the rest of the documents + #--------------------------------------------------------------------- + + # launch many asynchronous fetches and inserts of received documents + # in the temporary sync db. Will wait for all results before + # continuing. + + received = 1 + deferreds = [] + while received < number_of_changes: + d = self._receive_one_doc( + headers, last_known_generation, + last_known_trans_id, sync_id, received) + d.addCallback( + partial( + self._insert_received_doc, + received + 1, # the index of the current received doc + number_of_changes)) + deferreds.append(d) + received += 1 + results = yield defer.gatherResults(deferreds) + + # get generation and transaction id of target after insertions + if deferreds: + _, new_generation, new_transaction_id = results.pop() + + #--------------------------------------------------------------------- + # wait for async decryption to finish + #--------------------------------------------------------------------- + + # below we do a trick so we can wait for the SyncDecrypterPool to + # finish its work before finally returning the new generation and + # transaction id of the remote replica. To achieve that, we create a + # Deferred that will return the results of the sync and, if we are + # decrypting asynchronously, we use reactor.callLater() to + # periodically poll the decrypter and check if it has finished its + # work. When it has finished, we either call the callback or errback + # of that deferred. In case we are not asynchronously decrypting, we + # just fire the deferred. + + def _shutdown_and_finish(res): + self._sync_decr_pool.close() + return new_generation, new_transaction_id + + d = defer.Deferred() + d.addCallback(_shutdown_and_finish) + + def _wait_or_finish(): + if not self._sync_decr_pool.has_finished(): + reactor.callLater( + SyncDecrypterPool.DECRYPT_LOOP_PERIOD, + _wait_or_finish) + else: + if not self._sync_decr_pool.failed(): + d.callback(None) + else: + d.errback(self._sync_decr_pool.failure) + + if defer_decryption: + _wait_or_finish() + else: + d.callback(None) + + new_generation, new_transaction_id = yield d + defer.returnValue([new_generation, new_transaction_id]) + + def _receive_one_doc(self, headers, last_known_generation, + last_known_trans_id, sync_id, received): + entries = ['['] + # add remote replica metadata to the request + self._prepare( + '', entries, + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + sync_id=sync_id, + ensure=self._ensure_callback is not None) + # inform server of how many documents have already been received + self._prepare( + ',', entries, received=received) + entries.append('\r\n]') + # send headers + return httpRequest( + self._url, + method='POST', + headers=headers, + body=''.join(entries)) + + def _insert_received_doc(self, idx, total, response): + """ + Insert a received document into the local replica. + + :param idx: The index count of the current operation. + :type idx: int + :param total: The total number of operations. + :type total: int + :param response: The body and headers of the response. + :type response: tuple(str, dict) + """ + new_generation, new_transaction_id, number_of_changes, doc_id, \ + rev, content, gen, trans_id = \ + self._parse_received_doc_response(response) + if doc_id is not None: + # decrypt incoming document and insert into local database + # ------------------------------------------------------------- + # symmetric decryption of document's contents + # ------------------------------------------------------------- + # If arriving content was symmetrically encrypted, we decrypt it. + # We do it inline if defer_decryption flag is False or no sync_db + # was defined, otherwise we defer it writing it to the received + # docs table. + doc = SoledadDocument(doc_id, rev, content) + if is_symmetrically_encrypted(doc): + if self._queue_for_decrypt: + self._sync_decr_pool.insert_encrypted_received_doc( + doc.doc_id, doc.rev, doc.content, gen, trans_id, + idx) + else: + # defer_decryption is False or no-sync-db fallback + doc.set_json(decrypt_doc(self._crypto, doc)) + self._insert_doc_cb(doc, gen, trans_id) + else: + # not symmetrically encrypted doc, insert it directly + # or save it in the decrypted stage. + if self._queue_for_decrypt: + self._sync_decr_pool.insert_received_doc( + doc.doc_id, doc.rev, doc.content, gen, trans_id, + idx) + else: + self._insert_doc_cb(doc, gen, trans_id) + # ------------------------------------------------------------- + # end of symmetric decryption + # ------------------------------------------------------------- + msg = "%d/%d" % (idx, total) + signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg) + logger.debug("Soledad sync receive status: %s" % msg) + return number_of_changes, new_generation, new_transaction_id + + def _parse_received_doc_response(self, response): + """ + Parse the response from the server containing the received document. + + :param response: The body and headers of the response. + :type response: tuple(str, dict) + + :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev, + content, gen, trans_id) + :rtype: tuple + """ + # decode incoming stream + parts = response.splitlines() + if not parts or parts[0] != '[' or parts[-1] != ']': + raise errors.BrokenSyncStream + data = parts[1:-1] + # decode metadata + line, comma = utils.check_and_strip_comma(data[0]) + metadata = None + try: + metadata = json.loads(line) + new_generation = metadata['new_generation'] + new_transaction_id = metadata['new_transaction_id'] + number_of_changes = metadata['number_of_changes'] + except (json.JSONDecodeError, KeyError): + raise errors.BrokenSyncStream + # make sure we have replica_uid from fresh new dbs + if self._ensure_callback and 'replica_uid' in metadata: + self._ensure_callback(metadata['replica_uid']) + # parse incoming document info + doc_id = None + rev = None + content = None + gen = None + trans_id = None + if number_of_changes > 0: + try: + entry = json.loads(data[1]) + doc_id = entry['id'] + rev = entry['rev'] + content = entry['content'] + gen = entry['gen'] + trans_id = entry['trans_id'] + except (IndexError, KeyError): + raise errors.BrokenSyncStream + return new_generation, new_transaction_id, number_of_changes, \ + doc_id, rev, content, gen, trans_id + + def _setup_sync_decr_pool(self): + """ + Set up the SyncDecrypterPool for deferred decryption. + """ + if self._sync_decr_pool is None and self._sync_db is not None: + # initialize syncing queue decryption pool + self._sync_decr_pool = SyncDecrypterPool( + self._crypto, + self._sync_db, + insert_doc_cb=self._insert_doc_cb, + source_replica_uid=self.source_replica_uid) diff --git a/client/src/leap/soledad/client/pragmas.py b/client/src/leap/soledad/client/pragmas.py index 2e9c53a3..55397d10 100644 --- a/client/src/leap/soledad/client/pragmas.py +++ b/client/src/leap/soledad/client/pragmas.py @@ -19,10 +19,53 @@ Different pragmas used in the initialization of the SQLCipher database. """ import logging import string +import threading +import os + +from leap.soledad.common import soledad_assert + logger = logging.getLogger(__name__) +_db_init_lock = threading.Lock() + + +def set_init_pragmas(conn, opts=None, extra_queries=None): + """ + Set the initialization pragmas. + + This includes the crypto pragmas, and any other options that must + be passed early to sqlcipher db. + """ + soledad_assert(opts is not None) + extra_queries = [] if extra_queries is None else extra_queries + with _db_init_lock: + # only one execution path should initialize the db + _set_init_pragmas(conn, opts, extra_queries) + + +def _set_init_pragmas(conn, opts, extra_queries): + + sync_off = os.environ.get('LEAP_SQLITE_NOSYNC') + memstore = os.environ.get('LEAP_SQLITE_MEMSTORE') + nowal = os.environ.get('LEAP_SQLITE_NOWAL') + + set_crypto_pragmas(conn, opts) + + if not nowal: + set_write_ahead_logging(conn) + if sync_off: + set_synchronous_off(conn) + else: + set_synchronous_normal(conn) + if memstore: + set_mem_temp_store(conn) + + for query in extra_queries: + conn.cursor().execute(query) + + def set_crypto_pragmas(db_handle, sqlcipher_opts): """ Set cryptographic params (key, cipher, KDF number of iterations and diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py index af781a26..96f7e906 100644 --- a/client/src/leap/soledad/client/secrets.py +++ b/client/src/leap/soledad/client/secrets.py @@ -246,22 +246,26 @@ class SoledadSecrets(object): :return: Whether there's a storage secret for symmetric encryption. :rtype: bool """ - if self._secret_id is None or self._secret_id not in self._secrets: + logger.info("Checking if there's a secret in local storage...") + if (self._secret_id is None or self._secret_id not in self._secrets) \ + and os.path.isfile(self._secrets_path): try: self._load_secrets() # try to load from disk except IOError as e: logger.warning( 'IOError while loading secrets from disk: %s' % str(e)) - return False - return self.storage_secret is not None + + if self.storage_secret is not None: + logger.info("Found a secret in local storage.") + return True + + logger.info("Could not find a secret in local storage.") + return False def _load_secrets(self): """ Load storage secrets from local file. """ - # does the file exist in disk? - if not os.path.isfile(self._secrets_path): - raise IOError('File does not exist: %s' % self._secrets_path) # read storage secrets from file content = None with open(self._secrets_path, 'r') as f: diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index db3cb5cb..8e7d39c2 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -42,7 +42,6 @@ SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases handled by Soledad should be created by SQLCipher >= 2.0. """ import logging -import multiprocessing import os import threading import json @@ -54,19 +53,17 @@ from u1db.backends import sqlite_backend from hashlib import sha256 from contextlib import contextmanager from collections import defaultdict -from httplib import CannotSendRequest +from functools import partial from pysqlcipher import dbapi2 as sqlcipher_dbapi2 from twisted.internet import reactor -from twisted.internet.task import LoopingCall from twisted.internet.threads import deferToThreadPool from twisted.python.threadpool import ThreadPool -from twisted.python import log +from twisted.enterprise import adbapi -from leap.soledad.client import crypto -from leap.soledad.client.target import SoledadSyncTarget -from leap.soledad.client.target import PendingReceivedDocsSyncError +from leap.soledad.client import encdecpool +from leap.soledad.client.http_target import SoledadHTTPSyncTarget from leap.soledad.client.sync import SoledadSynchronizer from leap.soledad.client import pragmas @@ -102,46 +99,14 @@ def initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True): conn = sqlcipher_dbapi2.connect( opts.path, check_same_thread=check_same_thread) - set_init_pragmas(conn, opts, extra_queries=on_init) + pragmas.set_init_pragmas(conn, opts, extra_queries=on_init) return conn -_db_init_lock = threading.Lock() - - -def set_init_pragmas(conn, opts=None, extra_queries=None): - """ - Set the initialization pragmas. - - This includes the crypto pragmas, and any other options that must - be passed early to sqlcipher db. - """ - soledad_assert(opts is not None) - extra_queries = [] if extra_queries is None else extra_queries - with _db_init_lock: - # only one execution path should initialize the db - _set_init_pragmas(conn, opts, extra_queries) - - -def _set_init_pragmas(conn, opts, extra_queries): - - sync_off = os.environ.get('LEAP_SQLITE_NOSYNC') - memstore = os.environ.get('LEAP_SQLITE_MEMSTORE') - nowal = os.environ.get('LEAP_SQLITE_NOWAL') - - pragmas.set_crypto_pragmas(conn, opts) - - if not nowal: - pragmas.set_write_ahead_logging(conn) - if sync_off: - pragmas.set_synchronous_off(conn) - else: - pragmas.set_synchronous_normal(conn) - if memstore: - pragmas.set_mem_temp_store(conn) - - for query in extra_queries: - conn.cursor().execute(query) +def initialize_sqlcipher_adbapi_db(opts, extra_queries=None): + from leap.soledad.client import sqlcipher_adbapi + return sqlcipher_adbapi.getConnectionPool( + opts, extra_queries=extra_queries) class SQLCipherOptions(object): @@ -151,22 +116,32 @@ class SQLCipherOptions(object): @classmethod def copy(cls, source, path=None, key=None, create=None, - is_raw_key=None, cipher=None, kdf_iter=None, cipher_page_size=None, - defer_encryption=None, sync_db_key=None): + is_raw_key=None, cipher=None, kdf_iter=None, + cipher_page_size=None, defer_encryption=None, sync_db_key=None): """ Return a copy of C{source} with parameters different than None replaced by new values. """ - return SQLCipherOptions( - path if path else source.path, - key if key else source.key, - create=create if create else source.create, - is_raw_key=is_raw_key if is_raw_key else source.is_raw_key, - cipher=cipher if cipher else source.cipher, - kdf_iter=kdf_iter if kdf_iter else source.kdf_iter, - cipher_page_size=cipher_page_size if cipher_page_size else source.cipher_page_size, - defer_encryption=defer_encryption if defer_encryption else source.defer_encryption, - sync_db_key=sync_db_key if sync_db_key else source.sync_db_key) + local_vars = locals() + args = [] + kwargs = {} + + for name in ["path", "key"]: + val = local_vars[name] + if val is not None: + args.append(val) + else: + args.append(getattr(source, name)) + + for name in ["create", "is_raw_key", "cipher", "kdf_iter", + "cipher_page_size", "defer_encryption", "sync_db_key"]: + val = local_vars[name] + if val is not None: + kwargs[name] = val + else: + kwargs[name] = getattr(source, name) + + return SQLCipherOptions(*args, **kwargs) def __init__(self, path, key, create=True, is_raw_key=False, cipher='aes-256-cbc', kdf_iter=4000, cipher_page_size=1024, @@ -307,10 +282,9 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): :rtype: str """ doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc) - - # TODO XXX move to API XXX if self.defer_encryption: - self.sync_queue.put_nowait(doc) + # TODO move to api? + self._sync_enc_pool.enqueue_doc_for_encryption(doc) return doc_rev # @@ -440,7 +414,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): Soledad syncer implementation. """ - _sync_loop = None _sync_enc_pool = None """ @@ -450,13 +423,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db' """ - A dictionary that hold locks which avoid multiple sync attempts from the - same database replica. - """ - # XXX We do not need the lock here now. Remove. - encrypting_lock = threading.Lock() - - """ Period or recurrence of the Looping Call that will do the encryption to the syncdb (in seconds). """ @@ -468,19 +434,18 @@ class SQLCipherU1DBSync(SQLCipherDatabase): """ syncing_lock = defaultdict(threading.Lock) - def __init__(self, opts, soledad_crypto, replica_uid, + def __init__(self, opts, soledad_crypto, replica_uid, cert_file, defer_encryption=False): self._opts = opts self._path = opts.path self._crypto = soledad_crypto self.__replica_uid = replica_uid + self._cert_file = cert_file self._sync_db_key = opts.sync_db_key self._sync_db = None - self._sync_db_write_lock = None self._sync_enc_pool = None - self.sync_queue = None # we store syncers in a dictionary indexed by the target URL. We also # store a hash of the auth info in case auth info expires and we need @@ -490,8 +455,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # self._syncers = {'<url>': ('<auth_hash>', syncer), ...} self._syncers = {} - self._sync_db_write_lock = threading.Lock() - self.sync_queue = multiprocessing.Queue() self.running = False self._sync_threadpool = None @@ -503,25 +466,15 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self._db_handle = None self._initialize_main_db() - if defer_encryption: - self._initialize_sync_db(opts) + # the sync_db is used both for deferred encryption and decryption, so + # we want to initialize it anyway to allow for all combinations of + # deferred encryption and decryption configurations. + self._initialize_sync_db(opts) + if defer_encryption: # initialize syncing queue encryption pool - self._sync_enc_pool = crypto.SyncEncrypterPool( - self._crypto, self._sync_db, self._sync_db_write_lock) - - # ----------------------------------------------------------------- - # From the documentation: If f returns a deferred, rescheduling - # will not take place until the deferred has fired. The result - # value is ignored. - - # TODO use this to avoid multiple sync attempts if the sync has not - # finished! - # ----------------------------------------------------------------- - - # XXX this was called sync_watcher --- trace any remnants - self._sync_loop = LoopingCall(self._encrypt_syncing_docs) - self._sync_loop.start(self.ENCRYPT_LOOP_PERIOD) + self._sync_enc_pool = encdecpool.SyncEncrypterPool( + self._crypto, self._sync_db) self.shutdownID = None @@ -584,11 +537,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # somewhere else sync_opts = SQLCipherOptions.copy( opts, path=sync_db_path, create=True) - self._sync_db = initialize_sqlcipher_db( - sync_opts, on_init=self._sync_db_extra_init, - check_same_thread=False) - pragmas.set_crypto_pragmas(self._sync_db, opts) - # --------------------------------------------------------- + self._sync_db = getConnectionPool( + sync_opts, extra_queries=self._sync_db_extra_init) @property def _sync_db_extra_init(self): @@ -599,15 +549,15 @@ class SQLCipherU1DBSync(SQLCipherDatabase): :rtype: tuple of strings """ maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)" - encr = crypto.SyncEncrypterPool - decr = crypto.SyncDecrypterPool + encr = encdecpool.SyncEncrypterPool + decr = encdecpool.SyncDecrypterPool sql_encr_table_query = (maybe_create % ( encr.TABLE_NAME, encr.FIELD_NAMES)) sql_decr_table_query = (maybe_create % ( decr.TABLE_NAME, decr.FIELD_NAMES)) return (sql_encr_table_query, sql_decr_table_query) - def sync(self, url, creds=None, autocreate=True, defer_decryption=True): + def sync(self, url, creds=None, defer_decryption=True): """ Synchronize documents with remote replica exposed at url. @@ -621,12 +571,9 @@ class SQLCipherU1DBSync(SQLCipherDatabase): :param url: The url of the target replica to sync with. :type url: str - :param creds: - optional dictionary giving credentials. - to authorize the operation with the server. + :param creds: optional dictionary giving credentials to authorize the + operation with the server. :type creds: dict - :param autocreate: Ask the target to create the db if non-existent. - :type autocreate: bool :param defer_decryption: Whether to defer the decryption process using the intermediate database. If False, decryption will be done inline. @@ -637,49 +584,11 @@ class SQLCipherU1DBSync(SQLCipherDatabase): before the synchronisation was performed. :rtype: Deferred """ - kwargs = {'creds': creds, 'autocreate': autocreate, - 'defer_decryption': defer_decryption} - return self._defer_to_sync_threadpool(self._sync, url, **kwargs) - - def _sync(self, url, creds=None, autocreate=True, defer_decryption=True): - res = None - # the following context manager blocks until the syncing lock can be # acquired. - # TODO review, I think this is no longer needed with a 1-thread - # threadpool. - - log.msg("in _sync") - self.__url = url with self._syncer(url, creds=creds) as syncer: # XXX could mark the critical section here... - try: - log.msg('syncer sync...') - res = syncer.sync(autocreate=autocreate, - defer_decryption=defer_decryption) - - except PendingReceivedDocsSyncError: - logger.warning("Local sync db is not clear, skipping sync...") - return - except CannotSendRequest: - logger.warning("Connection with sync target couldn't be " - "established. Resetting connection...") - # closing the connection it will be recreated in the next try - syncer.sync_target.close() - return - - return res - - def stop_sync(self): - """ - Interrupt all ongoing syncs. - """ - self._stop_sync() - - def _stop_sync(self): - for url in self._syncers: - _, syncer = self._syncers[url] - syncer.stop() + return syncer.sync(defer_decryption=defer_decryption) @contextmanager def _syncer(self, url, creds=None): @@ -690,6 +599,10 @@ class SQLCipherU1DBSync(SQLCipherDatabase): one instance synchronizing the same database replica at the same time. Because of that, this method blocks until the syncing lock can be acquired. + + :param creds: optional dictionary giving credentials to authorize the + operation with the server. + :type creds: dict """ with self.syncing_lock[self._path]: syncer = self._get_syncer(url, creds=creds) @@ -723,16 +636,17 @@ class SQLCipherU1DBSync(SQLCipherDatabase): h = sha256(json.dumps([url, creds])).hexdigest() cur_h, syncer = self._syncers.get(url, (None, None)) if syncer is None or h != cur_h: - wlock = self._sync_db_write_lock syncer = SoledadSynchronizer( self, - SoledadSyncTarget(url, - # XXX is the replica_uid ready? - self._replica_uid, - creds=creds, - crypto=self._crypto, - sync_db=self._sync_db, - sync_db_write_lock=wlock)) + SoledadHTTPSyncTarget( + url, + # XXX is the replica_uid ready? + self._replica_uid, + creds=creds, + crypto=self._crypto, + cert_file=self._cert_file, + sync_db=self._sync_db, + sync_enc_pool=self._sync_enc_pool)) self._syncers[url] = (h, syncer) # in order to reuse the same synchronizer multiple times we have to # reset its state (i.e. the number of documents received from target @@ -744,34 +658,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # Symmetric encryption of syncing docs # - def _encrypt_syncing_docs(self): - """ - Process the syncing queue and send the documents there - to be encrypted in the sync db. They will be read by the - SoledadSyncTarget during the sync_exchange. - - Called periodically from the LoopingCall self._sync_loop. - """ - # TODO should return a deferred that would firewhen the encryption is - # done. See note on __init__ - - lock = self.encrypting_lock - # optional wait flag used to avoid blocking - if not lock.acquire(False): - return - else: - queue = self.sync_queue - try: - while not queue.empty(): - doc = queue.get_nowait() - self._sync_enc_pool.encrypt_doc(doc) - - except Exception as exc: - logger.error("Error while encrypting docs to sync") - logger.exception(exc) - finally: - lock.release() - def get_generation(self): # FIXME # XXX this SHOULD BE a callback @@ -789,16 +675,10 @@ class SQLCipherU1DBSync(SQLCipherDatabase): """ Close the syncer and syncdb orderly """ - # stop the sync loop for deferred encryption - if self._sync_loop is not None: - self._sync_loop.reset() - self._sync_loop.stop() - self._sync_loop = None # close all open syncers for url in self._syncers: - _, syncer = self._syncers[url] - syncer.close() - self._syncers = [] + del self._syncers[url] + # stop the encryption pool if self._sync_enc_pool is not None: self._sync_enc_pool.close() @@ -808,11 +688,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): if self._sync_db is not None: self._sync_db.close() self._sync_db = None - # close the sync queue - if self.sync_queue is not None: - self.sync_queue.close() - del self.sync_queue - self.sync_queue = None class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase): @@ -903,3 +778,40 @@ def soledad_doc_factory(doc_id=None, rev=None, json='{}', has_conflicts=False, has_conflicts=has_conflicts, syncable=syncable) sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) + + +# +# twisted.enterprise.adbapi SQLCipher implementation +# + +SQLCIPHER_CONNECTION_TIMEOUT = 10 + + +def getConnectionPool(opts, extra_queries=None): + openfun = partial( + pragmas.set_init_pragmas, + opts=opts, + extra_queries=extra_queries) + return SQLCipherConnectionPool( + database=opts.path, + check_same_thread=False, + cp_openfun=openfun, + timeout=SQLCIPHER_CONNECTION_TIMEOUT) + + +class SQLCipherConnection(adbapi.Connection): + pass + + +class SQLCipherTransaction(adbapi.Transaction): + pass + + +class SQLCipherConnectionPool(adbapi.ConnectionPool): + + connectionFactory = SQLCipherConnection + transactionFactory = SQLCipherTransaction + + def __init__(self, *args, **kwargs): + adbapi.ConnectionPool.__init__( + self, "pysqlcipher.dbapi2", *args, **kwargs) diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index d3f106da..53172f31 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -16,17 +16,10 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. """ Soledad synchronization utilities. - -Extend u1db Synchronizer with the ability to: - - * Postpone the update of the known replica uid until all the decryption of - the incoming messages has been processed. - - * Be interrupted and recovered. """ import logging -import traceback -from threading import Lock + +from twisted.internet import defer from u1db import errors from u1db.sync import Synchronizer @@ -47,17 +40,8 @@ class SoledadSynchronizer(Synchronizer): Also modified to allow for interrupting the synchronization process. """ - # TODO can delegate the syncing to the api object, living in the reactor - # thread, and use a simple flag. - syncing_lock = Lock() - - def stop(self): - """ - Stop the current sync in progress. - """ - self.sync_target.stop() - - def sync(self, autocreate=False, defer_decryption=True): + @defer.inlineCallbacks + def sync(self, defer_decryption=True): """ Synchronize documents between source and target. @@ -69,48 +53,22 @@ class SoledadSynchronizer(Synchronizer): This is done to allow the ongoing parallel decryption of the incoming docs to proceed without `InvalidGeneration` conflicts. - :param autocreate: Whether the target replica should be created or not. - :type autocreate: bool :param defer_decryption: Whether to defer the decryption process using the intermediate database. If False, decryption will be done inline. :type defer_decryption: bool - """ - self.syncing_lock.acquire() - try: - return self._sync(autocreate=autocreate, - defer_decryption=defer_decryption) - except Exception: - # we want this exception to reach either SQLCipherU1DBSync.sync or - # the Solead api object itself, so it is poperly handled and/or - # logged... - raise - finally: - # ... but we also want to release the syncing lock so this - # Synchronizer may be reused later. - self.release_syncing_lock() - - def _sync(self, autocreate=False, defer_decryption=True): - """ - Helper function, called from the main `sync` method. - See `sync` docstring. + + :return: A deferred which will fire after the sync has finished. + :rtype: twisted.internet.defer.Deferred """ sync_target = self.sync_target # get target identifier, its current generation, # and its last-seen database generation for this source ensure_callback = None - try: - (self.target_replica_uid, target_gen, target_trans_id, - target_my_gen, target_my_trans_id) = \ - sync_target.get_sync_info(self.source._replica_uid) - except errors.DatabaseDoesNotExist: - if not autocreate: - raise - # will try to ask sync_exchange() to create the db - self.target_replica_uid = None - target_gen, target_trans_id = (0, '') - target_my_gen, target_my_trans_id = (0, '') + (self.target_replica_uid, target_gen, target_trans_id, + target_my_gen, target_my_trans_id) = yield \ + sync_target.get_sync_info(self.source._replica_uid) logger.debug( "Soledad target sync info:\n" @@ -151,15 +109,15 @@ class SoledadSynchronizer(Synchronizer): self.target_replica_uid) logger.debug( "Soledad source sync info:\n" - " source target gen: %d\n" - " source target trans_id: %s" + " last target gen known to source: %d\n" + " last target trans_id known to source: %s" % (target_last_known_gen, target_last_known_trans_id)) # validate transaction ids if not changes and target_last_known_gen == target_gen: if target_trans_id != target_last_known_trans_id: raise errors.InvalidTransactionId - return my_gen + defer.returnValue(my_gen) # prepare to send all the changed docs changed_doc_ids = [doc_id for doc_id, _, _ in changes] @@ -174,40 +132,26 @@ class SoledadSynchronizer(Synchronizer): # exchange documents and try to insert the returned ones with # the target, return target synced-up-to gen. - # - # The sync_exchange method may be interrupted, in which case it will - # return a tuple of Nones. - try: - new_gen, new_trans_id = sync_target.sync_exchange( - docs_by_generation, self.source._replica_uid, - target_last_known_gen, target_last_known_trans_id, - self._insert_doc_from_target, ensure_callback=ensure_callback, - defer_decryption=defer_decryption) - logger.debug( - "Soledad source sync info after sync exchange:\n" - " source target gen: %d\n" - " source target trans_id: %s" - % (new_gen, new_trans_id)) - info = { - "target_replica_uid": self.target_replica_uid, - "new_gen": new_gen, - "new_trans_id": new_trans_id, - "my_gen": my_gen - } - self._syncing_info = info - if defer_decryption and not sync_target.has_syncdb(): - logger.debug("Sync target has no valid sync db, " - "aborting defer_decryption") - defer_decryption = False - self.complete_sync() - except Exception as e: - logger.error("Soledad sync error: %s" % str(e)) - logger.error(traceback.format_exc()) - sync_target.stop() - finally: - sync_target.close() - - return my_gen + new_gen, new_trans_id = yield sync_target.sync_exchange( + docs_by_generation, self.source._replica_uid, + target_last_known_gen, target_last_known_trans_id, + self._insert_doc_from_target, ensure_callback=ensure_callback, + defer_decryption=defer_decryption) + logger.debug( + "Soledad source sync info after sync exchange:\n" + " source known target gen: %d\n" + " source known target trans_id: %s" + % (new_gen, new_trans_id)) + info = { + "target_replica_uid": self.target_replica_uid, + "new_gen": new_gen, + "new_trans_id": new_trans_id, + "my_gen": my_gen + } + self._syncing_info = info + yield self.complete_sync() + + defer.returnValue(my_gen) def complete_sync(self): """ @@ -215,6 +159,9 @@ class SoledadSynchronizer(Synchronizer): (a) record last known generation and transaction uid for the remote replica, and (b) make target aware of our current reached generation. + + :return: A deferred which will fire when the sync has been completed. + :rtype: twisted.internet.defer.Deferred """ logger.debug("Completing deferred last step in SYNC...") @@ -225,39 +172,23 @@ class SoledadSynchronizer(Synchronizer): info["target_replica_uid"], info["new_gen"], info["new_trans_id"]) # if gapless record current reached generation with target - self._record_sync_info_with_the_target(info["my_gen"]) - - @property - def syncing(self): - """ - Return True if a sync is ongoing, False otherwise. - :rtype: bool - """ - # XXX FIXME we need some mechanism for timeout: should cleanup and - # release if something in the syncdb-decrypt goes wrong. we could keep - # track of the release date and cleanup unrealistic sync entries after - # some time. + return self._record_sync_info_with_the_target(info["my_gen"]) - # TODO use cancellable deferreds instead - locked = self.syncing_lock.locked() - return locked - - def release_syncing_lock(self): - """ - Release syncing lock if it's locked. + def _record_sync_info_with_the_target(self, start_generation): """ - if self.syncing_lock.locked(): - self.syncing_lock.release() + Store local replica metadata in server. - def close(self): - """ - Close sync target pool of workers. - """ - self.release_syncing_lock() - self.sync_target.close() + :param start_generation: The local generation when the sync was + started. + :type start_generation: int - def __del__(self): - """ - Cleanup: release lock. + :return: A deferred which will fire when the operation has been + completed. + :rtype: twisted.internet.defer.Deferred """ - self.release_syncing_lock() + cur_gen, trans_id = self.source._get_generation_info() + if (cur_gen == start_generation + self.num_inserted + and self.num_inserted > 0): + return self.sync_target.record_sync_info( + self.source._replica_uid, cur_gen, trans_id) + return defer.succeed(None) diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py deleted file mode 100644 index 986bd991..00000000 --- a/client/src/leap/soledad/client/target.py +++ /dev/null @@ -1,1517 +0,0 @@ -# -*- coding: utf-8 -*- -# target.py -# Copyright (C) 2013, 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -A U1DB backend for encrypting data before sending to server and decrypting -after receiving. -""" -import cStringIO -import gzip -import logging -import re -import urllib -import threading - -from collections import defaultdict -from time import sleep -from uuid import uuid4 - -import simplejson as json - -from u1db import errors -from u1db.remote import utils, http_errors -from u1db.remote.http_target import HTTPSyncTarget -from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase -from zope.proxy import ProxyBase -from zope.proxy import sameProxiedObjects, setProxiedObject - -from twisted.internet.task import LoopingCall - -from leap.soledad.common.document import SoledadDocument -from leap.soledad.client.auth import TokenBasedAuth -from leap.soledad.client.crypto import is_symmetrically_encrypted -from leap.soledad.client.crypto import encrypt_doc, decrypt_doc -from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool -from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS -from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS -from leap.soledad.client.events import signal - - -logger = logging.getLogger(__name__) - - -def _gunzip(data): - """ - Uncompress data that is gzipped. - - :param data: gzipped data - :type data: basestring - """ - buffer = cStringIO.StringIO() - buffer.write(data) - buffer.seek(0) - try: - data = gzip.GzipFile(mode='r', fileobj=buffer).read() - except Exception: - logger.warning("Error while decrypting gzipped data") - buffer.close() - return data - - -class PendingReceivedDocsSyncError(Exception): - pass - - -class DocumentSyncerThread(threading.Thread): - """ - A thread that knowns how to either send or receive a document during the - sync process. - """ - - def __init__(self, doc_syncer, release_method, failed_method, - idx, total, last_request_lock=None, last_callback_lock=None): - """ - Initialize a new syncer thread. - - :param doc_syncer: A document syncer. - :type doc_syncer: HTTPDocumentSyncer - :param release_method: A method to be called when finished running. - :type release_method: callable(DocumentSyncerThread) - :param failed_method: A method to be called when we failed. - :type failed_method: callable(DocumentSyncerThread) - :param idx: The index count of the current operation. - :type idx: int - :param total: The total number of operations. - :type total: int - :param last_request_lock: A lock to wait for before actually performing - the request. - :type last_request_lock: threading.Lock - :param last_callback_lock: A lock to wait for before actually running - the success callback. - :type last_callback_lock: threading.Lock - """ - threading.Thread.__init__(self) - self._doc_syncer = doc_syncer - self._release_method = release_method - self._failed_method = failed_method - self._idx = idx - self._total = total - self._last_request_lock = last_request_lock - self._last_callback_lock = last_callback_lock - self._response = None - self._exception = None - self._result = None - self._success = False - # a lock so we can signal when we're finished - self._request_lock = threading.Lock() - self._request_lock.acquire() - self._callback_lock = threading.Lock() - self._callback_lock.acquire() - # make thread interruptable - self._stopped = None - self._stop_lock = threading.Lock() - - def run(self): - """ - Run the HTTP request and store results. - - This method will block and wait for an eventual previous operation to - finish before actually performing the request. It also traps any - exception and register any failure with the request. - """ - with self._stop_lock: - if self._stopped is None: - self._stopped = False - else: - return - - # eventually wait for the previous thread to finish - if self._last_request_lock is not None: - self._last_request_lock.acquire() - - # bail out in case we've been interrupted - if self.stopped is True: - return - - try: - self._response = self._doc_syncer.do_request() - self._request_lock.release() - - # run success callback - if self._doc_syncer.success_callback is not None: - - # eventually wait for callback lock release - if self._last_callback_lock is not None: - self._last_callback_lock.acquire() - - # bail out in case we've been interrupted - if self._stopped is True: - return - - self._result = self._doc_syncer.success_callback( - self._idx, self._total, self._response) - self._success = True - doc_syncer = self._doc_syncer - self._release_method(self, doc_syncer) - self._doc_syncer = None - # let next thread executed its callback - self._callback_lock.release() - - # trap any exception and signal failure - except Exception as e: - self._exception = e - self._success = False - # run failure callback - if self._doc_syncer.failure_callback is not None: - - # eventually wait for callback lock release - if self._last_callback_lock is not None: - self._last_callback_lock.acquire() - - # bail out in case we've been interrupted - if self.stopped is True: - return - - self._doc_syncer.failure_callback( - self._idx, self._total, self._exception) - - self._failed_method() - # we do not release the callback lock here because we - # failed and so we don't want other threads to succeed. - - @property - def doc_syncer(self): - return self._doc_syncer - - @property - def response(self): - return self._response - - @property - def exception(self): - return self._exception - - @property - def callback_lock(self): - return self._callback_lock - - @property - def request_lock(self): - return self._request_lock - - @property - def success(self): - return self._success - - def stop(self): - with self._stop_lock: - self._stopped = True - - @property - def stopped(self): - with self._stop_lock: - return self._stopped - - @property - def result(self): - return self._result - - -class DocumentSyncerPool(object): - """ - A pool of reusable document syncers. - """ - - POOL_SIZE = 10 - """ - The maximum amount of syncer threads running at the same time. - """ - - def __init__(self, raw_url, raw_creds, query_string, headers, - ensure_callback, stop_method): - """ - Initialize the document syncer pool. - - :param raw_url: The complete raw URL for the HTTP request. - :type raw_url: str - :param raw_creds: The credentials for the HTTP request. - :type raw_creds: dict - :param query_string: The query string for the HTTP request. - :type query_string: str - :param headers: The headers for the HTTP request. - :type headers: dict - :param ensure_callback: A callback to ensure we have the correct - target_replica_uid, if it was just created. - :type ensure_callback: callable - - """ - # save syncer params - self._raw_url = raw_url - self._raw_creds = raw_creds - self._query_string = query_string - self._headers = headers - self._ensure_callback = ensure_callback - self._stop_method = stop_method - # pool attributes - self._failures = False - self._semaphore_pool = threading.BoundedSemaphore( - DocumentSyncerPool.POOL_SIZE) - self._pool_access_lock = threading.Lock() - self._doc_syncers = [] - self._threads = [] - - def new_syncer_thread(self, idx, total, last_request_lock=None, - last_callback_lock=None): - """ - Yield a new document syncer thread. - - :param idx: The index count of the current operation. - :type idx: int - :param total: The total number of operations. - :type total: int - :param last_request_lock: A lock to wait for before actually performing - the request. - :type last_request_lock: threading.Lock - :param last_callback_lock: A lock to wait for before actually running - the success callback. - :type last_callback_lock: threading.Lock - """ - t = None - # wait for available threads - self._semaphore_pool.acquire() - with self._pool_access_lock: - if self._failures is True: - return None - # get a syncer - doc_syncer = self._get_syncer() - # we rely on DocumentSyncerThread.run() to release the lock using - # self.release_syncer so we can launch a new thread. - t = DocumentSyncerThread( - doc_syncer, self.release_syncer, self.cancel_threads, - idx, total, - last_request_lock=last_request_lock, - last_callback_lock=last_callback_lock) - self._threads.append(t) - return t - - def _failed(self): - with self._pool_access_lock: - self._failures = True - - @property - def failures(self): - return self._failures - - def _get_syncer(self): - """ - Get a document syncer from the pool. - - This method will create a new syncer whenever there is no syncer - available in the pool. - - :return: A syncer. - :rtype: HTTPDocumentSyncer - """ - syncer = None - # get an available syncer or create a new one - try: - syncer = self._doc_syncers.pop() - except IndexError: - syncer = HTTPDocumentSyncer( - self._raw_url, self._raw_creds, self._query_string, - self._headers, self._ensure_callback) - return syncer - - def release_syncer(self, syncer_thread, doc_syncer): - """ - Return a syncer to the pool after use and check for any failures. - - :param syncer: The syncer to be returned to the pool. - :type syncer: HTTPDocumentSyncer - """ - with self._pool_access_lock: - self._doc_syncers.append(doc_syncer) - if syncer_thread.success is True: - self._threads.remove(syncer_thread) - self._semaphore_pool.release() - - def cancel_threads(self): - """ - Stop all threads in the pool. - """ - # stop sync - self._stop_method() - stopped = [] - # stop all threads - logger.warning("Soledad sync: cancelling sync threads...") - with self._pool_access_lock: - self._failures = True - while self._threads: - t = self._threads.pop(0) - t.stop() - self._doc_syncers.append(t.doc_syncer) - stopped.append(t) - # release locks and join - while stopped: - t = stopped.pop(0) - t.request_lock.acquire(False) # just in case - t.request_lock.release() - t.callback_lock.acquire(False) # just in case - t.callback_lock.release() - # release any blocking semaphores - for i in xrange(DocumentSyncerPool.POOL_SIZE): - try: - self._semaphore_pool.release() - except ValueError: - break - logger.warning("Soledad sync: cancelled sync threads.") - - def cleanup(self): - """ - Close and remove any syncers from the pool. - """ - with self._pool_access_lock: - while self._doc_syncers: - syncer = self._doc_syncers.pop() - syncer.close() - del syncer - - -class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth): - - def __init__(self, raw_url, creds, query_string, headers, ensure_callback): - """ - Initialize the client. - - :param raw_url: The raw URL of the target HTTP server. - :type raw_url: str - :param creds: Authentication credentials. - :type creds: dict - :param query_string: The query string for the HTTP request. - :type query_string: str - :param headers: The headers for the HTTP request. - :type headers: dict - :param ensure_callback: A callback to ensure we have the correct - target_replica_uid, if it was just created. - :type ensure_callback: callable - """ - HTTPClientBase.__init__(self, raw_url, creds=creds) - # info needed to perform the request - self._query_string = query_string - self._headers = headers - self._ensure_callback = ensure_callback - # the actual request method - self._request_method = None - self._success_callback = None - self._failure_callback = None - - def _reset(self): - """ - Reset this document syncer so we can reuse it. - """ - self._request_method = None - self._success_callback = None - self._failure_callback = None - self._request_method = None - - def set_request_method(self, method, *args, **kwargs): - """ - Set the actual method to perform the request. - - :param method: Either 'get' or 'put'. - :type method: str - :param args: Arguments for the request method. - :type args: list - :param kwargs: Keyworded arguments for the request method. - :type kwargs: dict - """ - self._reset() - # resolve request method - if method is 'get': - self._request_method = self._get_doc - elif method is 'put': - self._request_method = self._put_doc - else: - raise Exception - # store request method args - self._args = args - self._kwargs = kwargs - - def set_success_callback(self, callback): - self._success_callback = callback - - def set_failure_callback(self, callback): - self._failure_callback = callback - - @property - def success_callback(self): - return self._success_callback - - @property - def failure_callback(self): - return self._failure_callback - - def do_request(self): - """ - Actually perform the request. - - :return: The body and headers of the response. - :rtype: tuple - """ - self._ensure_connection() - args = self._args - kwargs = self._kwargs - return self._request_method(*args, **kwargs) - - def _request(self, method, url_parts, params=None, body=None, - content_type=None): - """ - Perform an HTTP request. - - :param method: The HTTP request method. - :type method: str - :param url_parts: A list representing the request path. - :type url_parts: list - :param params: Parameters for the URL query string. - :type params: dict - :param body: The body of the request. - :type body: str - :param content-type: The content-type of the request. - :type content-type: str - - :return: The body and headers of the response. - :rtype: tuple - - :raise errors.Unavailable: Raised after a number of unsuccesful - request attempts. - :raise Exception: Raised for any other exception ocurring during the - request. - """ - - self._ensure_connection() - unquoted_url = url_query = self._url.path - if url_parts: - if not url_query.endswith('/'): - url_query += '/' - unquoted_url = url_query - url_query += '/'.join(urllib.quote(part, safe='') - for part in url_parts) - # oauth performs its own quoting - unquoted_url += '/'.join(url_parts) - encoded_params = {} - if params: - for key, value in params.items(): - key = unicode(key).encode('utf-8') - encoded_params[key] = _encode_query_parameter(value) - url_query += ('?' + urllib.urlencode(encoded_params)) - if body is not None and not isinstance(body, basestring): - body = json.dumps(body) - content_type = 'application/json' - headers = {} - if content_type: - headers['content-type'] = content_type - - # Patched: We would like to receive gzip pretty please - # ---------------------------------------------------- - headers['accept-encoding'] = "gzip" - # ---------------------------------------------------- - - headers.update( - self._sign_request(method, unquoted_url, encoded_params)) - - for delay in self._delays: - try: - self._conn.request(method, url_query, body, headers) - return self._response() - except errors.Unavailable, e: - sleep(delay) - raise e - - def _response(self): - """ - Return the response of the (possibly gzipped) HTTP request. - - :return: The body and headers of the response. - :rtype: tuple - """ - resp = self._conn.getresponse() - body = resp.read() - headers = dict(resp.getheaders()) - - # Patched: We would like to decode gzip - # ---------------------------------------------------- - encoding = headers.get('content-encoding', '') - if "gzip" in encoding: - body = _gunzip(body) - # ---------------------------------------------------- - - if resp.status in (200, 201): - return body, headers - elif resp.status in http_errors.ERROR_STATUSES: - try: - respdic = json.loads(body) - except ValueError: - pass - else: - self._error(respdic) - # special case - if resp.status == 503: - raise errors.Unavailable(body, headers) - raise errors.HTTPError(resp.status, body, headers) - - def _prepare(self, comma, entries, **dic): - """ - Prepare an entry to be sent through a syncing POST request. - - :param comma: A string to be prepended to the current entry. - :type comma: str - :param entries: A list of entries accumulated to be sent on the - request. - :type entries: list - :param dic: The data to be included in this entry. - :type dic: dict - - :return: The size of the prepared entry. - :rtype: int - """ - entry = comma + '\r\n' + json.dumps(dic) - entries.append(entry) - return len(entry) - - def _init_post_request(self, action, content_length): - """ - Initiate a syncing POST request. - - :param url: The syncing URL. - :type url: str - :param action: The syncing action, either 'get' or 'receive'. - :type action: str - :param headers: The initial headers to be sent on this request. - :type headers: dict - :param content_length: The content-length of the request. - :type content_length: int - """ - self._conn.putrequest('POST', self._query_string) - self._conn.putheader( - 'content-type', 'application/x-soledad-sync-%s' % action) - for header_name, header_value in self._headers: - self._conn.putheader(header_name, header_value) - self._conn.putheader('accept-encoding', 'gzip') - self._conn.putheader('content-length', str(content_length)) - self._conn.endheaders() - - def _get_doc(self, received, sync_id, last_known_generation, - last_known_trans_id): - """ - Get a sync document from server by means of a POST request. - - :param received: The number of documents already received in the - current sync session. - :type received: int - :param sync_id: The id for the current sync session. - :type sync_id: str - :param last_known_generation: Target's last known generation. - :type last_known_generation: int - :param last_known_trans_id: Target's last known transaction id. - :type last_known_trans_id: str - - :return: The body and headers of the response. - :rtype: tuple - """ - entries = ['['] - size = 1 - # add remote replica metadata to the request - size += self._prepare( - '', entries, - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - sync_id=sync_id, - ensure=self._ensure_callback is not None) - # inform server of how many documents have already been received - size += self._prepare( - ',', entries, received=received) - entries.append('\r\n]') - size += len(entries[-1]) - # send headers - self._init_post_request('get', size) - # get document - for entry in entries: - self._conn.send(entry) - return self._response() - - def _put_doc(self, sync_id, last_known_generation, last_known_trans_id, - id, rev, content, gen, trans_id, number_of_docs, doc_idx): - """ - Put a sync document on server by means of a POST request. - - :param sync_id: The id for the current sync session. - :type sync_id: str - :param last_known_generation: Target's last known generation. - :type last_known_generation: int - :param last_known_trans_id: Target's last known transaction id. - :type last_known_trans_id: str - :param id: The document id. - :type id: str - :param rev: The document revision. - :type rev: str - :param content: The serialized document content. - :type content: str - :param gen: The generation of the modification of the document. - :type gen: int - :param trans_id: The transaction id of the modification of the - document. - :type trans_id: str - :param number_of_docs: The total amount of documents sent on this sync - session. - :type number_of_docs: int - :param doc_idx: The index of the current document being sent. - :type doc_idx: int - - :return: The body and headers of the response. - :rtype: tuple - """ - # prepare to send the document - entries = ['['] - size = 1 - # add remote replica metadata to the request - size += self._prepare( - '', entries, - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - sync_id=sync_id, - ensure=self._ensure_callback is not None) - # add the document to the request - size += self._prepare( - ',', entries, - id=id, rev=rev, content=content, gen=gen, trans_id=trans_id, - number_of_docs=number_of_docs, doc_idx=doc_idx) - entries.append('\r\n]') - size += len(entries[-1]) - # send headers - self._init_post_request('put', size) - # send document - for entry in entries: - self._conn.send(entry) - return self._response() - - def _sign_request(self, method, url_query, params): - """ - Return an authorization header to be included in the HTTP request. - - :param method: The HTTP method. - :type method: str - :param url_query: The URL query string. - :type url_query: str - :param params: A list with encoded query parameters. - :type param: list - - :return: The Authorization header. - :rtype: list of tuple - """ - return TokenBasedAuth._sign_request(self, method, url_query, params) - - def set_token_credentials(self, uuid, token): - """ - Store given credentials so we can sign the request later. - - :param uuid: The user's uuid. - :type uuid: str - :param token: The authentication token. - :type token: str - """ - TokenBasedAuth.set_token_credentials(self, uuid, token) - - -class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): - """ - A SyncTarget that encrypts data before sending and decrypts data after - receiving. - - Normally encryption will have been written to the sync database upon - document modification. The sync database is also used to write temporarily - the parsed documents that the remote send us, before being decrypted and - written to the main database. - """ - - # will later keep a reference to the insert-doc callback - # passed to sync_exchange - _insert_doc_cb = defaultdict(lambda: ProxyBase(None)) - - """ - Period of recurrence of the periodic decrypting task, in seconds. - """ - DECRYPT_LOOP_PERIOD = 0.5 - - # - # Modified HTTPSyncTarget methods. - # - - def __init__(self, url, source_replica_uid=None, creds=None, crypto=None, - sync_db=None, sync_db_write_lock=None): - """ - Initialize the SoledadSyncTarget. - - :param source_replica_uid: The source replica uid which we use when - deferring decryption. - :type source_replica_uid: str - :param url: The url of the target replica to sync with. - :type url: str - :param creds: Optional dictionary giving credentials. - to authorize the operation with the server. - :type creds: dict - :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt - document contents when syncing. - :type crypto: soledad.crypto.SoledadCrypto - :param sync_db: Optional. handler for the db with the symmetric - encryption of the syncing documents. If - None, encryption will be done in-place, - instead of retreiving it from the dedicated - database. - :type sync_db: Sqlite handler - :param sync_db_write_lock: a write lock for controlling concurrent - access to the sync_db - :type sync_db_write_lock: threading.Lock - """ - HTTPSyncTarget.__init__(self, url, creds) - self._raw_url = url - self._raw_creds = creds - self._crypto = crypto - self._stopped = True - self._stop_lock = threading.Lock() - self._sync_exchange_lock = threading.Lock() - self.source_replica_uid = source_replica_uid - self._defer_decryption = False - self._syncer_pool = None - - # deferred decryption attributes - self._sync_db = None - self._sync_db_write_lock = None - self._decryption_callback = None - self._sync_decr_pool = None - self._sync_loop = None - if sync_db and sync_db_write_lock is not None: - self._sync_db = sync_db - self._sync_db_write_lock = sync_db_write_lock - - def _setup_sync_decr_pool(self): - """ - Set up the SyncDecrypterPool for deferred decryption. - """ - if self._sync_decr_pool is None: - # initialize syncing queue decryption pool - self._sync_decr_pool = SyncDecrypterPool( - self._crypto, self._sync_db, - self._sync_db_write_lock, - insert_doc_cb=self._insert_doc_cb) - self._sync_decr_pool.set_source_replica_uid( - self.source_replica_uid) - - def _teardown_sync_decr_pool(self): - """ - Tear down the SyncDecrypterPool. - """ - if self._sync_decr_pool is not None: - self._sync_decr_pool.close() - self._sync_decr_pool = None - - def _setup_sync_loop(self): - """ - Set up the sync loop for deferred decryption. - """ - if self._sync_loop is None: - self._sync_loop = LoopingCall( - self._decrypt_syncing_received_docs) - self._sync_loop.start(self.DECRYPT_LOOP_PERIOD) - - def _teardown_sync_loop(self): - """ - Tear down the sync loop. - """ - if self._sync_loop is not None: - self._sync_loop.stop() - self._sync_loop = None - - def _get_replica_uid(self, url): - """ - Return replica uid from the url, or None. - - :param url: the replica url - :type url: str - """ - replica_uid_match = re.findall("user-([0-9a-fA-F]+)", url) - return replica_uid_match[0] if len(replica_uid_match) > 0 else None - - @staticmethod - def connect(url, source_replica_uid=None, crypto=None): - return SoledadSyncTarget( - url, source_replica_uid=source_replica_uid, crypto=crypto) - - def _parse_received_doc_response(self, response): - """ - Parse the response from the server containing the received document. - - :param response: The body and headers of the response. - :type response: tuple(str, dict) - """ - data, _ = response - # decode incoming stream - parts = data.splitlines() - if not parts or parts[0] != '[' or parts[-1] != ']': - raise errors.BrokenSyncStream - data = parts[1:-1] - # decode metadata - line, comma = utils.check_and_strip_comma(data[0]) - metadata = None - try: - metadata = json.loads(line) - new_generation = metadata['new_generation'] - new_transaction_id = metadata['new_transaction_id'] - number_of_changes = metadata['number_of_changes'] - except (json.JSONDecodeError, KeyError): - raise errors.BrokenSyncStream - # make sure we have replica_uid from fresh new dbs - if self._ensure_callback and 'replica_uid' in metadata: - self._ensure_callback(metadata['replica_uid']) - # parse incoming document info - doc_id = None - rev = None - content = None - gen = None - trans_id = None - if number_of_changes > 0: - try: - entry = json.loads(data[1]) - doc_id = entry['id'] - rev = entry['rev'] - content = entry['content'] - gen = entry['gen'] - trans_id = entry['trans_id'] - except (IndexError, KeyError): - raise errors.BrokenSyncStream - return new_generation, new_transaction_id, number_of_changes, \ - doc_id, rev, content, gen, trans_id - - def _insert_received_doc(self, idx, total, response): - """ - Insert a received document into the local replica. - - :param idx: The index count of the current operation. - :type idx: int - :param total: The total number of operations. - :type total: int - :param response: The body and headers of the response. - :type response: tuple(str, dict) - """ - new_generation, new_transaction_id, number_of_changes, doc_id, \ - rev, content, gen, trans_id = \ - self._parse_received_doc_response(response) - if doc_id is not None: - # decrypt incoming document and insert into local database - # ------------------------------------------------------------- - # symmetric decryption of document's contents - # ------------------------------------------------------------- - # If arriving content was symmetrically encrypted, we decrypt it. - # We do it inline if defer_decryption flag is False or no sync_db - # was defined, otherwise we defer it writing it to the received - # docs table. - doc = SoledadDocument(doc_id, rev, content) - if is_symmetrically_encrypted(doc): - if self._queue_for_decrypt: - self._save_encrypted_received_doc( - doc, gen, trans_id, idx, total) - else: - # defer_decryption is False or no-sync-db fallback - doc.set_json(decrypt_doc(self._crypto, doc)) - self._return_doc_cb(doc, gen, trans_id) - else: - # not symmetrically encrypted doc, insert it directly - # or save it in the decrypted stage. - if self._queue_for_decrypt: - self._save_received_doc(doc, gen, trans_id, idx, total) - else: - self._return_doc_cb(doc, gen, trans_id) - # ------------------------------------------------------------- - # end of symmetric decryption - # ------------------------------------------------------------- - msg = "%d/%d" % (idx + 1, total) - signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg) - logger.debug("Soledad sync receive status: %s" % msg) - return number_of_changes, new_generation, new_transaction_id - - def _get_remote_docs(self, url, last_known_generation, last_known_trans_id, - headers, return_doc_cb, ensure_callback, sync_id, - defer_decryption=False): - """ - Fetch sync documents from the remote database and insert them in the - local database. - - If an incoming document's encryption scheme is equal to - EncryptionSchemes.SYMKEY, then this method will decrypt it with - Soledad's symmetric key. - - :param url: The syncing URL. - :type url: str - :param last_known_generation: Target's last known generation. - :type last_known_generation: int - :param last_known_trans_id: Target's last known transaction id. - :type last_known_trans_id: str - :param headers: The headers of the HTTP request. - :type headers: dict - :param return_doc_cb: A callback to insert docs from target. - :type return_doc_cb: callable - :param ensure_callback: A callback to ensure we have the correct - target_replica_uid, if it was just created. - :type ensure_callback: callable - :param sync_id: The id for the current sync session. - :type sync_id: str - :param defer_decryption: Whether to defer the decryption process using - the intermediate database. If False, - decryption will be done inline. - :type defer_decryption: bool - - :raise BrokenSyncStream: If `data` is malformed. - - :return: A dictionary representing the first line of the response got - from remote replica. - :rtype: dict - """ - # we keep a reference to the callback in case we defer the decryption - self._return_doc_cb = return_doc_cb - self._queue_for_decrypt = defer_decryption \ - and self._sync_db is not None - - new_generation = last_known_generation - new_transaction_id = last_known_trans_id - - if self._queue_for_decrypt: - logger.debug( - "Soledad sync: will queue received docs for decrypting.") - - idx = 0 - number_of_changes = 1 - - first_request = True - last_callback_lock = None - threads = [] - - # get incoming documents - while idx < number_of_changes: - # bail out if sync process was interrupted - if self.stopped is True: - break - - # launch a thread to fetch one document from target - t = self._syncer_pool.new_syncer_thread( - idx, number_of_changes, - last_callback_lock=last_callback_lock) - - # bail out if any thread failed - if t is None: - self.stop() - break - - t.doc_syncer.set_request_method( - 'get', idx, sync_id, last_known_generation, - last_known_trans_id) - t.doc_syncer.set_success_callback(self._insert_received_doc) - - def _failure_callback(idx, total, exception): - _failure_msg = "Soledad sync: error while getting document " \ - "%d/%d: %s" \ - % (idx + 1, total, exception) - logger.warning("%s" % _failure_msg) - logger.warning("Soledad sync: failing gracefully, will " - "recover on next sync.") - - t.doc_syncer.set_failure_callback(_failure_callback) - threads.append(t) - t.start() - last_callback_lock = t.callback_lock - idx += 1 - - # if this is the first request, wait to update the number of - # changes - if first_request is True: - t.join() - if t.success: - number_of_changes, _, _ = t.result - else: - raise t.exception - first_request = False - - # make sure all threads finished and we have up-to-date info - last_successful_thread = None - while threads: - # check if there are failures - t = threads.pop(0) - t.join() - if t.success: - last_successful_thread = t - else: - raise t.exception - - # get information about last successful thread - if last_successful_thread is not None: - body, _ = last_successful_thread.response - parsed_body = json.loads(body) - # get current target gen and trans id in case no documents were - # transferred - if len(parsed_body) == 1: - metadata = parsed_body[0] - new_generation = metadata['new_generation'] - new_transaction_id = metadata['new_transaction_id'] - # get current target gen and trans id from last transferred - # document - else: - doc_data = parsed_body[1] - new_generation = doc_data['gen'] - new_transaction_id = doc_data['trans_id'] - - return new_generation, new_transaction_id - - def sync_exchange(self, docs_by_generations, - source_replica_uid, last_known_generation, - last_known_trans_id, return_doc_cb, - ensure_callback=None, defer_decryption=True, - sync_id=None): - """ - Find out which documents the remote database does not know about, - encrypt and send them. - - This does the same as the parent's method but encrypts content before - syncing. - - :param docs_by_generations: A list of (doc_id, generation, trans_id) - of local documents that were changed since - the last local generation the remote - replica knows about. - :type docs_by_generations: list of tuples - - :param source_replica_uid: The uid of the source replica. - :type source_replica_uid: str - - :param last_known_generation: Target's last known generation. - :type last_known_generation: int - - :param last_known_trans_id: Target's last known transaction id. - :type last_known_trans_id: str - - :param return_doc_cb: A callback for inserting received documents from - target. If not overriden, this will call u1db - insert_doc_from_target in synchronizer, which - implements the TAKE OTHER semantics. - :type return_doc_cb: function - - :param ensure_callback: A callback that ensures we know the target - replica uid if the target replica was just - created. - :type ensure_callback: function - - :param defer_decryption: Whether to defer the decryption process using - the intermediate database. If False, - decryption will be done inline. - :type defer_decryption: bool - - :return: The new generation and transaction id of the target replica. - :rtype: tuple - """ - self._ensure_callback = ensure_callback - - if defer_decryption and self._sync_db is not None: - self._sync_exchange_lock.acquire() - self._setup_sync_decr_pool() - self._setup_sync_loop() - self._defer_decryption = True - else: - # fall back - defer_decryption = False - - self.start() - - if sync_id is None: - sync_id = str(uuid4()) - self.source_replica_uid = source_replica_uid - # let the decrypter pool access the passed callback to insert docs - setProxiedObject(self._insert_doc_cb[source_replica_uid], - return_doc_cb) - - # empty the database before starting a new sync - if defer_decryption is True and not self.clear_to_sync(): - self._sync_decr_pool.empty() - - self._ensure_connection() - if self._trace_hook: # for tests - self._trace_hook('sync_exchange') - url = '%s/sync-from/%s' % (self._url.path, source_replica_uid) - headers = self._sign_request('POST', url, {}) - - cur_target_gen = last_known_generation - cur_target_trans_id = last_known_trans_id - - # send docs - msg = "%d/%d" % (0, len(docs_by_generations)) - signal(SOLEDAD_SYNC_SEND_STATUS, msg) - logger.debug("Soledad sync send status: %s" % msg) - - defer_encryption = self._sync_db is not None - self._syncer_pool = DocumentSyncerPool( - self._raw_url, self._raw_creds, url, headers, ensure_callback, - self.stop_syncer) - threads = [] - last_callback_lock = None - sent = 0 - total = len(docs_by_generations) - - synced = [] - number_of_docs = len(docs_by_generations) - - last_request_lock = None - for doc, gen, trans_id in docs_by_generations: - # allow for interrupting the sync process - if self.stopped is True: - break - - # skip non-syncable docs - if isinstance(doc, SoledadDocument) and not doc.syncable: - continue - - # ------------------------------------------------------------- - # symmetric encryption of document's contents - # ------------------------------------------------------------- - doc_json = doc.get_json() - if not doc.is_tombstone(): - if not defer_encryption: - # fallback case, for tests - doc_json = encrypt_doc(self._crypto, doc) - else: - try: - doc_json = self.get_encrypted_doc_from_db( - doc.doc_id, doc.rev) - except Exception as exc: - logger.error("Error while getting " - "encrypted doc from db") - logger.exception(exc) - continue - if doc_json is None: - # Not marked as tombstone, but we got nothing - # from the sync db. As it is not encrypted yet, we - # force inline encryption. - # TODO: implement a queue to deal with these cases. - doc_json = encrypt_doc(self._crypto, doc) - # ------------------------------------------------------------- - # end of symmetric encryption - # ------------------------------------------------------------- - t = self._syncer_pool.new_syncer_thread( - sent + 1, total, last_request_lock=last_request_lock, - last_callback_lock=last_callback_lock) - - # bail out if any thread failed - if t is None: - self.stop() - break - - # set the request method - t.doc_syncer.set_request_method( - 'put', sync_id, cur_target_gen, cur_target_trans_id, - id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, - trans_id=trans_id, number_of_docs=number_of_docs, - doc_idx=sent + 1) - # set the success calback - - def _success_callback(idx, total, response): - _success_msg = "Soledad sync send status: %d/%d" \ - % (idx, total) - signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg) - logger.debug(_success_msg) - - t.doc_syncer.set_success_callback(_success_callback) - - # set the failure callback - def _failure_callback(idx, total, exception): - _failure_msg = "Soledad sync: error while sending document " \ - "%d/%d: %s" % (idx, total, exception) - logger.warning("%s" % _failure_msg) - logger.warning("Soledad sync: failing gracefully, will " - "recover on next sync.") - - t.doc_syncer.set_failure_callback(_failure_callback) - - # save thread and append - t.start() - threads.append((t, doc)) - - # update lock references so they can be used in next call to - # syncer_pool.new_syncer_thread() above - last_callback_lock = t.callback_lock - last_request_lock = t.request_lock - - sent += 1 - - # make sure all threads finished and we have up-to-date info - last_successful_thread = None - while threads: - # check if there are failures - t, doc = threads.pop(0) - t.join() - if t.success: - synced.append((doc.doc_id, doc.rev)) - last_successful_thread = t - else: - raise t.exception - - # delete documents from the sync database - if defer_encryption: - self.delete_encrypted_docs_from_db(synced) - - # get target gen and trans_id after docs - gen_after_send = None - trans_id_after_send = None - if last_successful_thread is not None: - response_dict = json.loads(last_successful_thread.response[0])[0] - gen_after_send = response_dict['new_generation'] - trans_id_after_send = response_dict['new_transaction_id'] - - # get docs from target - if self.stopped is False: - cur_target_gen, cur_target_trans_id = self._get_remote_docs( - url, - last_known_generation, last_known_trans_id, headers, - return_doc_cb, ensure_callback, sync_id, - defer_decryption=defer_decryption) - - self._syncer_pool.cleanup() - - # decrypt docs in case of deferred decryption - if defer_decryption: - while not self.clear_to_sync(): - sleep(self.DECRYPT_LOOP_PERIOD) - self._teardown_sync_loop() - self._teardown_sync_decr_pool() - self._sync_exchange_lock.release() - - # update gen and trans id info in case we just sent and did not - # receive docs. - if gen_after_send is not None and gen_after_send > cur_target_gen: - cur_target_gen = gen_after_send - cur_target_trans_id = trans_id_after_send - - self.stop() - self._syncer_pool = None - return cur_target_gen, cur_target_trans_id - - def start(self): - """ - Mark current sync session as running. - """ - with self._stop_lock: - self._stopped = False - - - def stop_syncer(self): - with self._stop_lock: - self._stopped = True - - def stop(self): - """ - Mark current sync session as stopped. - - This will eventually interrupt the sync_exchange() method and return - enough information to the synchronizer so the sync session can be - recovered afterwards. - """ - self.stop_syncer() - if self._syncer_pool: - self._syncer_pool.cancel_threads() - - @property - def stopped(self): - """ - Return whether this sync session is stopped. - - :return: Whether this sync session is stopped. - :rtype: bool - """ - with self._stop_lock: - return self._stopped is True - - def get_encrypted_doc_from_db(self, doc_id, doc_rev): - """ - Retrieve encrypted document from the database of encrypted docs for - sync. - - :param doc_id: The Document id. - :type doc_id: str - - :param doc_rev: The document revision - :type doc_rev: str - """ - encr = SyncEncrypterPool - sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % ( - encr.TABLE_NAME,)) - res = self._fetchall(sql, (doc_id, doc_rev)) - if res: - val = res.pop() - return val[0] - else: - # no doc found - return None - - def delete_encrypted_docs_from_db(self, docs_ids): - """ - Delete several encrypted documents from the database of symmetrically - encrypted docs to sync. - - :param docs_ids: an iterable with (doc_id, doc_rev) for all documents - to be deleted. - :type docs_ids: any iterable of tuples of str - """ - if docs_ids: - encr = SyncEncrypterPool - for doc_id, doc_rev in docs_ids: - sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % ( - encr.TABLE_NAME,)) - self._sync_db.execute(sql, (doc_id, doc_rev)) - - def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total): - """ - Save a symmetrically encrypted incoming document into the received - docs table in the sync db. A decryption task will pick it up - from here in turn. - - :param doc: The document to save. - :type doc: SoledadDocument - :param gen: The generation. - :type gen: str - :param trans_id: Transacion id. - :type gen: str - :param idx: The index count of the current operation. - :type idx: int - :param total: The total number of operations. - :type total: int - """ - logger.debug( - "Enqueueing doc for decryption: %d/%d." - % (idx + 1, total)) - self._sync_decr_pool.insert_encrypted_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id) - - def _save_received_doc(self, doc, gen, trans_id, idx, total): - """ - Save any incoming document into the received docs table in the sync db. - - :param doc: The document to save. - :type doc: SoledadDocument - :param gen: The generation. - :type gen: str - :param trans_id: Transacion id. - :type gen: str - :param idx: The index count of the current operation. - :type idx: int - :param total: The total number of operations. - :type total: int - """ - logger.debug( - "Enqueueing doc, no decryption needed: %d/%d." - % (idx + 1, total)) - self._sync_decr_pool.insert_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id) - - # - # Symmetric decryption of syncing docs - # - - def clear_to_sync(self): - """ - Return whether sync can proceed (ie, the received db table is empty). - - :return: Whether sync can proceed. - :rtype: bool - """ - if self._sync_decr_pool: - return self._sync_decr_pool.count_docs_in_sync_db() == 0 - return True - - def set_decryption_callback(self, cb): - """ - Set callback to be called when the decryption finishes. - - :param cb: The callback to be set. - :type cb: callable - """ - self._decryption_callback = cb - - def has_decryption_callback(self): - """ - Return True if there is a decryption callback set. - :rtype: bool - """ - return self._decryption_callback is not None - - def has_syncdb(self): - """ - Return True if we have an initialized syncdb. - """ - return self._sync_db is not None - - def _decrypt_syncing_received_docs(self): - """ - Decrypt the documents received from remote replica and insert them - into the local one. - - Called periodically from LoopingCall self._sync_loop. - """ - if sameProxiedObjects( - self._insert_doc_cb.get(self.source_replica_uid), - None): - return - - decrypter = self._sync_decr_pool - decrypter.raise_in_case_of_failed_async_calls() - decrypter.decrypt_received_docs() - decrypter.process_decrypted() - - def _sign_request(self, method, url_query, params): - """ - Return an authorization header to be included in the HTTP request. - - :param method: The HTTP method. - :type method: str - :param url_query: The URL query string. - :type url_query: str - :param params: A list with encoded query parameters. - :type param: list - - :return: The Authorization header. - :rtype: list of tuple - """ - return TokenBasedAuth._sign_request(self, method, url_query, params) - - def set_token_credentials(self, uuid, token): - """ - Store given credentials so we can sign the request later. - - :param uuid: The user's uuid. - :type uuid: str - :param token: The authentication token. - :type token: str - """ - TokenBasedAuth.set_token_credentials(self, uuid, token) - - def _fetchall(self, *args, **kwargs): - with self._sync_db: - c = self._sync_db.cursor() - c.execute(*args, **kwargs) - return c.fetchall() |