Diffstat (limited to 'client')
20 files changed, 1769 insertions, 2375 deletions
diff --git a/client/changes/bug_6757_fix-order-of-insertion-when-syncing b/client/changes/bug_6757_fix-order-of-insertion-when-syncing new file mode 100644 index 00000000..c0470f5a --- /dev/null +++ b/client/changes/bug_6757_fix-order-of-insertion-when-syncing @@ -0,0 +1,2 @@ + o Fix the order of insertion of documents when using workers for decrypting + incoming documents during a sync. Closes #6757. diff --git a/client/changes/bug_6892_fix-log-message-for-local-secret b/client/changes/bug_6892_fix-log-message-for-local-secret new file mode 100644 index 00000000..39c13257 --- /dev/null +++ b/client/changes/bug_6892_fix-log-message-for-local-secret @@ -0,0 +1,2 @@ + o Fix the log message when a local secret is not found so it's less + confusing. Closes #6892. diff --git a/client/changes/bug_always-initialize-the-sync-db b/client/changes/bug_always-initialize-the-sync-db new file mode 100644 index 00000000..2b12989a --- /dev/null +++ b/client/changes/bug_always-initialize-the-sync-db @@ -0,0 +1,2 @@ + o Always initialize the sync db to allow for both asynchronous encryption + and asynchronous decryption when syncing. diff --git a/client/changes/bug_fix-async-decrypt b/client/changes/bug_fix-async-decrypt new file mode 100644 index 00000000..eb0ce7b5 --- /dev/null +++ b/client/changes/bug_fix-async-decrypt @@ -0,0 +1,2 @@ + o Refactor asynchronous encryption/decryption code to its own file. + o Fix logging and graceful failing when exceptions are raised during sync. diff --git a/client/changes/bug_improve-log-when-fetching-documents b/client/changes/bug_improve-log-when-fetching-documents new file mode 100644 index 00000000..a67ce028 --- /dev/null +++ b/client/changes/bug_improve-log-when-fetching-documents @@ -0,0 +1 @@ + o Improve log messages when concurrently fetching documents from the server. diff --git a/client/changes/feature_add-pool-of-http-https-connections b/client/changes/feature_add-pool-of-http-https-connections new file mode 100644 index 00000000..7ff2a4ee --- /dev/null +++ b/client/changes/feature_add-pool-of-http-https-connections @@ -0,0 +1,2 @@ + o Add a pool of HTTP/HTTPS connections that is able to verify the server + certificate against a given CA certificate. diff --git a/client/changes/feature_use-twisted-adbapi-for-sync-db b/client/changes/feature_use-twisted-adbapi-for-sync-db new file mode 100644 index 00000000..41e5e6e3 --- /dev/null +++ b/client/changes/feature_use-twisted-adbapi-for-sync-db @@ -0,0 +1 @@ + o Use twisted.enterprise.adbapi for access to the sync database. diff --git a/client/changes/feature_use-twisted-web-for-client-sync b/client/changes/feature_use-twisted-web-for-client-sync new file mode 100644 index 00000000..b4d1d4a4 --- /dev/null +++ b/client/changes/feature_use-twisted-web-for-client-sync @@ -0,0 +1 @@ + o Use twisted.web.client for client sync. 
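For context on the feature_add-pool-of-http-https-connections entry above: the pattern it refers to (implemented in http_client.py further down in this diff) is a persistent twisted.web connection pool whose TLS context validates the server against a single pinned CA certificate. A minimal standalone sketch follows; the CAPinnedContextFactory and make_agent names are illustrative, not part of the changeset:

    from OpenSSL.crypto import FILETYPE_PEM, load_certificate
    from twisted.internet import reactor
    from twisted.internet.ssl import CertificateOptions, ClientContextFactory
    from twisted.web.client import Agent, HTTPConnectionPool

    class CAPinnedContextFactory(ClientContextFactory):
        """Verify the server certificate against one given CA certificate."""

        def __init__(self, cacert):
            self._cacert = cacert

        def getContext(self, hostname, port):
            # Refuse the TLS handshake unless the peer chains to our CA.
            return CertificateOptions(
                verify=True, caCerts=[self._cacert]).getContext()

    def make_agent(ca_file):
        # Load the CA certificate once; reuse the pool across requests.
        with open(ca_file) as f:
            cacert = load_certificate(FILETYPE_PEM, f.read())
        pool = HTTPConnectionPool(reactor, persistent=True)
        pool.maxPersistentPerHost = 10
        return Agent(reactor, CAPinnedContextFactory(cacert), pool=pool)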
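Similarly, the feature_use-twisted-adbapi-for-sync-db entry means every sync-db statement runs on a thread pool and yields a deferred instead of blocking the reactor. A rough sketch of the idea, assuming a pysqlcipher database (the path and table layout here are illustrative, borrowed from encdecpool.py below):

    from twisted.enterprise import adbapi

    def get_sync_db_pool(db_path, openfun=None):
        # cp_openfun runs once per new connection, e.g. to set cipher pragmas.
        return adbapi.ConnectionPool(
            "pysqlcipher.dbapi2", database=db_path,
            check_same_thread=False, cp_openfun=openfun)

    pool = get_sync_db_pool("/tmp/sync.db")
    d = pool.runOperation(
        "CREATE TABLE IF NOT EXISTS docs_tosync"
        " (doc_id PRIMARY KEY, rev, content)")
    d.addCallback(lambda _: pool.runQuery("SELECT doc_id FROM docs_tosync"))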
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index 7ad10db5..5b882bbe 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ b/client/src/leap/soledad/client/adbapi.py @@ -32,6 +32,7 @@ from zope.proxy import ProxyBase, setProxiedObject from pysqlcipher.dbapi2 import OperationalError from leap.soledad.client import sqlcipher as soledad_sqlcipher +from leap.soledad.client.pragmas import set_init_pragmas logger = logging.getLogger(name=__name__) @@ -72,7 +73,7 @@ def getConnectionPool(opts, openfun=None, driver="pysqlcipher"): :rtype: U1DBConnectionPool """ if openfun is None and driver == "pysqlcipher": - openfun = partial(soledad_sqlcipher.set_init_pragmas, opts=opts) + openfun = partial(set_init_pragmas, opts=opts) return U1DBConnectionPool( "%s.dbapi2" % driver, database=opts.path, check_same_thread=False, cp_openfun=openfun, diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index 0f29503f..91e0a4a0 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -272,7 +272,8 @@ class Soledad(object): replica_uid = self._dbpool.replica_uid self._dbsyncer = SQLCipherU1DBSync( self._sqlcipher_opts, self._crypto, replica_uid, - self._defer_encryption) + SOLEDAD_CERT, + defer_encryption=self._defer_encryption) # # Closing methods @@ -630,6 +631,7 @@ class Soledad(object): Whether to defer decryption of documents, or do it inline while syncing. :type defer_decryption: bool + :return: A deferred whose callback will be invoked with the local generation before the synchronization was performed. :rtype: twisted.internet.defer.Deferred @@ -650,7 +652,7 @@ class Soledad(object): sync_url = urlparse.urljoin(self._server_url, 'user-%s' % self.uuid) d = self._dbsyncer.sync( sync_url, - creds=self._creds, autocreate=False, + creds=self._creds, defer_decryption=defer_decryption) def _sync_callback(local_gen): @@ -658,21 +660,16 @@ class Soledad(object): soledad_events.SOLEDAD_DONE_DATA_SYNC, self.uuid) return local_gen - # prevent sync failures from crashing the app by adding an errback - # that logs the failure and does not propagate it down the callback - # chain def _sync_errback(failure): s = StringIO() failure.printDetailedTraceback(file=s) msg = "Soledad exception when syncing!\n" + s.getvalue() logger.error(msg) + return failure d.addCallbacks(_sync_callback, _sync_errback) return d - def stop_sync(self): - self._dbsyncer.stop_sync() - @property def syncing(self): """ diff --git a/client/src/leap/soledad/client/auth.py b/client/src/leap/soledad/client/auth.py index 72ab0008..6dfabeb4 100644 --- a/client/src/leap/soledad/client/auth.py +++ b/client/src/leap/soledad/client/auth.py @@ -14,15 +14,13 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - - """ Methods for token-based authentication. These methods have to be included in all classes that extend HTTPClient so they can do token-based auth requests to the Soledad server. """ - +import base64 from u1db import errors @@ -49,7 +47,7 @@ class TokenBasedAuth(object): Return an authorization header to be included in the HTTP request, in the form: - [('Authorization', 'Token <base64 encoded creds')] + [('Authorization', 'Token <(base64 encoded) uuid:token>')] :param method: The HTTP method. 
:type method: str @@ -64,7 +62,8 @@ class TokenBasedAuth(object): if 'token' in self._creds: uuid, token = self._creds['token'] auth = '%s:%s' % (uuid, token) - return [('Authorization', 'Token %s' % auth.encode('base64')[:-1])] + b64_token = base64.b64encode(auth) + return [('Authorization', 'Token %s' % b64_token)] else: raise errors.UnknownAuthMethod( 'Wrong credentials: %s' % self._creds) diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index 107bf7f1..bdbaa8e0 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -23,17 +23,13 @@ import hmac import hashlib import json import logging -import multiprocessing -import threading from pycryptopp.cipher.aes import AES from pycryptopp.cipher.xsalsa20 import XSalsa20 -from zope.proxy import sameProxiedObjects from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type from leap.soledad.common import crypto -from leap.soledad.common.document import SoledadDocument logger = logging.getLogger(__name__) @@ -227,7 +223,7 @@ class SoledadCrypto(object): # def mac_doc(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, enc_iv, - mac_method, secret): + mac_method, secret): """ Calculate a MAC for C{doc} using C{ciphertext}. @@ -378,7 +374,7 @@ def decrypt_doc(crypto, doc): def _verify_doc_mac(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, - enc_iv, mac_method, secret, doc_mac): + enc_iv, mac_method, secret, doc_mac): """ Verify that C{doc_mac} is a correct MAC for the given document. @@ -511,525 +507,3 @@ def is_symmetrically_encrypted(doc): == crypto.EncryptionSchemes.SYMKEY: return True return False - - -# -# Encrypt/decrypt pools of workers -# - -class SyncEncryptDecryptPool(object): - """ - Base class for encrypter/decrypter pools. - """ - WORKERS = multiprocessing.cpu_count() - - def __init__(self, crypto, sync_db, write_lock): - """ - Initialize the pool of encryption-workers. - - :param crypto: A SoledadCryto instance to perform the encryption. - :type crypto: leap.soledad.crypto.SoledadCrypto - - :param sync_db: A database connection handle - :type sync_db: pysqlcipher.dbapi2.Connection - - :param write_lock: a write lock for controlling concurrent access - to the sync_db - :type write_lock: threading.Lock - """ - self._pool = multiprocessing.Pool(self.WORKERS) - self._crypto = crypto - self._sync_db = sync_db - self._sync_db_write_lock = write_lock - - def close(self): - """ - Cleanly close the pool of workers. - """ - logger.debug("Closing %s" % (self.__class__.__name__,)) - self._pool.close() - try: - self._pool.join() - except Exception: - pass - - def terminate(self): - """ - Terminate the pool of workers. - """ - logger.debug("Terminating %s" % (self.__class__.__name__,)) - self._pool.terminate() - - -def encrypt_doc_task(doc_id, doc_rev, content, key, secret): - """ - Encrypt the content of the given document. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param key: The encryption key. - :type key: str - :param secret: The Soledad storage secret (used for MAC auth). - :type secret: str - - :return: A tuple containing the doc id, revision and encrypted content. 
- :rtype: tuple(str, str, str) - """ - encrypted_content = encrypt_docstr( - content, doc_id, doc_rev, key, secret) - return doc_id, doc_rev, encrypted_content - - -class SyncEncrypterPool(SyncEncryptDecryptPool): - """ - Pool of workers that spawn subprocesses to execute the symmetric encryption - of documents to be synced. - """ - # TODO implement throttling to reduce cpu usage?? - WORKERS = multiprocessing.cpu_count() - TABLE_NAME = "docs_tosync" - FIELD_NAMES = "doc_id, rev, content" - - def encrypt_doc(self, doc, workers=True): - """ - Symmetrically encrypt a document. - - :param doc: The document with contents to be encrypted. - :type doc: SoledadDocument - - :param workers: Whether to defer the decryption to the multiprocess - pool of workers. Useful for debugging purposes. - :type workers: bool - """ - soledad_assert(self._crypto is not None, "need a crypto object") - docstr = doc.get_json() - key = self._crypto.doc_passphrase(doc.doc_id) - secret = self._crypto.secret - args = doc.doc_id, doc.rev, docstr, key, secret - - try: - if workers: - res = self._pool.apply_async( - encrypt_doc_task, args, - callback=self.encrypt_doc_cb) - else: - # encrypt inline - res = encrypt_doc_task(*args) - self.encrypt_doc_cb(res) - - except Exception as exc: - logger.exception(exc) - - def encrypt_doc_cb(self, result): - """ - Insert results of encryption routine into the local sync database. - - :param result: A tuple containing the doc id, revision and encrypted - content. - :type result: tuple(str, str, str) - """ - doc_id, doc_rev, content = result - self.insert_encrypted_local_doc(doc_id, doc_rev, content) - - def insert_encrypted_local_doc(self, doc_id, doc_rev, content): - """ - Insert the contents of the encrypted doc into the local sync - database. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param content: The encrypted document. - :type content: str - """ - # FIXME --- callback should complete immediately since otherwise the - # thread which handles the results will get blocked - # Right now we're blocking the dispatcher with the writes to sqlite. - sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,) - sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,) - - con = self._sync_db - with self._sync_db_write_lock: - con.execute(sql_del, (doc_id, )) - con.execute(sql_ins, (doc_id, doc_rev, content)) - - -def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): - """ - Decrypt the content of the given document. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The encrypted content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification of - that document. - :type trans_id: str - :param key: The encryption key. - :type key: str - :param secret: The Soledad storage secret (used for MAC auth). - :type secret: str - - :return: A tuple containing the doc id, revision and encrypted content. 
- :rtype: tuple(str, str, str) - """ - decrypted_content = decrypt_doc_dict( - content, doc_id, doc_rev, key, secret) - return doc_id, doc_rev, decrypted_content, gen, trans_id - - -class SyncDecrypterPool(SyncEncryptDecryptPool): - """ - Pool of workers that spawn subprocesses to execute the symmetric decryption - of documents that were received. - - The decryption of the received documents is done in two steps: - - 1. All the encrypted docs are collected, together with their generation - and transaction-id - 2. The docs are enqueued for decryption. When completed, they are - inserted following the generation order. - """ - # TODO implement throttling to reduce cpu usage?? - TABLE_NAME = "docs_received" - FIELD_NAMES = "doc_id, rev, content, gen, trans_id, encrypted" - - write_encrypted_lock = threading.Lock() - - def __init__(self, *args, **kwargs): - """ - Initialize the decrypter pool, and setup a dict for putting the - results of the decrypted docs until they are picked by the insert - routine that gets them in order. - - :param insert_doc_cb: A callback for inserting received documents from - target. If not overriden, this will call u1db - insert_doc_from_target in synchronizer, which - implements the TAKE OTHER semantics. - :type insert_doc_cb: function - :param last_known_generation: Target's last known generation. - :type last_known_generation: int - """ - self._insert_doc_cb = kwargs.pop("insert_doc_cb") - SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - self.source_replica_uid = None - self._async_results = [] - - def set_source_replica_uid(self, source_replica_uid): - """ - Set the source replica uid for this decrypter pool instance. - - :param source_replica_uid: The uid of the source replica. - :type source_replica_uid: str - """ - self.source_replica_uid = source_replica_uid - - def insert_encrypted_received_doc(self, doc_id, doc_rev, content, - gen, trans_id): - """ - Insert a received message with encrypted content, to be decrypted later - on. - - :param doc_id: The Document ID. - :type doc_id: str - :param doc_rev: The Document Revision - :param doc_rev: str - :param content: the Content of the document - :type content: str - :param gen: the Document Generation - :type gen: int - :param trans_id: Transaction ID - :type trans_id: str - """ - docstr = json.dumps(content) - sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,) - sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( - self.TABLE_NAME,) - - con = self._sync_db - with self._sync_db_write_lock: - con.execute(sql_del, (doc_id, )) - con.execute( - sql_ins, - (doc_id, doc_rev, docstr, gen, trans_id, 1)) - - def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id): - """ - Insert a document that is not symmetrically encrypted. - We store it in the staging area (the decrypted_docs dictionary) to be - picked up in order as the preceding documents are decrypted. - - :param doc_id: The Document ID. - :type doc_id: str - :param doc_rev: The Document Revision - :param doc_rev: str - :param content: the Content of the document - :type content: str - :param gen: the Document Generation - :type gen: int - :param trans_id: Transaction ID - :type trans_id: str - """ - if not isinstance(content, str): - content = json.dumps(content) - sql_del = "DELETE FROM '%s' WHERE doc_id=?" 
% ( - self.TABLE_NAME,) - sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( - self.TABLE_NAME,) - con = self._sync_db - with self._sync_db_write_lock: - con.execute(sql_del, (doc_id,)) - con.execute( - sql_ins, - (doc_id, doc_rev, content, gen, trans_id, 0)) - - def delete_received_doc(self, doc_id, doc_rev): - """ - Delete a received doc after it was inserted into the local db. - - :param doc_id: Document ID. - :type doc_id: str - :param doc_rev: Document revision. - :type doc_rev: str - """ - sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % ( - self.TABLE_NAME,) - con = self._sync_db - with self._sync_db_write_lock: - con.execute(sql_del, (doc_id, doc_rev)) - - def decrypt_doc(self, doc_id, rev, content, gen, trans_id, - source_replica_uid, workers=True): - """ - Symmetrically decrypt a document. - - :param doc_id: The ID for the document with contents to be encrypted. - :type doc: str - :param rev: The revision of the document. - :type rev: str - :param content: The serialized content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification - of that document. - :type trans_id: str - :param source_replica_uid: - :type source_replica_uid: str - - :param workers: Whether to defer the decryption to the multiprocess - pool of workers. Useful for debugging purposes. - :type workers: bool - """ - self.source_replica_uid = source_replica_uid - - # insert_doc_cb is a proxy object that gets updated with the right - # insert function only when the sync_target invokes the sync_exchange - # method. so, if we don't still have a non-empty callback, we refuse - # to proceed. - if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid), - None): - logger.debug("Sync decrypter pool: no insert_doc_cb() yet.") - return - - soledad_assert(self._crypto is not None, "need a crypto object") - - if len(content) == 0: - # not encrypted payload - return - - content = json.loads(content) - key = self._crypto.doc_passphrase(doc_id) - secret = self._crypto.secret - args = doc_id, rev, content, gen, trans_id, key, secret - - if workers: - # save the async result object so we can inspect it for failures - self._async_results.append(self._pool.apply_async( - decrypt_doc_task, args, - callback=self.decrypt_doc_cb)) - else: - # decrypt inline - res = decrypt_doc_task(*args) - self.decrypt_doc_cb(res) - - def decrypt_doc_cb(self, result): - """ - Store the decryption result in the sync db from where it will later be - picked by process_decrypted. - - :param result: A tuple containing the doc id, revision and encrypted - content. - :type result: tuple(str, str, str) - """ - doc_id, rev, content, gen, trans_id = result - logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s" - % (doc_id, rev, gen, trans_id)) - self.insert_received_doc(doc_id, rev, content, gen, trans_id) - - def get_docs_by_generation(self, encrypted=None): - """ - Get all documents in the received table from the sync db, - ordered by generation. - - :param encrypted: If not None, only return documents with encrypted - field equal to given parameter. 
- :type encrypted: bool or None - - :return: list of doc_id, rev, generation, gen, trans_id - :rtype: list - """ - sql = "SELECT doc_id, rev, content, gen, trans_id, encrypted FROM %s" \ - % self.TABLE_NAME - if encrypted is not None: - sql += " WHERE encrypted = %d" % int(encrypted) - sql += " ORDER BY gen ASC" - return self._fetchall(sql) - - def get_insertable_docs_by_gen(self): - """ - Return a list of non-encrypted documents ready to be inserted. - """ - # here, we compare the list of all available docs with the list of - # decrypted docs and find the longest common prefix between these two - # lists. Note that the order of lists fetch matters: if instead we - # first fetch the list of decrypted docs and then the list of all - # docs, then some document might have been decrypted between these two - # calls, and if it is just the right doc then it might not be caught - # by the next loop. - all_docs = self.get_docs_by_generation() - decrypted_docs = self.get_docs_by_generation(encrypted=False) - insertable = [] - for doc_id, rev, _, gen, trans_id, encrypted in all_docs: - for next_doc_id, _, next_content, _, _, _ in decrypted_docs: - if doc_id == next_doc_id: - content = next_content - insertable.append((doc_id, rev, content, gen, trans_id)) - else: - break - return insertable - - def count_docs_in_sync_db(self, encrypted=None): - """ - Count how many documents we have in the table for received docs. - - :param encrypted: If not None, return count of documents with - encrypted field equal to given parameter. - :type encrypted: bool or None - - :return: The count of documents. - :rtype: int - """ - if self._sync_db is None: - logger.warning("cannot return count with null sync_db") - return - sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) - if encrypted is not None: - sql += " WHERE encrypted = %d" % int(encrypted) - res = self._fetchall(sql) - if res: - val = res.pop() - return val[0] - else: - return 0 - - def decrypt_received_docs(self): - """ - Get all the encrypted documents from the sync database and dispatch a - decrypt worker to decrypt each one of them. - """ - docs_by_generation = self.get_docs_by_generation(encrypted=True) - for doc_id, rev, content, gen, trans_id, _ \ - in filter(None, docs_by_generation): - self.decrypt_doc( - doc_id, rev, content, gen, trans_id, self.source_replica_uid) - - def process_decrypted(self): - """ - Process the already decrypted documents, and insert as many documents - as can be taken from the expected order without finding a gap. - - :return: Whether we have processed all the pending docs. - :rtype: bool - """ - # Acquire the lock to avoid processing while we're still - # getting data from the syncing stream, to avoid InvalidGeneration - # problems. - with self.write_encrypted_lock: - for doc_fields in self.get_insertable_docs_by_gen(): - self.insert_decrypted_local_doc(*doc_fields) - remaining = self.count_docs_in_sync_db() - return remaining == 0 - - def insert_decrypted_local_doc(self, doc_id, doc_rev, content, - gen, trans_id): - """ - Insert the decrypted document into the local sqlcipher database. - Makes use of the passed callback `return_doc_cb` passed to the caller - by u1db sync. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. 
-        :type gen: int
-        :param trans_id: The transaction id corresponding to the modification
-                         of that document.
-        :type trans_id: str
-        """
-        # could pass source_replica in params for callback chain
-        insert_fun = self._insert_doc_cb[self.source_replica_uid]
-        logger.debug("Sync decrypter pool: inserting doc in local db: "
-                     "%s:%s %s" % (doc_id, doc_rev, gen))
-
-        # convert deleted documents to avoid error on document creation
-        if content == 'null':
-            content = None
-        doc = SoledadDocument(doc_id, doc_rev, content)
-        gen = int(gen)
-        insert_fun(doc, gen, trans_id)
-
-        # If no errors found, remove it from the received database.
-        self.delete_received_doc(doc_id, doc_rev)
-
-    def empty(self):
-        """
-        Empty the received docs table of the sync database.
-        """
-        sql = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
-        self._sync_db.execute(sql)
-
-    def _fetchall(self, *args, **kwargs):
-        with self._sync_db:
-            c = self._sync_db.cursor()
-            c.execute(*args, **kwargs)
-            return c.fetchall()
-
-    def raise_in_case_of_failed_async_calls(self):
-        """
-        Re-raise any exception raised by an async call.
-
-        :raise Exception: Raised if an async call has raised an exception.
-        """
-        for res in self._async_results:
-            if res.ready():
-                if not res.successful():
-                    # re-raise the exception raised by the remote call
-                    res.get()
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
new file mode 100644
index 00000000..c0a05d38
--- /dev/null
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -0,0 +1,745 @@
+# -*- coding: utf-8 -*-
+# encdecpool.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+A pool of concurrent and parallel encryption/decryption workers for use
+during synchronization.
+"""
+
+
+import multiprocessing
+import json
+import logging
+from Queue import Empty, Full  # exceptions raised by multiprocessing queues
+
+from twisted.internet import reactor
+from twisted.internet import defer
+from twisted.internet.threads import deferToThread
+
+from leap.soledad.common.document import SoledadDocument
+from leap.soledad.common import soledad_assert
+
+from leap.soledad.client.crypto import encrypt_docstr
+from leap.soledad.client.crypto import decrypt_doc_dict
+
+
+logger = logging.getLogger(__name__)
+
+
+#
+# Encrypt/decrypt pools of workers
+#
+
+class SyncEncryptDecryptPool(object):
+    """
+    Base class for encrypter/decrypter pools.
+    """
+
+    # TODO implement throttling to reduce cpu usage??
+    WORKERS = multiprocessing.cpu_count()
+
+    def __init__(self, crypto, sync_db):
+        """
+        Initialize the pool of encryption-workers.
+
+        :param crypto: A SoledadCrypto instance to perform the encryption.
+        :type crypto: leap.soledad.crypto.SoledadCrypto
+
+        :param sync_db: A database connection handle
+        :type sync_db: pysqlcipher.dbapi2.Connection
+        """
+        self._crypto = crypto
+        self._sync_db = sync_db
+        self._pool = multiprocessing.Pool(self.WORKERS)
+
+    def close(self):
+        """
+        Cleanly close the pool of workers.
+        """
+        logger.debug("Closing %s" % (self.__class__.__name__,))
+        self._pool.close()
+        try:
+            self._pool.join()
+        except Exception:
+            pass
+
+    def terminate(self):
+        """
+        Terminate the pool of workers.
+        """
+        logger.debug("Terminating %s" % (self.__class__.__name__,))
+        self._pool.terminate()
+
+    def _runOperation(self, query, *args):
+        """
+        Run an operation on the sync db.
+
+        :param query: The query to be executed.
+        :type query: str
+        :param args: A list of query arguments.
+        :type args: list
+
+        :return: A deferred that will fire when the operation in the database
+                 has finished.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        return self._sync_db.runOperation(query, *args)
+
+    def _runQuery(self, query, *args):
+        """
+        Run a query on the sync db.
+
+        :param query: The query to be executed.
+        :type query: str
+        :param args: A list of query arguments.
+        :type args: list
+
+        :return: A deferred that will fire with the results of the database
+                 query.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        return self._sync_db.runQuery(query, *args)
+
+
+def encrypt_doc_task(doc_id, doc_rev, content, key, secret):
+    """
+    Encrypt the content of the given document.
+
+    :param doc_id: The document id.
+    :type doc_id: str
+    :param doc_rev: The document revision.
+    :type doc_rev: str
+    :param content: The serialized content of the document.
+    :type content: str
+    :param key: The encryption key.
+    :type key: str
+    :param secret: The Soledad storage secret (used for MAC auth).
+    :type secret: str
+
+    :return: A tuple containing the doc id, revision and encrypted content.
+    :rtype: tuple(str, str, str)
+    """
+    encrypted_content = encrypt_docstr(
+        content, doc_id, doc_rev, key, secret)
+    return doc_id, doc_rev, encrypted_content
+
+
+class SyncEncrypterPool(SyncEncryptDecryptPool):
+    """
+    Pool of workers that spawn subprocesses to execute the symmetric
+    encryption of documents to be synced.
+    """
+    TABLE_NAME = "docs_tosync"
+    FIELD_NAMES = "doc_id PRIMARY KEY, rev, content"
+
+    ENCRYPT_LOOP_PERIOD = 0.5
+
+    def __init__(self, *args, **kwargs):
+        """
+        Initialize the sync encrypter pool.
+        """
+        SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
+
+        self._stopped = False
+        self._sync_queue = multiprocessing.Queue()
+
+        # start the encryption loop
+        self._deferred_loop = deferToThread(self._encrypt_docs_loop)
+        self._deferred_loop.addCallback(
+            lambda _: logger.debug("Finished encrypter thread."))
+
+    def enqueue_doc_for_encryption(self, doc):
+        """
+        Enqueue a document for encryption.
+
+        :param doc: The document to be encrypted.
+        :type doc: SoledadDocument
+        """
+        try:
+            self._sync_queue.put_nowait(doc)
+        except Full:
+            # do not asynchronously encrypt this document if the queue is full
+            pass
+
+    def _encrypt_docs_loop(self):
+        """
+        Process the syncing queue and send the documents there to be encrypted
+        in the sync db. They will be read by the SoledadSyncTarget during the
+        sync_exchange.
+        """
+        logger.debug("Starting encrypter thread.")
+        while not self._stopped:
+            try:
+                doc = self._sync_queue.get(True, self.ENCRYPT_LOOP_PERIOD)
+                self._encrypt_doc(doc)
+            except Empty:
+                pass
+
+    def _encrypt_doc(self, doc):
+        """
+        Symmetrically encrypt a document.
+
+        :param doc: The document with contents to be encrypted.
+        :type doc: SoledadDocument
+
+        :param workers: Whether to defer the decryption to the multiprocess
+                        pool of workers. Useful for debugging purposes.
+ :type workers: bool + """ + soledad_assert(self._crypto is not None, "need a crypto object") + docstr = doc.get_json() + key = self._crypto.doc_passphrase(doc.doc_id) + secret = self._crypto.secret + args = doc.doc_id, doc.rev, docstr, key, secret + # encrypt asynchronously + self._pool.apply_async( + encrypt_doc_task, args, + callback=self._encrypt_doc_cb) + + def _encrypt_doc_cb(self, result): + """ + Insert results of encryption routine into the local sync database. + + :param result: A tuple containing the doc id, revision and encrypted + content. + :type result: tuple(str, str, str) + """ + doc_id, doc_rev, content = result + return self._insert_encrypted_local_doc(doc_id, doc_rev, content) + + def _insert_encrypted_local_doc(self, doc_id, doc_rev, content): + """ + Insert the contents of the encrypted doc into the local sync + database. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The serialized content of the document. + :type content: str + """ + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \ + % (self.TABLE_NAME,) + return self._runOperation(query, (doc_id, doc_rev, content)) + + @defer.inlineCallbacks + def get_encrypted_doc(self, doc_id, doc_rev): + """ + Get an encrypted document from the sync db. + + :param doc_id: The id of the document. + :type doc_id: str + :param doc_rev: The revision of the document. + :type doc_rev: str + + :return: A deferred that will fire with the encrypted content of the + document or None if the document was not found in the sync + db. + :rtype: twisted.internet.defer.Deferred + """ + logger.debug("Trying to get encrypted doc from sync db: %s" % doc_id) + query = "SELECT content FROM %s WHERE doc_id=? and rev=?" \ + % self.TABLE_NAME + result = yield self._runQuery(query, (doc_id, doc_rev)) + if result: + val = result.pop() + defer.returnValue(val[0]) + defer.returnValue(None) + + def delete_encrypted_doc(self, doc_id, doc_rev): + """ + Delete an encrypted document from the sync db. + + :param doc_id: The id of the document. + :type doc_id: str + :param doc_rev: The revision of the document. + :type doc_rev: str + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + query = "DELETE FROM %s WHERE doc_id=? and rev=?" \ + % self.TABLE_NAME + self._runOperation(query, (doc_id, doc_rev)) + + def close(self): + """ + Close the encrypter pool. + """ + self._stopped = True + self._sync_queue.close() + q = self._sync_queue + del q + self._sync_queue = None + + +def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, + idx): + """ + Decrypt the content of the given document. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The encrypted content of the document. + :type content: str + :param gen: The generation corresponding to the modification of that + document. + :type gen: int + :param trans_id: The transaction id corresponding to the modification of + that document. + :type trans_id: str + :param key: The encryption key. + :type key: str + :param secret: The Soledad storage secret (used for MAC auth). + :type secret: str + :param idx: The index of this document in the current sync process. + :type idx: int + + :return: A tuple containing the doc id, revision and encrypted content. 
+    :rtype: tuple(str, str, str)
+    """
+    decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret)
+    return doc_id, doc_rev, decrypted_content, gen, trans_id, idx
+
+
+class SyncDecrypterPool(SyncEncryptDecryptPool):
+    """
+    Pool of workers that spawn subprocesses to execute the symmetric
+    decryption of documents that were received.
+
+    The decryption of the received documents is done in several steps:
+
+    1. Encrypted documents are stored in the sync db by the actual soledad
+       sync loop.
+    2. The soledad sync loop tells us how many documents we should expect
+       to process.
+    3. We start a decrypt-and-process loop:
+
+       a. Encrypted documents are fetched.
+       b. Encrypted documents are decrypted.
+       c. The longest possible list of decrypted documents is inserted
+          in the soledad db (this depends on which documents have already
+          arrived and which documents have already been decrypted, because
+          the order of insertion in the local soledad db matters).
+       d. Processed documents are deleted from the database.
+
+    4. When we have processed as many documents as we should, the loop
+       finishes.
+    """
+    # TODO implement throttling to reduce cpu usage??
+    TABLE_NAME = "docs_received"
+    FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \
+                  "trans_id, encrypted, idx"
+
+    """
+    Period of recurrence of the periodic decrypting task, in seconds.
+    """
+    DECRYPT_LOOP_PERIOD = 0.5
+
+    def __init__(self, *args, **kwargs):
+        """
+        Initialize the decrypter pool, and setup a dict for putting the
+        results of the decrypted docs until they are picked by the insert
+        routine that gets them in order.
+
+        :param insert_doc_cb: A callback for inserting received documents from
+                              target. If not overridden, this will call u1db
+                              insert_doc_from_target in synchronizer, which
+                              implements the TAKE OTHER semantics.
+        :type insert_doc_cb: function
+        :param source_replica_uid: The source replica uid, used to find the
+                                   correct callback for inserting documents.
+        :type source_replica_uid: str
+        """
+        self._insert_doc_cb = kwargs.pop("insert_doc_cb")
+        self.source_replica_uid = kwargs.pop("source_replica_uid")
+        SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
+
+        self._last_inserted_idx = 0
+        self._docs_to_process = None
+        self._processed_docs = 0
+
+        self._async_results = []
+        self._failure = None
+        self._finished = False
+
+        # XXX we want to empty the database before starting, but this is an
+        #     asynchronous call, so we have to somehow make sure that it is
+        #     executed before any other call to the database, without
+        #     blocking.
+        self._empty()
+
+    def _launch_decrypt_and_process(self):
+        d = self._decrypt_and_process_docs()
+        d.addErrback(lambda f: self._set_failure(f))
+
+    def _schedule_decrypt_and_process(self):
+        reactor.callLater(
+            self.DECRYPT_LOOP_PERIOD,
+            self._launch_decrypt_and_process)
+
+    @property
+    def failure(self):
+        return self._failure
+
+    def _set_failure(self, failure):
+        self._failure = failure
+        self._finished = True
+
+    def failed(self):
+        return bool(self._failure)
+
+    def start(self, docs_to_process):
+        """
+        Set the number of documents we expect to process.
+
+        This should be called during the sync exchange process as soon as we
+        know how many documents are arriving from the server.
+
+        :param docs_to_process: The number of documents to process.
+ :type docs_to_process: int + """ + self._docs_to_process = docs_to_process + self._schedule_decrypt_and_process() + + def insert_encrypted_received_doc( + self, doc_id, doc_rev, content, gen, trans_id, idx): + """ + Insert a received message with encrypted content, to be decrypted later + on. + + :param doc_id: The Document ID. + :type doc_id: str + :param doc_rev: The Document Revision + :param doc_rev: str + :param content: the Content of the document + :type content: str + :param gen: the Document Generation + :type gen: int + :param trans_id: Transaction ID + :type trans_id: str + :param idx: The index of this document in the current sync process. + :type idx: int + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + docstr = json.dumps(content) + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ + % self.TABLE_NAME + return self._runOperation( + query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx)) + + def insert_received_doc( + self, doc_id, doc_rev, content, gen, trans_id, idx): + """ + Insert a document that is not symmetrically encrypted. + We store it in the staging area (the decrypted_docs dictionary) to be + picked up in order as the preceding documents are decrypted. + + :param doc_id: The Document ID. + :type doc_id: str + :param doc_rev: The Document Revision + :param doc_rev: str + :param content: the Content of the document + :type content: str + :param gen: the Document Generation + :type gen: int + :param trans_id: Transaction ID + :type trans_id: str + :param idx: The index of this document in the current sync process. + :type idx: int + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + if not isinstance(content, str): + content = json.dumps(content) + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ + % self.TABLE_NAME + return self._runOperation( + query, (doc_id, doc_rev, content, gen, trans_id, 0, idx)) + + def _delete_received_doc(self, doc_id): + """ + Delete a received doc after it was inserted into the local db. + + :param doc_id: Document ID. + :type doc_id: str + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + query = "DELETE FROM '%s' WHERE doc_id=?" \ + % self.TABLE_NAME + return self._runOperation(query, (doc_id,)) + + def _async_decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx): + """ + Dispatch an asynchronous document decrypting routine and save the + result object. + + :param doc_id: The ID for the document with contents to be encrypted. + :type doc: str + :param rev: The revision of the document. + :type rev: str + :param content: The serialized content of the document. + :type content: str + :param gen: The generation corresponding to the modification of that + document. + :type gen: int + :param trans_id: The transaction id corresponding to the modification + of that document. + :type trans_id: str + :param idx: The index of this document in the current sync process. + :type idx: int + + :return: A deferred that will fire after the document hasa been + decrypted and inserted in the sync db. 
+ :rtype: twisted.internet.defer.Deferred + """ + soledad_assert(self._crypto is not None, "need a crypto object") + + content = json.loads(content) + key = self._crypto.doc_passphrase(doc_id) + secret = self._crypto.secret + args = doc_id, rev, content, gen, trans_id, key, secret, idx + # decrypt asynchronously + self._async_results.append( + self._pool.apply_async( + decrypt_doc_task, args)) + + def _decrypt_doc_cb(self, result): + """ + Store the decryption result in the sync db from where it will later be + picked by _process_decrypted_docs. + + :param result: A tuple containing the document's id, revision, + content, generation, transaction id and sync index. + :type result: tuple(str, str, str, int, str, int) + + :return: A deferred that will fire after the document has been + inserted in the sync db. + :rtype: twisted.internet.defer.Deferred + """ + doc_id, rev, content, gen, trans_id, idx = result + logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s" + % (doc_id, rev, gen, trans_id)) + return self.insert_received_doc( + doc_id, rev, content, gen, trans_id, idx) + + def _get_docs(self, encrypted=None, order_by='idx', order='ASC'): + """ + Get documents from the received docs table in the sync db. + + :param encrypted: If not None, only return documents with encrypted + field equal to given parameter. + :type encrypted: bool or None + :param order_by: The name of the field to order results. + :type order_by: str + :param order: Whether the order should be ASC or DESC. + :type order: str + + :return: A deferred that will fire with the results of the database + query. + :rtype: twisted.internet.defer.Deferred + """ + query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \ + "idx FROM %s" % self.TABLE_NAME + if encrypted is not None: + query += " WHERE encrypted = %d" % int(encrypted) + query += " ORDER BY %s %s" % (order_by, order) + return self._runQuery(query) + + @defer.inlineCallbacks + def _get_insertable_docs(self): + """ + Return a list of non-encrypted documents ready to be inserted. + + :return: A deferred that will fire with the list of insertable + documents. + :rtype: twisted.internet.defer.Deferred + """ + # here, we fetch the list of decrypted documents and compare with the + # index of the last succesfully processed document. + decrypted_docs = yield self._get_docs(encrypted=False) + insertable = [] + last_idx = self._last_inserted_idx + for doc_id, rev, content, gen, trans_id, encrypted, idx in \ + decrypted_docs: + # XXX for some reason, a document might not have been deleted from + # the database. This is a bug. In this point, already + # processed documents should have been removed from the sync + # database and we should not have to skip them here. We need + # to find out why this is happening, fix, and remove the + # skipping below. + if (idx < last_idx + 1): + continue + if (idx != last_idx + 1): + break + insertable.append((doc_id, rev, content, gen, trans_id, idx)) + last_idx += 1 + defer.returnValue(insertable) + + @defer.inlineCallbacks + def _async_decrypt_received_docs(self): + """ + Get all the encrypted documents from the sync database and dispatch a + decrypt worker to decrypt each one of them. + + :return: A deferred that will fire after all documents have been + decrypted and inserted back in the sync db. 
+        :rtype: twisted.internet.defer.Deferred
+        """
+        docs = yield self._get_docs(encrypted=True)
+        for doc_id, rev, content, gen, trans_id, _, idx in docs:
+            self._async_decrypt_doc(
+                doc_id, rev, content, gen, trans_id, idx)
+
+    @defer.inlineCallbacks
+    def _process_decrypted_docs(self):
+        """
+        Fetch as many decrypted documents as can be taken from the expected
+        order and insert them in the local replica.
+
+        :return: A deferred that will fire with the list of inserted
+                 documents.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        insertable = yield self._get_insertable_docs()
+        for doc_fields in insertable:
+            self._insert_decrypted_local_doc(*doc_fields)
+        defer.returnValue(insertable)
+
+    def _delete_processed_docs(self, inserted):
+        """
+        Delete from the sync db documents that have been processed.
+
+        :param inserted: List of documents inserted in the previous process
+                         step.
+        :type inserted: list
+
+        :return: A list of deferreds that will fire when each operation in the
+                 database has finished.
+        :rtype: twisted.internet.defer.DeferredList
+        """
+        deferreds = []
+        for doc_id, doc_rev, _, _, _, _ in inserted:
+            deferreds.append(
+                self._delete_received_doc(doc_id))
+        if not deferreds:
+            return defer.succeed(None)
+        return defer.gatherResults(deferreds)
+
+    def _insert_decrypted_local_doc(self, doc_id, doc_rev, content,
+                                    gen, trans_id, idx):
+        """
+        Insert the decrypted document into the local replica.
+
+        Make use of the `insert_doc_cb` callback passed in by u1db sync.
+
+        :param doc_id: The document id.
+        :type doc_id: str
+        :param doc_rev: The document revision.
+        :type doc_rev: str
+        :param content: The serialized content of the document.
+        :type content: str
+        :param gen: The generation corresponding to the modification of that
+                    document.
+        :type gen: int
+        :param trans_id: The transaction id corresponding to the modification
+                         of that document.
+        :type trans_id: str
+        """
+        # could pass source_replica in params for callback chain
+        logger.debug("Sync decrypter pool: inserting doc in local db: "
+                     "%s:%s %s" % (doc_id, doc_rev, gen))
+
+        # convert deleted documents to avoid error on document creation
+        if content == 'null':
+            content = None
+        doc = SoledadDocument(doc_id, doc_rev, content)
+        gen = int(gen)
+        self._insert_doc_cb(doc, gen, trans_id)
+
+        # store info about processed docs
+        self._last_inserted_idx = idx
+        self._processed_docs += 1
+
+    def _empty(self):
+        """
+        Empty the received docs table of the sync database.
+
+        :return: A deferred that will fire when the operation in the database
+                 has finished.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
+        return self._runOperation(query)
+
+    def _collect_async_decryption_results(self):
+        """
+        Collect the results of the asynchronous doc decryptions and re-raise
+        any exception raised by a multiprocessing async decryption call.
+
+        :raise Exception: Raised if an async call has raised an exception.
+        """
+        async_results = self._async_results[:]
+        for res in async_results:
+            if res.ready():
+                self._decrypt_doc_cb(res.get())  # might raise an exception!
+                self._async_results.remove(res)
+
+    @defer.inlineCallbacks
+    def _decrypt_and_process_docs(self):
+        """
+        Decrypt the documents received from remote replica and insert them
+        into the local one.
+
+        This method implicitly returns a deferred (see the decorator
+        above). It should only be called by _launch_decrypt_and_process(),
+ because this way any exceptions raised here will be stored by the + errback attached to the deferred returned. + + :return: A deferred which will fire after all decrypt, process and + delete operations have been executed. + :rtype: twisted.internet.defer.Deferred + """ + if not self.failed(): + if self._processed_docs < self._docs_to_process: + yield self._async_decrypt_received_docs() + yield self._collect_async_decryption_results() + docs = yield self._process_decrypted_docs() + yield self._delete_processed_docs(docs) + # recurse + self._schedule_decrypt_and_process() + else: + self._finished = True + + def has_finished(self): + """ + Return whether the decrypter has finished its work. + """ + return self._finished diff --git a/client/src/leap/soledad/client/http_client.py b/client/src/leap/soledad/client/http_client.py new file mode 100644 index 00000000..b08d199e --- /dev/null +++ b/client/src/leap/soledad/client/http_client.py @@ -0,0 +1,194 @@ +# -*- coding: utf-8 -*- +# http_client.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Twisted HTTP/HTTPS client. +""" + +import os + +from zope.interface import implements + +from OpenSSL.crypto import load_certificate +from OpenSSL.crypto import FILETYPE_PEM + +from twisted.internet import reactor +from twisted.internet.ssl import ClientContextFactory +from twisted.internet.ssl import CertificateOptions +from twisted.internet.defer import succeed + +from twisted.web.client import Agent +from twisted.web.client import HTTPConnectionPool +from twisted.web.client import readBody +from twisted.web.http_headers import Headers +from twisted.web.error import Error +from twisted.web.iweb import IBodyProducer + + +from leap.soledad.common.errors import InvalidAuthTokenError + + +# +# Setup a pool of connections +# + +_pool = HTTPConnectionPool(reactor, persistent=True) +_pool.maxPersistentPerHost = 10 +_agent = None + +# if we ever want to trust the system's CAs, we should use an agent like this: +# from twisted.web.client import BrowserLikePolicyForHTTPS +# _agent = Agent(reactor, BrowserLikePolicyForHTTPS(), pool=_pool) + + +# +# SSL/TLS certificate configuration +# + +def configure_certificate(cert_file): + """ + Configure an agent that verifies server certificates against a CA cert + file. + + :param cert_file: The path to the certificate file. + :type cert_file: str + """ + global _agent + cert = _load_cert(cert_file) + _agent = Agent( + reactor, + SoledadClientContextFactory(cert), + pool=_pool) + + +def _load_cert(cert_file): + """ + Load a X509 certificate from a file. + + :param cert_file: The path to the certificate file. + :type cert_file: str + + :return: The X509 certificate. 
+ :rtype: OpenSSL.crypto.X509 + """ + if os.path.exists(cert_file): + with open(cert_file) as f: + data = f.read() + return load_certificate(FILETYPE_PEM, data) + + +class SoledadClientContextFactory(ClientContextFactory): + """ + A context factory that will verify the server's certificate against a + given CA certificate. + """ + + def __init__(self, cacert): + """ + Initialize the context factory. + + :param cacert: The CA certificate. + :type cacert: OpenSSL.crypto.X509 + """ + self._cacert = cacert + + def getContext(self, hostname, port): + opts = CertificateOptions(verify=True, caCerts=[self._cacert]) + return opts.getContext() + + +# +# HTTP request facilities +# + +def _unauth_to_invalid_token_error(failure): + """ + An errback to translate unauthorized errors to our own invalid token + class. + + :param failure: The original failure. + :type failure: twisted.python.failure.Failure + + :return: Either the original failure or an invalid auth token error. + :rtype: twisted.python.failure.Failure + """ + failure.trap(Error) + if failure.getErrorMessage() == "401 Unauthorized": + raise InvalidAuthTokenError + return failure + + +class StringBodyProducer(object): + """ + A producer that writes the body of a request to a consumer. + """ + + implements(IBodyProducer) + + def __init__(self, body): + """ + Initialize the string produer. + + :param body: The body of the request. + :type body: str + """ + self.body = body + self.length = len(body) + + def startProducing(self, consumer): + """ + Write the body to the consumer. + + :param consumer: Any IConsumer provider. + :type consumer: twisted.internet.interfaces.IConsumer + + :return: A successful deferred. + :rtype: twisted.internet.defer.Deferred + """ + consumer.write(self.body) + return succeed(None) + + def pauseProducing(self): + pass + + def stopProducing(self): + pass + + +def httpRequest(url, method='GET', body=None, headers={}): + """ + Perform an HTTP request. + + :param url: The URL for the request. + :type url: str + :param method: The HTTP method of the request. + :type method: str + :param body: The body of the request, if any. + :type body: str + :param headers: The headers of the request. + :type headers: dict + + :return: A deferred that fires with the body of the request. + :rtype: twisted.internet.defer.Deferred + """ + if body: + body = StringBodyProducer(body) + d = _agent.request( + method, url, headers=Headers(headers), bodyProducer=body) + d.addCallbacks(readBody, _unauth_to_invalid_token_error) + return d diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py new file mode 100644 index 00000000..dc6c0e0a --- /dev/null +++ b/client/src/leap/soledad/client/http_target.py @@ -0,0 +1,598 @@ +# -*- coding: utf-8 -*- +# http_target.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. 
+
+
+"""
+A U1DB backend for encrypting data before sending to server and decrypting
+after receiving.
+"""
+
+
+import json
+import base64
+import logging
+
+from uuid import uuid4
+from functools import partial
+
+from twisted.internet import defer
+from twisted.internet import reactor
+
+from u1db import errors
+from u1db import SyncTarget
+from u1db.remote import utils
+
+from leap.soledad.common.document import SoledadDocument
+
+from leap.soledad.client.crypto import is_symmetrically_encrypted
+from leap.soledad.client.crypto import encrypt_doc
+from leap.soledad.client.crypto import decrypt_doc
+from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
+from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
+from leap.soledad.client.events import signal
+from leap.soledad.client.encdecpool import SyncDecrypterPool
+from leap.soledad.client.http_client import httpRequest
+from leap.soledad.client.http_client import configure_certificate
+
+
+logger = logging.getLogger(__name__)
+
+
+class SoledadHTTPSyncTarget(SyncTarget):
+    """
+    A SyncTarget that encrypts data before sending and decrypts data after
+    receiving.
+
+    Normally encryption will have been written to the sync database upon
+    document modification. The sync database is also used to write temporarily
+    the parsed documents that the remote sends us, before being decrypted and
+    written to the main database.
+    """
+
+    def __init__(self, url, source_replica_uid, creds, crypto, cert_file,
+                 sync_db=None, sync_enc_pool=None):
+        """
+        Initialize the sync target.
+
+        :param url: The url of the target replica to sync with.
+        :type url: str
+        :param source_replica_uid: The source replica uid which we use when
+                                   deferring decryption.
+        :type source_replica_uid: str
+        :param creds: A dictionary containing the uuid and token.
+        :type creds: dict
+        :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
+                       document contents when syncing.
+        :type crypto: soledad.crypto.SoledadCrypto
+        :param cert_file: Path to the certificate of the ca used to validate
+                          the SSL certificate used by the remote soledad
+                          server.
+        :type cert_file: str
+        :param sync_db: Optional. A handler for the db with the symmetric
+                        encryption of the syncing documents. If None,
+                        encryption will be done in-place, instead of
+                        retrieving it from the dedicated database.
+        :type sync_db: Sqlite handler
+        :param sync_enc_pool: Optional. An encrypter pool used to defer
+                              encryption of documents.
+        :type sync_enc_pool: leap.soledad.client.encdecpool.SyncEncrypterPool
+        """
+        if url.endswith("/"):
+            url = url[:-1]
+        self._url = str(url) + "/sync-from/" + source_replica_uid
+        self.source_replica_uid = source_replica_uid
+        self._auth_header = None
+        self.set_creds(creds)
+        self._crypto = crypto
+        self._sync_db = sync_db
+        self._sync_enc_pool = sync_enc_pool
+        self._insert_doc_cb = None
+        # asynchronous encryption/decryption attributes
+        self._decryption_callback = None
+        self._sync_decr_pool = None
+        configure_certificate(cert_file)
+
+    def set_creds(self, creds):
+        """
+        Update credentials.
+
+        :param creds: A dictionary containing the uuid and token.
+
+ def set_creds(self, creds):
+ """
+ Update credentials.
+
+ :param creds: A dictionary containing the uuid and token.
+ :type creds: dict
+ """
+ uuid = creds['token']['uuid']
+ token = creds['token']['token']
+ auth = '%s:%s' % (uuid, token)
+ b64_token = base64.b64encode(auth)
+ self._auth_header = {'Authorization': ['Token %s' % b64_token]}
+
+ @property
+ def _defer_encryption(self):
+ return self._sync_enc_pool is not None
+
+ #
+ # SyncTarget API
+ #
+
+ @defer.inlineCallbacks
+ def get_sync_info(self, source_replica_uid):
+ """
+ Return information about known state of remote database.
+
+ Return the replica_uid and the current database generation of the
+ remote database, and its last-seen database generation for the client
+ replica.
+
+ :param source_replica_uid: The client-side replica uid.
+ :type source_replica_uid: str
+
+ :return: A deferred which fires with (target_replica_uid,
+ target_replica_generation, target_trans_id,
+ source_replica_last_known_generation,
+ source_replica_last_known_transaction_id)
+ :rtype: twisted.internet.defer.Deferred
+ """
+ raw = yield httpRequest(self._url, headers=self._auth_header)
+ res = json.loads(raw)
+ defer.returnValue([
+ res['target_replica_uid'],
+ res['target_replica_generation'],
+ res['target_replica_transaction_id'],
+ res['source_replica_generation'],
+ res['source_transaction_id']
+ ])
+
+ def record_sync_info(
+ self, source_replica_uid, source_replica_generation,
+ source_replica_transaction_id):
+ """
+ Record tip information for another replica.
+
+ After sync_exchange has been processed, the caller will have
+ received new content from this replica. This call allows the
+ source replica instigating the sync to inform us what their
+ generation became after applying the documents we returned.
+
+ This is used to allow future sync operations to avoid resending data
+ that was already exchanged. It also means that if this is called at
+ the wrong time, there can be database records that will never be
+ synchronized.
+
+ :param source_replica_uid: The identifier for the source replica.
+ :type source_replica_uid: str
+ :param source_replica_generation: The database generation for the
+ source replica.
+ :type source_replica_generation: int
+ :param source_replica_transaction_id: The transaction id associated
+ with the source replica
+ generation.
+ :type source_replica_transaction_id: str
+
+ :return: A deferred which fires with the result of the query.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ data = json.dumps({
+ 'generation': source_replica_generation,
+ 'transaction_id': source_replica_transaction_id
+ })
+ headers = self._auth_header.copy()
+ headers.update({'content-type': ['application/json']})
+ return httpRequest(
+ self._url,
+ method='PUT',
+ headers=headers,
+ body=data)
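Note: the send/receive methods below serialize each request as a small
line-oriented stream: an opening '[', one JSON object per entry (each
preceded by '\r\n' and comma-prefixed after the first), and a closing
'\r\n]'. A sketch of the body that _send_one_doc() assembles for a single
document, with all field values invented:

    import json

    entries = ['[']
    # remote replica metadata comes first
    entries.append('\r\n' + json.dumps({
        'last_known_generation': 42,
        'last_known_trans_id': 'T-abc',
        'sync_id': 'some-sync-id',
        'ensure': False}))
    # then one entry per document
    entries.append(',\r\n' + json.dumps({
        'id': 'doc-1', 'rev': 'replica:1', 'content': '{"key": "value"}',
        'gen': 43, 'trans_id': 'T-def',
        'number_of_docs': 1, 'doc_idx': 1}))
    entries.append('\r\n]')
    # POSTed with content-type application/x-soledad-sync-put
    body = ''.join(entries)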
+
+ @defer.inlineCallbacks
+ def sync_exchange(self, docs_by_generation, source_replica_uid,
+ last_known_generation, last_known_trans_id,
+ insert_doc_cb, ensure_callback=None,
+ defer_decryption=True, sync_id=None):
+ """
+ Find out which documents the remote database does not know about,
+ encrypt and send them. After that, receive documents from the remote
+ database.
+
+ :param docs_by_generation: A list of (doc, gen, trans_id) tuples of
+ local documents that were changed since
+ the last local generation the remote
+ replica knows about.
+ :type docs_by_generation: list of tuples
+
+ :param source_replica_uid: The uid of the source replica.
+ :type source_replica_uid: str
+
+ :param last_known_generation: Target's last known generation.
+ :type last_known_generation: int
+
+ :param last_known_trans_id: Target's last known transaction id.
+ :type last_known_trans_id: str
+
+ :param insert_doc_cb: A callback for inserting received documents from
+ target. If not overridden, this will call u1db
+ insert_doc_from_target in synchronizer, which
+ implements the TAKE OTHER semantics.
+ :type insert_doc_cb: function
+
+ :param ensure_callback: A callback that ensures we know the target
+ replica uid if the target replica was just
+ created.
+ :type ensure_callback: function
+
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
+
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
+
+ :return: A deferred which fires with the new generation and
+ transaction id of the target replica.
+ :rtype: twisted.internet.defer.Deferred
+ """
+
+ self._ensure_callback = ensure_callback
+
+ if sync_id is None:
+ sync_id = str(uuid4())
+ self.source_replica_uid = source_replica_uid
+
+ # save a reference to the callback so we can use it after decrypting
+ self._insert_doc_cb = insert_doc_cb
+
+ gen_after_send, trans_id_after_send = yield self._send_docs(
+ docs_by_generation,
+ last_known_generation,
+ last_known_trans_id,
+ sync_id)
+
+ cur_target_gen, cur_target_trans_id = yield self._receive_docs(
+ last_known_generation, last_known_trans_id,
+ ensure_callback, sync_id,
+ defer_decryption=defer_decryption)
+
+ # update gen and trans id info in case we just sent and did not
+ # receive docs.
+ if gen_after_send is not None and gen_after_send > cur_target_gen:
+ cur_target_gen = gen_after_send
+ cur_target_trans_id = trans_id_after_send
+
+ defer.returnValue([cur_target_gen, cur_target_trans_id])
+
+ #
+ # methods to send docs
+ #
+
+ def _prepare(self, comma, entries, **dic):
+ entry = comma + '\r\n' + json.dumps(dic)
+ entries.append(entry)
+ return len(entry)
+
+ @defer.inlineCallbacks
+ def _send_docs(self, docs_by_generation, last_known_generation,
+ last_known_trans_id, sync_id):
+
+ if not docs_by_generation:
+ defer.returnValue([None, None])
+
+ headers = self._auth_header.copy()
+ headers.update({'content-type': ['application/x-soledad-sync-put']})
+ # add remote replica metadata to the request
+ first_entries = ['[']
+ self._prepare(
+ '', first_entries,
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
+ ensure=self._ensure_callback is not None)
+ idx = 0
+ total = len(docs_by_generation)
+ for doc, gen, trans_id in docs_by_generation:
+ idx += 1
+ result = yield self._send_one_doc(
+ headers, first_entries, doc,
+ gen, trans_id, total, idx)
+ if self._defer_encryption:
+ self._sync_enc_pool.delete_encrypted_doc(
+ doc.doc_id, doc.rev)
+ signal(SOLEDAD_SYNC_SEND_STATUS,
+ "Soledad sync send status: %d/%d"
+ % (idx, total))
+ response_dict = json.loads(result)[0]
+ gen_after_send = response_dict['new_generation']
+ trans_id_after_send = response_dict['new_transaction_id']
+ defer.returnValue([gen_after_send, trans_id_after_send])
+
+ @defer.inlineCallbacks
+ def _send_one_doc(self, headers, first_entries, doc, gen, trans_id,
+ number_of_docs, doc_idx):
+ entries = first_entries[:]
+ # add the document to the request
+ content = yield self._encrypt_doc(doc)
+ self._prepare(
+ ',', entries,
+ id=doc.doc_id, rev=doc.rev, content=content, gen=gen,
+ trans_id=trans_id, number_of_docs=number_of_docs,
+ doc_idx=doc_idx)
+ entries.append('\r\n]')
+ data = ''.join(entries)
+ result = yield httpRequest(
+ self._url,
+ method='POST',
+ headers=headers,
+ body=data)
+ defer.returnValue(result)
+
+ def _encrypt_doc(self,
doc): + d = None + if doc.is_tombstone(): + d = defer.succeed(None) + elif not self._defer_encryption: + # fallback case, for tests + d = defer.succeed(encrypt_doc(self._crypto, doc)) + else: + + def _maybe_encrypt_doc_inline(doc_json): + if doc_json is None: + # the document is not marked as tombstone, but we got + # nothing from the sync db. As it is not encrypted + # yet, we force inline encryption. + return encrypt_doc(self._crypto, doc) + return doc_json + + d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev) + d.addCallback(_maybe_encrypt_doc_inline) + return d + + # + # methods to receive doc + # + + @defer.inlineCallbacks + def _receive_docs(self, last_known_generation, last_known_trans_id, + ensure_callback, sync_id, defer_decryption): + + self._queue_for_decrypt = defer_decryption \ + and self._sync_db is not None + + new_generation = last_known_generation + new_transaction_id = last_known_trans_id + + if self._queue_for_decrypt: + logger.debug( + "Soledad sync: will queue received docs for decrypting.") + + if defer_decryption: + self._setup_sync_decr_pool() + + headers = self._auth_header.copy() + headers.update({'content-type': ['application/x-soledad-sync-get']}) + + #--------------------------------------------------------------------- + # maybe receive the first document + #--------------------------------------------------------------------- + + # we fetch the first document before fetching the rest because we need + # to know the total number of documents to be received, and this + # information comes as metadata to each request. + + d = self._receive_one_doc( + headers, last_known_generation, last_known_trans_id, + sync_id, 0) + d.addCallback(partial(self._insert_received_doc, 1, 1)) + number_of_changes, ngen, ntrans = yield d + + if defer_decryption: + self._sync_decr_pool.start(number_of_changes) + + #--------------------------------------------------------------------- + # maybe receive the rest of the documents + #--------------------------------------------------------------------- + + # launch many asynchronous fetches and inserts of received documents + # in the temporary sync db. Will wait for all results before + # continuing. + + received = 1 + deferreds = [] + while received < number_of_changes: + d = self._receive_one_doc( + headers, last_known_generation, + last_known_trans_id, sync_id, received) + d.addCallback( + partial( + self._insert_received_doc, + received + 1, # the index of the current received doc + number_of_changes)) + deferreds.append(d) + received += 1 + results = yield defer.gatherResults(deferreds) + + # get generation and transaction id of target after insertions + if deferreds: + _, new_generation, new_transaction_id = results.pop() + + #--------------------------------------------------------------------- + # wait for async decryption to finish + #--------------------------------------------------------------------- + + # below we do a trick so we can wait for the SyncDecrypterPool to + # finish its work before finally returning the new generation and + # transaction id of the remote replica. To achieve that, we create a + # Deferred that will return the results of the sync and, if we are + # decrypting asynchronously, we use reactor.callLater() to + # periodically poll the decrypter and check if it has finished its + # work. When it has finished, we either call the callback or errback + # of that deferred. In case we are not asynchronously decrypting, we + # just fire the deferred. 
+
+ def _shutdown_and_finish(res):
+ # the decrypter pool is only set up when decryption is deferred
+ if self._sync_decr_pool is not None:
+ self._sync_decr_pool.close()
+ return new_generation, new_transaction_id
+
+ d = defer.Deferred()
+ d.addCallback(_shutdown_and_finish)
+
+ def _wait_or_finish():
+ if not self._sync_decr_pool.has_finished():
+ reactor.callLater(
+ SyncDecrypterPool.DECRYPT_LOOP_PERIOD,
+ _wait_or_finish)
+ else:
+ if not self._sync_decr_pool.failed():
+ d.callback(None)
+ else:
+ d.errback(self._sync_decr_pool.failure)
+
+ if defer_decryption:
+ _wait_or_finish()
+ else:
+ d.callback(None)
+
+ new_generation, new_transaction_id = yield d
+ defer.returnValue([new_generation, new_transaction_id])
+
+ def _receive_one_doc(self, headers, last_known_generation,
+ last_known_trans_id, sync_id, received):
+ entries = ['[']
+ # add remote replica metadata to the request
+ self._prepare(
+ '', entries,
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
+ ensure=self._ensure_callback is not None)
+ # inform server of how many documents have already been received
+ self._prepare(
+ ',', entries, received=received)
+ entries.append('\r\n]')
+ # send the request
+ return httpRequest(
+ self._url,
+ method='POST',
+ headers=headers,
+ body=''.join(entries))
+
+ def _insert_received_doc(self, idx, total, response):
+ """
+ Insert a received document into the local replica.
+
+ :param idx: The index count of the current operation.
+ :type idx: int
+ :param total: The total number of operations.
+ :type total: int
+ :param response: The body of the response.
+ :type response: str
+ """
+ new_generation, new_transaction_id, number_of_changes, doc_id, \
+ rev, content, gen, trans_id = \
+ self._parse_received_doc_response(response)
+ if doc_id is not None:
+ # decrypt incoming document and insert into local database
+ # -------------------------------------------------------------
+ # symmetric decryption of document's contents
+ # -------------------------------------------------------------
+ # If arriving content was symmetrically encrypted, we decrypt it.
+ # We do it inline if the defer_decryption flag is False or no
+ # sync_db was defined, otherwise we defer it by writing it to the
+ # received docs table.
+ doc = SoledadDocument(doc_id, rev, content)
+ if is_symmetrically_encrypted(doc):
+ if self._queue_for_decrypt:
+ self._sync_decr_pool.insert_encrypted_received_doc(
+ doc.doc_id, doc.rev, doc.content, gen, trans_id,
+ idx)
+ else:
+ # defer_decryption is False or no-sync-db fallback
+ doc.set_json(decrypt_doc(self._crypto, doc))
+ self._insert_doc_cb(doc, gen, trans_id)
+ else:
+ # not symmetrically encrypted doc, insert it directly
+ # or save it in the decrypted stage.
+ if self._queue_for_decrypt:
+ self._sync_decr_pool.insert_received_doc(
+ doc.doc_id, doc.rev, doc.content, gen, trans_id,
+ idx)
+ else:
+ self._insert_doc_cb(doc, gen, trans_id)
+ # -------------------------------------------------------------
+ # end of symmetric decryption
+ # -------------------------------------------------------------
+ msg = "%d/%d" % (idx, total)
+ signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg)
+ logger.debug("Soledad sync receive status: %s" % msg)
+ return number_of_changes, new_generation, new_transaction_id
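Note: a sketch of the kind of response body that
_parse_received_doc_response(), below, accepts: the metadata line comes
first, followed by at most one document entry (all values invented; a null
content denotes a tombstone):

    response = '\n'.join([
        '[',
        '{"new_generation": 7, "new_transaction_id": "T-xyz",'
        ' "number_of_changes": 1},',
        '{"id": "doc-1", "rev": "replica:1", "content": null,'
        ' "gen": 7, "trans_id": "T-xyz"}',
        ']',
    ])
    # splitlines() yields '[', the metadata line (its trailing comma is
    # stripped by utils.check_and_strip_comma), one doc entry, and ']'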
+
+ def _parse_received_doc_response(self, response):
+ """
+ Parse the response from the server containing the received document.
+
+ :param response: The body of the response.
+ :type response: str
+
+ :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev,
+ content, gen, trans_id)
+ :rtype: tuple
+ """
+ # decode incoming stream
+ parts = response.splitlines()
+ if not parts or parts[0] != '[' or parts[-1] != ']':
+ raise errors.BrokenSyncStream
+ data = parts[1:-1]
+ # decode metadata
+ line, comma = utils.check_and_strip_comma(data[0])
+ metadata = None
+ try:
+ metadata = json.loads(line)
+ new_generation = metadata['new_generation']
+ new_transaction_id = metadata['new_transaction_id']
+ number_of_changes = metadata['number_of_changes']
+ except (ValueError, KeyError):
+ # the stdlib json module raises ValueError on malformed input
+ raise errors.BrokenSyncStream
+ # make sure we have replica_uid from fresh new dbs
+ if self._ensure_callback and 'replica_uid' in metadata:
+ self._ensure_callback(metadata['replica_uid'])
+ # parse incoming document info
+ doc_id = None
+ rev = None
+ content = None
+ gen = None
+ trans_id = None
+ if number_of_changes > 0:
+ try:
+ entry = json.loads(data[1])
+ doc_id = entry['id']
+ rev = entry['rev']
+ content = entry['content']
+ gen = entry['gen']
+ trans_id = entry['trans_id']
+ except (IndexError, KeyError, ValueError):
+ raise errors.BrokenSyncStream
+ return new_generation, new_transaction_id, number_of_changes, \
+ doc_id, rev, content, gen, trans_id
+
+ def _setup_sync_decr_pool(self):
+ """
+ Set up the SyncDecrypterPool for deferred decryption.
+ """
+ if self._sync_decr_pool is None and self._sync_db is not None:
+ # initialize syncing queue decryption pool
+ self._sync_decr_pool = SyncDecrypterPool(
+ self._crypto,
+ self._sync_db,
+ insert_doc_cb=self._insert_doc_cb,
+ source_replica_uid=self.source_replica_uid)
diff --git a/client/src/leap/soledad/client/pragmas.py b/client/src/leap/soledad/client/pragmas.py
index 2e9c53a3..55397d10 100644
--- a/client/src/leap/soledad/client/pragmas.py
+++ b/client/src/leap/soledad/client/pragmas.py
@@ -19,10 +19,53 @@ Different pragmas used in the initialization of the SQLCipher database.
"""
import logging
import string
+import threading
+import os
+
+from leap.soledad.common import soledad_assert
+
logger = logging.getLogger(__name__)
+_db_init_lock = threading.Lock()
+
+
+def set_init_pragmas(conn, opts=None, extra_queries=None):
+ """
+ Set the initialization pragmas.
+
+ This includes the crypto pragmas, and any other options that must
+ be passed early to sqlcipher db.
+ """ + soledad_assert(opts is not None) + extra_queries = [] if extra_queries is None else extra_queries + with _db_init_lock: + # only one execution path should initialize the db + _set_init_pragmas(conn, opts, extra_queries) + + +def _set_init_pragmas(conn, opts, extra_queries): + + sync_off = os.environ.get('LEAP_SQLITE_NOSYNC') + memstore = os.environ.get('LEAP_SQLITE_MEMSTORE') + nowal = os.environ.get('LEAP_SQLITE_NOWAL') + + set_crypto_pragmas(conn, opts) + + if not nowal: + set_write_ahead_logging(conn) + if sync_off: + set_synchronous_off(conn) + else: + set_synchronous_normal(conn) + if memstore: + set_mem_temp_store(conn) + + for query in extra_queries: + conn.cursor().execute(query) + + def set_crypto_pragmas(db_handle, sqlcipher_opts): """ Set cryptographic params (key, cipher, KDF number of iterations and diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py index af781a26..96f7e906 100644 --- a/client/src/leap/soledad/client/secrets.py +++ b/client/src/leap/soledad/client/secrets.py @@ -246,22 +246,26 @@ class SoledadSecrets(object): :return: Whether there's a storage secret for symmetric encryption. :rtype: bool """ - if self._secret_id is None or self._secret_id not in self._secrets: + logger.info("Checking if there's a secret in local storage...") + if (self._secret_id is None or self._secret_id not in self._secrets) \ + and os.path.isfile(self._secrets_path): try: self._load_secrets() # try to load from disk except IOError as e: logger.warning( 'IOError while loading secrets from disk: %s' % str(e)) - return False - return self.storage_secret is not None + + if self.storage_secret is not None: + logger.info("Found a secret in local storage.") + return True + + logger.info("Could not find a secret in local storage.") + return False def _load_secrets(self): """ Load storage secrets from local file. """ - # does the file exist in disk? - if not os.path.isfile(self._secrets_path): - raise IOError('File does not exist: %s' % self._secrets_path) # read storage secrets from file content = None with open(self._secrets_path, 'r') as f: diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index db3cb5cb..8e7d39c2 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -42,7 +42,6 @@ SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases handled by Soledad should be created by SQLCipher >= 2.0. 
""" import logging -import multiprocessing import os import threading import json @@ -54,19 +53,17 @@ from u1db.backends import sqlite_backend from hashlib import sha256 from contextlib import contextmanager from collections import defaultdict -from httplib import CannotSendRequest +from functools import partial from pysqlcipher import dbapi2 as sqlcipher_dbapi2 from twisted.internet import reactor -from twisted.internet.task import LoopingCall from twisted.internet.threads import deferToThreadPool from twisted.python.threadpool import ThreadPool -from twisted.python import log +from twisted.enterprise import adbapi -from leap.soledad.client import crypto -from leap.soledad.client.target import SoledadSyncTarget -from leap.soledad.client.target import PendingReceivedDocsSyncError +from leap.soledad.client import encdecpool +from leap.soledad.client.http_target import SoledadHTTPSyncTarget from leap.soledad.client.sync import SoledadSynchronizer from leap.soledad.client import pragmas @@ -102,46 +99,14 @@ def initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True): conn = sqlcipher_dbapi2.connect( opts.path, check_same_thread=check_same_thread) - set_init_pragmas(conn, opts, extra_queries=on_init) + pragmas.set_init_pragmas(conn, opts, extra_queries=on_init) return conn -_db_init_lock = threading.Lock() - - -def set_init_pragmas(conn, opts=None, extra_queries=None): - """ - Set the initialization pragmas. - - This includes the crypto pragmas, and any other options that must - be passed early to sqlcipher db. - """ - soledad_assert(opts is not None) - extra_queries = [] if extra_queries is None else extra_queries - with _db_init_lock: - # only one execution path should initialize the db - _set_init_pragmas(conn, opts, extra_queries) - - -def _set_init_pragmas(conn, opts, extra_queries): - - sync_off = os.environ.get('LEAP_SQLITE_NOSYNC') - memstore = os.environ.get('LEAP_SQLITE_MEMSTORE') - nowal = os.environ.get('LEAP_SQLITE_NOWAL') - - pragmas.set_crypto_pragmas(conn, opts) - - if not nowal: - pragmas.set_write_ahead_logging(conn) - if sync_off: - pragmas.set_synchronous_off(conn) - else: - pragmas.set_synchronous_normal(conn) - if memstore: - pragmas.set_mem_temp_store(conn) - - for query in extra_queries: - conn.cursor().execute(query) +def initialize_sqlcipher_adbapi_db(opts, extra_queries=None): + from leap.soledad.client import sqlcipher_adbapi + return sqlcipher_adbapi.getConnectionPool( + opts, extra_queries=extra_queries) class SQLCipherOptions(object): @@ -151,22 +116,32 @@ class SQLCipherOptions(object): @classmethod def copy(cls, source, path=None, key=None, create=None, - is_raw_key=None, cipher=None, kdf_iter=None, cipher_page_size=None, - defer_encryption=None, sync_db_key=None): + is_raw_key=None, cipher=None, kdf_iter=None, + cipher_page_size=None, defer_encryption=None, sync_db_key=None): """ Return a copy of C{source} with parameters different than None replaced by new values. 
""" - return SQLCipherOptions( - path if path else source.path, - key if key else source.key, - create=create if create else source.create, - is_raw_key=is_raw_key if is_raw_key else source.is_raw_key, - cipher=cipher if cipher else source.cipher, - kdf_iter=kdf_iter if kdf_iter else source.kdf_iter, - cipher_page_size=cipher_page_size if cipher_page_size else source.cipher_page_size, - defer_encryption=defer_encryption if defer_encryption else source.defer_encryption, - sync_db_key=sync_db_key if sync_db_key else source.sync_db_key) + local_vars = locals() + args = [] + kwargs = {} + + for name in ["path", "key"]: + val = local_vars[name] + if val is not None: + args.append(val) + else: + args.append(getattr(source, name)) + + for name in ["create", "is_raw_key", "cipher", "kdf_iter", + "cipher_page_size", "defer_encryption", "sync_db_key"]: + val = local_vars[name] + if val is not None: + kwargs[name] = val + else: + kwargs[name] = getattr(source, name) + + return SQLCipherOptions(*args, **kwargs) def __init__(self, path, key, create=True, is_raw_key=False, cipher='aes-256-cbc', kdf_iter=4000, cipher_page_size=1024, @@ -307,10 +282,9 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): :rtype: str """ doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc) - - # TODO XXX move to API XXX if self.defer_encryption: - self.sync_queue.put_nowait(doc) + # TODO move to api? + self._sync_enc_pool.enqueue_doc_for_encryption(doc) return doc_rev # @@ -440,7 +414,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): Soledad syncer implementation. """ - _sync_loop = None _sync_enc_pool = None """ @@ -450,13 +423,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db' """ - A dictionary that hold locks which avoid multiple sync attempts from the - same database replica. - """ - # XXX We do not need the lock here now. Remove. - encrypting_lock = threading.Lock() - - """ Period or recurrence of the Looping Call that will do the encryption to the syncdb (in seconds). """ @@ -468,19 +434,18 @@ class SQLCipherU1DBSync(SQLCipherDatabase): """ syncing_lock = defaultdict(threading.Lock) - def __init__(self, opts, soledad_crypto, replica_uid, + def __init__(self, opts, soledad_crypto, replica_uid, cert_file, defer_encryption=False): self._opts = opts self._path = opts.path self._crypto = soledad_crypto self.__replica_uid = replica_uid + self._cert_file = cert_file self._sync_db_key = opts.sync_db_key self._sync_db = None - self._sync_db_write_lock = None self._sync_enc_pool = None - self.sync_queue = None # we store syncers in a dictionary indexed by the target URL. We also # store a hash of the auth info in case auth info expires and we need @@ -490,8 +455,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # self._syncers = {'<url>': ('<auth_hash>', syncer), ...} self._syncers = {} - self._sync_db_write_lock = threading.Lock() - self.sync_queue = multiprocessing.Queue() self.running = False self._sync_threadpool = None @@ -503,25 +466,15 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self._db_handle = None self._initialize_main_db() - if defer_encryption: - self._initialize_sync_db(opts) + # the sync_db is used both for deferred encryption and decryption, so + # we want to initialize it anyway to allow for all combinations of + # deferred encryption and decryption configurations. 
+ self._initialize_sync_db(opts) + if defer_encryption: # initialize syncing queue encryption pool - self._sync_enc_pool = crypto.SyncEncrypterPool( - self._crypto, self._sync_db, self._sync_db_write_lock) - - # ----------------------------------------------------------------- - # From the documentation: If f returns a deferred, rescheduling - # will not take place until the deferred has fired. The result - # value is ignored. - - # TODO use this to avoid multiple sync attempts if the sync has not - # finished! - # ----------------------------------------------------------------- - - # XXX this was called sync_watcher --- trace any remnants - self._sync_loop = LoopingCall(self._encrypt_syncing_docs) - self._sync_loop.start(self.ENCRYPT_LOOP_PERIOD) + self._sync_enc_pool = encdecpool.SyncEncrypterPool( + self._crypto, self._sync_db) self.shutdownID = None @@ -584,11 +537,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # somewhere else sync_opts = SQLCipherOptions.copy( opts, path=sync_db_path, create=True) - self._sync_db = initialize_sqlcipher_db( - sync_opts, on_init=self._sync_db_extra_init, - check_same_thread=False) - pragmas.set_crypto_pragmas(self._sync_db, opts) - # --------------------------------------------------------- + self._sync_db = getConnectionPool( + sync_opts, extra_queries=self._sync_db_extra_init) @property def _sync_db_extra_init(self): @@ -599,15 +549,15 @@ class SQLCipherU1DBSync(SQLCipherDatabase): :rtype: tuple of strings """ maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)" - encr = crypto.SyncEncrypterPool - decr = crypto.SyncDecrypterPool + encr = encdecpool.SyncEncrypterPool + decr = encdecpool.SyncDecrypterPool sql_encr_table_query = (maybe_create % ( encr.TABLE_NAME, encr.FIELD_NAMES)) sql_decr_table_query = (maybe_create % ( decr.TABLE_NAME, decr.FIELD_NAMES)) return (sql_encr_table_query, sql_decr_table_query) - def sync(self, url, creds=None, autocreate=True, defer_decryption=True): + def sync(self, url, creds=None, defer_decryption=True): """ Synchronize documents with remote replica exposed at url. @@ -621,12 +571,9 @@ class SQLCipherU1DBSync(SQLCipherDatabase): :param url: The url of the target replica to sync with. :type url: str - :param creds: - optional dictionary giving credentials. - to authorize the operation with the server. + :param creds: optional dictionary giving credentials to authorize the + operation with the server. :type creds: dict - :param autocreate: Ask the target to create the db if non-existent. - :type autocreate: bool :param defer_decryption: Whether to defer the decryption process using the intermediate database. If False, decryption will be done inline. @@ -637,49 +584,11 @@ class SQLCipherU1DBSync(SQLCipherDatabase): before the synchronisation was performed. :rtype: Deferred """ - kwargs = {'creds': creds, 'autocreate': autocreate, - 'defer_decryption': defer_decryption} - return self._defer_to_sync_threadpool(self._sync, url, **kwargs) - - def _sync(self, url, creds=None, autocreate=True, defer_decryption=True): - res = None - # the following context manager blocks until the syncing lock can be # acquired. - # TODO review, I think this is no longer needed with a 1-thread - # threadpool. - - log.msg("in _sync") - self.__url = url with self._syncer(url, creds=creds) as syncer: # XXX could mark the critical section here... 
- try: - log.msg('syncer sync...') - res = syncer.sync(autocreate=autocreate, - defer_decryption=defer_decryption) - - except PendingReceivedDocsSyncError: - logger.warning("Local sync db is not clear, skipping sync...") - return - except CannotSendRequest: - logger.warning("Connection with sync target couldn't be " - "established. Resetting connection...") - # closing the connection it will be recreated in the next try - syncer.sync_target.close() - return - - return res - - def stop_sync(self): - """ - Interrupt all ongoing syncs. - """ - self._stop_sync() - - def _stop_sync(self): - for url in self._syncers: - _, syncer = self._syncers[url] - syncer.stop() + return syncer.sync(defer_decryption=defer_decryption) @contextmanager def _syncer(self, url, creds=None): @@ -690,6 +599,10 @@ class SQLCipherU1DBSync(SQLCipherDatabase): one instance synchronizing the same database replica at the same time. Because of that, this method blocks until the syncing lock can be acquired. + + :param creds: optional dictionary giving credentials to authorize the + operation with the server. + :type creds: dict """ with self.syncing_lock[self._path]: syncer = self._get_syncer(url, creds=creds) @@ -723,16 +636,17 @@ class SQLCipherU1DBSync(SQLCipherDatabase): h = sha256(json.dumps([url, creds])).hexdigest() cur_h, syncer = self._syncers.get(url, (None, None)) if syncer is None or h != cur_h: - wlock = self._sync_db_write_lock syncer = SoledadSynchronizer( self, - SoledadSyncTarget(url, - # XXX is the replica_uid ready? - self._replica_uid, - creds=creds, - crypto=self._crypto, - sync_db=self._sync_db, - sync_db_write_lock=wlock)) + SoledadHTTPSyncTarget( + url, + # XXX is the replica_uid ready? + self._replica_uid, + creds=creds, + crypto=self._crypto, + cert_file=self._cert_file, + sync_db=self._sync_db, + sync_enc_pool=self._sync_enc_pool)) self._syncers[url] = (h, syncer) # in order to reuse the same synchronizer multiple times we have to # reset its state (i.e. the number of documents received from target @@ -744,34 +658,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # Symmetric encryption of syncing docs # - def _encrypt_syncing_docs(self): - """ - Process the syncing queue and send the documents there - to be encrypted in the sync db. They will be read by the - SoledadSyncTarget during the sync_exchange. - - Called periodically from the LoopingCall self._sync_loop. - """ - # TODO should return a deferred that would firewhen the encryption is - # done. 
See note on __init__
-
- lock = self.encrypting_lock
- # optional wait flag used to avoid blocking
- if not lock.acquire(False):
- return
- else:
- queue = self.sync_queue
- try:
- while not queue.empty():
- doc = queue.get_nowait()
- self._sync_enc_pool.encrypt_doc(doc)
-
- except Exception as exc:
- logger.error("Error while encrypting docs to sync")
- logger.exception(exc)
- finally:
- lock.release()
-
 def get_generation(self):
 # FIXME
 # XXX this SHOULD BE a callback
@@ -789,16 +675,10 @@
 """
 Close the syncer and syncdb orderly
 """
- # stop the sync loop for deferred encryption
- if self._sync_loop is not None:
- self._sync_loop.reset()
- self._sync_loop.stop()
- self._sync_loop = None
 # close all open syncers
- for url in self._syncers:
- _, syncer = self._syncers[url]
- syncer.close()
- self._syncers = []
+ # do not delete entries while iterating over the dict: just drop it
+ self._syncers = {}
+
+ # stop the encryption pool
 if self._sync_enc_pool is not None:
 self._sync_enc_pool.close()
@@ -808,11 +688,6 @@
 if self._sync_db is not None:
 self._sync_db.close()
 self._sync_db = None
- # close the sync queue
- if self.sync_queue is not None:
- self.sync_queue.close()
- del self.sync_queue
- self.sync_queue = None
 class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase):
@@ -903,3 +778,40 @@ def soledad_doc_factory(doc_id=None, rev=None, json='{}', has_conflicts=False,
 has_conflicts=has_conflicts, syncable=syncable)
 sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase)
+
+
+#
+# twisted.enterprise.adbapi SQLCipher implementation
+#
+
+SQLCIPHER_CONNECTION_TIMEOUT = 10
+
+
+def getConnectionPool(opts, extra_queries=None):
+ openfun = partial(
+ pragmas.set_init_pragmas,
+ opts=opts,
+ extra_queries=extra_queries)
+ return SQLCipherConnectionPool(
+ database=opts.path,
+ check_same_thread=False,
+ cp_openfun=openfun,
+ timeout=SQLCIPHER_CONNECTION_TIMEOUT)
+
+
+class SQLCipherConnection(adbapi.Connection):
+ pass
+
+
+class SQLCipherTransaction(adbapi.Transaction):
+ pass
+
+
+class SQLCipherConnectionPool(adbapi.ConnectionPool):
+
+ connectionFactory = SQLCipherConnection
+ transactionFactory = SQLCipherTransaction
+
+ def __init__(self, *args, **kwargs):
+ adbapi.ConnectionPool.__init__(
+ self, "pysqlcipher.dbapi2", *args, **kwargs)
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index d3f106da..53172f31 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -16,17 +16,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Soledad synchronization utilities.
-
-Extend u1db Synchronizer with the ability to:
-
- * Postpone the update of the known replica uid until all the decryption of
- the incoming messages has been processed.
-
- * Be interrupted and recovered.
"""
import logging
-import traceback
-from threading import Lock
+
+from twisted.internet import defer
from u1db import errors
from u1db.sync import Synchronizer
@@ -47,17 +40,8 @@ class SoledadSynchronizer(Synchronizer):
 Also modified to allow for interrupting the synchronization process.
 """
- # TODO can delegate the syncing to the api object, living in the reactor
- # thread, and use a simple flag.
- syncing_lock = Lock()
-
- def stop(self):
- """
- Stop the current sync in progress.
- """ - self.sync_target.stop() - - def sync(self, autocreate=False, defer_decryption=True): + @defer.inlineCallbacks + def sync(self, defer_decryption=True): """ Synchronize documents between source and target. @@ -69,48 +53,22 @@ class SoledadSynchronizer(Synchronizer): This is done to allow the ongoing parallel decryption of the incoming docs to proceed without `InvalidGeneration` conflicts. - :param autocreate: Whether the target replica should be created or not. - :type autocreate: bool :param defer_decryption: Whether to defer the decryption process using the intermediate database. If False, decryption will be done inline. :type defer_decryption: bool - """ - self.syncing_lock.acquire() - try: - return self._sync(autocreate=autocreate, - defer_decryption=defer_decryption) - except Exception: - # we want this exception to reach either SQLCipherU1DBSync.sync or - # the Solead api object itself, so it is poperly handled and/or - # logged... - raise - finally: - # ... but we also want to release the syncing lock so this - # Synchronizer may be reused later. - self.release_syncing_lock() - - def _sync(self, autocreate=False, defer_decryption=True): - """ - Helper function, called from the main `sync` method. - See `sync` docstring. + + :return: A deferred which will fire after the sync has finished. + :rtype: twisted.internet.defer.Deferred """ sync_target = self.sync_target # get target identifier, its current generation, # and its last-seen database generation for this source ensure_callback = None - try: - (self.target_replica_uid, target_gen, target_trans_id, - target_my_gen, target_my_trans_id) = \ - sync_target.get_sync_info(self.source._replica_uid) - except errors.DatabaseDoesNotExist: - if not autocreate: - raise - # will try to ask sync_exchange() to create the db - self.target_replica_uid = None - target_gen, target_trans_id = (0, '') - target_my_gen, target_my_trans_id = (0, '') + (self.target_replica_uid, target_gen, target_trans_id, + target_my_gen, target_my_trans_id) = yield \ + sync_target.get_sync_info(self.source._replica_uid) logger.debug( "Soledad target sync info:\n" @@ -151,15 +109,15 @@ class SoledadSynchronizer(Synchronizer): self.target_replica_uid) logger.debug( "Soledad source sync info:\n" - " source target gen: %d\n" - " source target trans_id: %s" + " last target gen known to source: %d\n" + " last target trans_id known to source: %s" % (target_last_known_gen, target_last_known_trans_id)) # validate transaction ids if not changes and target_last_known_gen == target_gen: if target_trans_id != target_last_known_trans_id: raise errors.InvalidTransactionId - return my_gen + defer.returnValue(my_gen) # prepare to send all the changed docs changed_doc_ids = [doc_id for doc_id, _, _ in changes] @@ -174,40 +132,26 @@ class SoledadSynchronizer(Synchronizer): # exchange documents and try to insert the returned ones with # the target, return target synced-up-to gen. - # - # The sync_exchange method may be interrupted, in which case it will - # return a tuple of Nones. 
- try: - new_gen, new_trans_id = sync_target.sync_exchange( - docs_by_generation, self.source._replica_uid, - target_last_known_gen, target_last_known_trans_id, - self._insert_doc_from_target, ensure_callback=ensure_callback, - defer_decryption=defer_decryption) - logger.debug( - "Soledad source sync info after sync exchange:\n" - " source target gen: %d\n" - " source target trans_id: %s" - % (new_gen, new_trans_id)) - info = { - "target_replica_uid": self.target_replica_uid, - "new_gen": new_gen, - "new_trans_id": new_trans_id, - "my_gen": my_gen - } - self._syncing_info = info - if defer_decryption and not sync_target.has_syncdb(): - logger.debug("Sync target has no valid sync db, " - "aborting defer_decryption") - defer_decryption = False - self.complete_sync() - except Exception as e: - logger.error("Soledad sync error: %s" % str(e)) - logger.error(traceback.format_exc()) - sync_target.stop() - finally: - sync_target.close() - - return my_gen + new_gen, new_trans_id = yield sync_target.sync_exchange( + docs_by_generation, self.source._replica_uid, + target_last_known_gen, target_last_known_trans_id, + self._insert_doc_from_target, ensure_callback=ensure_callback, + defer_decryption=defer_decryption) + logger.debug( + "Soledad source sync info after sync exchange:\n" + " source known target gen: %d\n" + " source known target trans_id: %s" + % (new_gen, new_trans_id)) + info = { + "target_replica_uid": self.target_replica_uid, + "new_gen": new_gen, + "new_trans_id": new_trans_id, + "my_gen": my_gen + } + self._syncing_info = info + yield self.complete_sync() + + defer.returnValue(my_gen) def complete_sync(self): """ @@ -215,6 +159,9 @@ class SoledadSynchronizer(Synchronizer): (a) record last known generation and transaction uid for the remote replica, and (b) make target aware of our current reached generation. + + :return: A deferred which will fire when the sync has been completed. + :rtype: twisted.internet.defer.Deferred """ logger.debug("Completing deferred last step in SYNC...") @@ -225,39 +172,23 @@ class SoledadSynchronizer(Synchronizer): info["target_replica_uid"], info["new_gen"], info["new_trans_id"]) # if gapless record current reached generation with target - self._record_sync_info_with_the_target(info["my_gen"]) - - @property - def syncing(self): - """ - Return True if a sync is ongoing, False otherwise. - :rtype: bool - """ - # XXX FIXME we need some mechanism for timeout: should cleanup and - # release if something in the syncdb-decrypt goes wrong. we could keep - # track of the release date and cleanup unrealistic sync entries after - # some time. + return self._record_sync_info_with_the_target(info["my_gen"]) - # TODO use cancellable deferreds instead - locked = self.syncing_lock.locked() - return locked - - def release_syncing_lock(self): - """ - Release syncing lock if it's locked. + def _record_sync_info_with_the_target(self, start_generation): """ - if self.syncing_lock.locked(): - self.syncing_lock.release() + Store local replica metadata in server. - def close(self): - """ - Close sync target pool of workers. - """ - self.release_syncing_lock() - self.sync_target.close() + :param start_generation: The local generation when the sync was + started. + :type start_generation: int - def __del__(self): - """ - Cleanup: release lock. + :return: A deferred which will fire when the operation has been + completed. 
+ :rtype: twisted.internet.defer.Deferred """ - self.release_syncing_lock() + cur_gen, trans_id = self.source._get_generation_info() + if (cur_gen == start_generation + self.num_inserted + and self.num_inserted > 0): + return self.sync_target.record_sync_info( + self.source._replica_uid, cur_gen, trans_id) + return defer.succeed(None) diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py deleted file mode 100644 index 986bd991..00000000 --- a/client/src/leap/soledad/client/target.py +++ /dev/null @@ -1,1517 +0,0 @@ -# -*- coding: utf-8 -*- -# target.py -# Copyright (C) 2013, 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -A U1DB backend for encrypting data before sending to server and decrypting -after receiving. -""" -import cStringIO -import gzip -import logging -import re -import urllib -import threading - -from collections import defaultdict -from time import sleep -from uuid import uuid4 - -import simplejson as json - -from u1db import errors -from u1db.remote import utils, http_errors -from u1db.remote.http_target import HTTPSyncTarget -from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase -from zope.proxy import ProxyBase -from zope.proxy import sameProxiedObjects, setProxiedObject - -from twisted.internet.task import LoopingCall - -from leap.soledad.common.document import SoledadDocument -from leap.soledad.client.auth import TokenBasedAuth -from leap.soledad.client.crypto import is_symmetrically_encrypted -from leap.soledad.client.crypto import encrypt_doc, decrypt_doc -from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool -from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS -from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS -from leap.soledad.client.events import signal - - -logger = logging.getLogger(__name__) - - -def _gunzip(data): - """ - Uncompress data that is gzipped. - - :param data: gzipped data - :type data: basestring - """ - buffer = cStringIO.StringIO() - buffer.write(data) - buffer.seek(0) - try: - data = gzip.GzipFile(mode='r', fileobj=buffer).read() - except Exception: - logger.warning("Error while decrypting gzipped data") - buffer.close() - return data - - -class PendingReceivedDocsSyncError(Exception): - pass - - -class DocumentSyncerThread(threading.Thread): - """ - A thread that knowns how to either send or receive a document during the - sync process. - """ - - def __init__(self, doc_syncer, release_method, failed_method, - idx, total, last_request_lock=None, last_callback_lock=None): - """ - Initialize a new syncer thread. - - :param doc_syncer: A document syncer. - :type doc_syncer: HTTPDocumentSyncer - :param release_method: A method to be called when finished running. - :type release_method: callable(DocumentSyncerThread) - :param failed_method: A method to be called when we failed. 
- :type failed_method: callable(DocumentSyncerThread) - :param idx: The index count of the current operation. - :type idx: int - :param total: The total number of operations. - :type total: int - :param last_request_lock: A lock to wait for before actually performing - the request. - :type last_request_lock: threading.Lock - :param last_callback_lock: A lock to wait for before actually running - the success callback. - :type last_callback_lock: threading.Lock - """ - threading.Thread.__init__(self) - self._doc_syncer = doc_syncer - self._release_method = release_method - self._failed_method = failed_method - self._idx = idx - self._total = total - self._last_request_lock = last_request_lock - self._last_callback_lock = last_callback_lock - self._response = None - self._exception = None - self._result = None - self._success = False - # a lock so we can signal when we're finished - self._request_lock = threading.Lock() - self._request_lock.acquire() - self._callback_lock = threading.Lock() - self._callback_lock.acquire() - # make thread interruptable - self._stopped = None - self._stop_lock = threading.Lock() - - def run(self): - """ - Run the HTTP request and store results. - - This method will block and wait for an eventual previous operation to - finish before actually performing the request. It also traps any - exception and register any failure with the request. - """ - with self._stop_lock: - if self._stopped is None: - self._stopped = False - else: - return - - # eventually wait for the previous thread to finish - if self._last_request_lock is not None: - self._last_request_lock.acquire() - - # bail out in case we've been interrupted - if self.stopped is True: - return - - try: - self._response = self._doc_syncer.do_request() - self._request_lock.release() - - # run success callback - if self._doc_syncer.success_callback is not None: - - # eventually wait for callback lock release - if self._last_callback_lock is not None: - self._last_callback_lock.acquire() - - # bail out in case we've been interrupted - if self._stopped is True: - return - - self._result = self._doc_syncer.success_callback( - self._idx, self._total, self._response) - self._success = True - doc_syncer = self._doc_syncer - self._release_method(self, doc_syncer) - self._doc_syncer = None - # let next thread executed its callback - self._callback_lock.release() - - # trap any exception and signal failure - except Exception as e: - self._exception = e - self._success = False - # run failure callback - if self._doc_syncer.failure_callback is not None: - - # eventually wait for callback lock release - if self._last_callback_lock is not None: - self._last_callback_lock.acquire() - - # bail out in case we've been interrupted - if self.stopped is True: - return - - self._doc_syncer.failure_callback( - self._idx, self._total, self._exception) - - self._failed_method() - # we do not release the callback lock here because we - # failed and so we don't want other threads to succeed. 
- - @property - def doc_syncer(self): - return self._doc_syncer - - @property - def response(self): - return self._response - - @property - def exception(self): - return self._exception - - @property - def callback_lock(self): - return self._callback_lock - - @property - def request_lock(self): - return self._request_lock - - @property - def success(self): - return self._success - - def stop(self): - with self._stop_lock: - self._stopped = True - - @property - def stopped(self): - with self._stop_lock: - return self._stopped - - @property - def result(self): - return self._result - - -class DocumentSyncerPool(object): - """ - A pool of reusable document syncers. - """ - - POOL_SIZE = 10 - """ - The maximum amount of syncer threads running at the same time. - """ - - def __init__(self, raw_url, raw_creds, query_string, headers, - ensure_callback, stop_method): - """ - Initialize the document syncer pool. - - :param raw_url: The complete raw URL for the HTTP request. - :type raw_url: str - :param raw_creds: The credentials for the HTTP request. - :type raw_creds: dict - :param query_string: The query string for the HTTP request. - :type query_string: str - :param headers: The headers for the HTTP request. - :type headers: dict - :param ensure_callback: A callback to ensure we have the correct - target_replica_uid, if it was just created. - :type ensure_callback: callable - - """ - # save syncer params - self._raw_url = raw_url - self._raw_creds = raw_creds - self._query_string = query_string - self._headers = headers - self._ensure_callback = ensure_callback - self._stop_method = stop_method - # pool attributes - self._failures = False - self._semaphore_pool = threading.BoundedSemaphore( - DocumentSyncerPool.POOL_SIZE) - self._pool_access_lock = threading.Lock() - self._doc_syncers = [] - self._threads = [] - - def new_syncer_thread(self, idx, total, last_request_lock=None, - last_callback_lock=None): - """ - Yield a new document syncer thread. - - :param idx: The index count of the current operation. - :type idx: int - :param total: The total number of operations. - :type total: int - :param last_request_lock: A lock to wait for before actually performing - the request. - :type last_request_lock: threading.Lock - :param last_callback_lock: A lock to wait for before actually running - the success callback. - :type last_callback_lock: threading.Lock - """ - t = None - # wait for available threads - self._semaphore_pool.acquire() - with self._pool_access_lock: - if self._failures is True: - return None - # get a syncer - doc_syncer = self._get_syncer() - # we rely on DocumentSyncerThread.run() to release the lock using - # self.release_syncer so we can launch a new thread. - t = DocumentSyncerThread( - doc_syncer, self.release_syncer, self.cancel_threads, - idx, total, - last_request_lock=last_request_lock, - last_callback_lock=last_callback_lock) - self._threads.append(t) - return t - - def _failed(self): - with self._pool_access_lock: - self._failures = True - - @property - def failures(self): - return self._failures - - def _get_syncer(self): - """ - Get a document syncer from the pool. - - This method will create a new syncer whenever there is no syncer - available in the pool. - - :return: A syncer. 
- :rtype: HTTPDocumentSyncer - """ - syncer = None - # get an available syncer or create a new one - try: - syncer = self._doc_syncers.pop() - except IndexError: - syncer = HTTPDocumentSyncer( - self._raw_url, self._raw_creds, self._query_string, - self._headers, self._ensure_callback) - return syncer - - def release_syncer(self, syncer_thread, doc_syncer): - """ - Return a syncer to the pool after use and check for any failures. - - :param syncer: The syncer to be returned to the pool. - :type syncer: HTTPDocumentSyncer - """ - with self._pool_access_lock: - self._doc_syncers.append(doc_syncer) - if syncer_thread.success is True: - self._threads.remove(syncer_thread) - self._semaphore_pool.release() - - def cancel_threads(self): - """ - Stop all threads in the pool. - """ - # stop sync - self._stop_method() - stopped = [] - # stop all threads - logger.warning("Soledad sync: cancelling sync threads...") - with self._pool_access_lock: - self._failures = True - while self._threads: - t = self._threads.pop(0) - t.stop() - self._doc_syncers.append(t.doc_syncer) - stopped.append(t) - # release locks and join - while stopped: - t = stopped.pop(0) - t.request_lock.acquire(False) # just in case - t.request_lock.release() - t.callback_lock.acquire(False) # just in case - t.callback_lock.release() - # release any blocking semaphores - for i in xrange(DocumentSyncerPool.POOL_SIZE): - try: - self._semaphore_pool.release() - except ValueError: - break - logger.warning("Soledad sync: cancelled sync threads.") - - def cleanup(self): - """ - Close and remove any syncers from the pool. - """ - with self._pool_access_lock: - while self._doc_syncers: - syncer = self._doc_syncers.pop() - syncer.close() - del syncer - - -class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth): - - def __init__(self, raw_url, creds, query_string, headers, ensure_callback): - """ - Initialize the client. - - :param raw_url: The raw URL of the target HTTP server. - :type raw_url: str - :param creds: Authentication credentials. - :type creds: dict - :param query_string: The query string for the HTTP request. - :type query_string: str - :param headers: The headers for the HTTP request. - :type headers: dict - :param ensure_callback: A callback to ensure we have the correct - target_replica_uid, if it was just created. - :type ensure_callback: callable - """ - HTTPClientBase.__init__(self, raw_url, creds=creds) - # info needed to perform the request - self._query_string = query_string - self._headers = headers - self._ensure_callback = ensure_callback - # the actual request method - self._request_method = None - self._success_callback = None - self._failure_callback = None - - def _reset(self): - """ - Reset this document syncer so we can reuse it. - """ - self._request_method = None - self._success_callback = None - self._failure_callback = None - self._request_method = None - - def set_request_method(self, method, *args, **kwargs): - """ - Set the actual method to perform the request. - - :param method: Either 'get' or 'put'. - :type method: str - :param args: Arguments for the request method. - :type args: list - :param kwargs: Keyworded arguments for the request method. 
- :type kwargs: dict - """ - self._reset() - # resolve request method - if method is 'get': - self._request_method = self._get_doc - elif method is 'put': - self._request_method = self._put_doc - else: - raise Exception - # store request method args - self._args = args - self._kwargs = kwargs - - def set_success_callback(self, callback): - self._success_callback = callback - - def set_failure_callback(self, callback): - self._failure_callback = callback - - @property - def success_callback(self): - return self._success_callback - - @property - def failure_callback(self): - return self._failure_callback - - def do_request(self): - """ - Actually perform the request. - - :return: The body and headers of the response. - :rtype: tuple - """ - self._ensure_connection() - args = self._args - kwargs = self._kwargs - return self._request_method(*args, **kwargs) - - def _request(self, method, url_parts, params=None, body=None, - content_type=None): - """ - Perform an HTTP request. - - :param method: The HTTP request method. - :type method: str - :param url_parts: A list representing the request path. - :type url_parts: list - :param params: Parameters for the URL query string. - :type params: dict - :param body: The body of the request. - :type body: str - :param content-type: The content-type of the request. - :type content-type: str - - :return: The body and headers of the response. - :rtype: tuple - - :raise errors.Unavailable: Raised after a number of unsuccesful - request attempts. - :raise Exception: Raised for any other exception ocurring during the - request. - """ - - self._ensure_connection() - unquoted_url = url_query = self._url.path - if url_parts: - if not url_query.endswith('/'): - url_query += '/' - unquoted_url = url_query - url_query += '/'.join(urllib.quote(part, safe='') - for part in url_parts) - # oauth performs its own quoting - unquoted_url += '/'.join(url_parts) - encoded_params = {} - if params: - for key, value in params.items(): - key = unicode(key).encode('utf-8') - encoded_params[key] = _encode_query_parameter(value) - url_query += ('?' + urllib.urlencode(encoded_params)) - if body is not None and not isinstance(body, basestring): - body = json.dumps(body) - content_type = 'application/json' - headers = {} - if content_type: - headers['content-type'] = content_type - - # Patched: We would like to receive gzip pretty please - # ---------------------------------------------------- - headers['accept-encoding'] = "gzip" - # ---------------------------------------------------- - - headers.update( - self._sign_request(method, unquoted_url, encoded_params)) - - for delay in self._delays: - try: - self._conn.request(method, url_query, body, headers) - return self._response() - except errors.Unavailable, e: - sleep(delay) - raise e - - def _response(self): - """ - Return the response of the (possibly gzipped) HTTP request. - - :return: The body and headers of the response. 
- :rtype: tuple - """ - resp = self._conn.getresponse() - body = resp.read() - headers = dict(resp.getheaders()) - - # Patched: We would like to decode gzip - # ---------------------------------------------------- - encoding = headers.get('content-encoding', '') - if "gzip" in encoding: - body = _gunzip(body) - # ---------------------------------------------------- - - if resp.status in (200, 201): - return body, headers - elif resp.status in http_errors.ERROR_STATUSES: - try: - respdic = json.loads(body) - except ValueError: - pass - else: - self._error(respdic) - # special case - if resp.status == 503: - raise errors.Unavailable(body, headers) - raise errors.HTTPError(resp.status, body, headers) - - def _prepare(self, comma, entries, **dic): - """ - Prepare an entry to be sent through a syncing POST request. - - :param comma: A string to be prepended to the current entry. - :type comma: str - :param entries: A list of entries accumulated to be sent on the - request. - :type entries: list - :param dic: The data to be included in this entry. - :type dic: dict - - :return: The size of the prepared entry. - :rtype: int - """ - entry = comma + '\r\n' + json.dumps(dic) - entries.append(entry) - return len(entry) - - def _init_post_request(self, action, content_length): - """ - Initiate a syncing POST request. - - :param url: The syncing URL. - :type url: str - :param action: The syncing action, either 'get' or 'receive'. - :type action: str - :param headers: The initial headers to be sent on this request. - :type headers: dict - :param content_length: The content-length of the request. - :type content_length: int - """ - self._conn.putrequest('POST', self._query_string) - self._conn.putheader( - 'content-type', 'application/x-soledad-sync-%s' % action) - for header_name, header_value in self._headers: - self._conn.putheader(header_name, header_value) - self._conn.putheader('accept-encoding', 'gzip') - self._conn.putheader('content-length', str(content_length)) - self._conn.endheaders() - - def _get_doc(self, received, sync_id, last_known_generation, - last_known_trans_id): - """ - Get a sync document from server by means of a POST request. - - :param received: The number of documents already received in the - current sync session. - :type received: int - :param sync_id: The id for the current sync session. - :type sync_id: str - :param last_known_generation: Target's last known generation. - :type last_known_generation: int - :param last_known_trans_id: Target's last known transaction id. - :type last_known_trans_id: str - - :return: The body and headers of the response. - :rtype: tuple - """ - entries = ['['] - size = 1 - # add remote replica metadata to the request - size += self._prepare( - '', entries, - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - sync_id=sync_id, - ensure=self._ensure_callback is not None) - # inform server of how many documents have already been received - size += self._prepare( - ',', entries, received=received) - entries.append('\r\n]') - size += len(entries[-1]) - # send headers - self._init_post_request('get', size) - # get document - for entry in entries: - self._conn.send(entry) - return self._response() - - def _put_doc(self, sync_id, last_known_generation, last_known_trans_id, - id, rev, content, gen, trans_id, number_of_docs, doc_idx): - """ - Put a sync document on server by means of a POST request. - - :param sync_id: The id for the current sync session. 
- def _put_doc(self, sync_id, last_known_generation, last_known_trans_id,
- id, rev, content, gen, trans_id, number_of_docs, doc_idx):
- """
- Put a sync document on server by means of a POST request.
-
- :param sync_id: The id for the current sync session.
- :type sync_id: str
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
- :param id: The document id.
- :type id: str
- :param rev: The document revision.
- :type rev: str
- :param content: The serialized document content.
- :type content: str
- :param gen: The generation of the modification of the document.
- :type gen: int
- :param trans_id: The transaction id of the modification of the
- document.
- :type trans_id: str
- :param number_of_docs: The total number of documents sent on this sync
- session.
- :type number_of_docs: int
- :param doc_idx: The index of the current document being sent.
- :type doc_idx: int
-
- :return: The body and headers of the response.
- :rtype: tuple
- """
- # prepare to send the document
- entries = ['[']
- size = 1
- # add remote replica metadata to the request
- size += self._prepare(
- '', entries,
- last_known_generation=last_known_generation,
- last_known_trans_id=last_known_trans_id,
- sync_id=sync_id,
- ensure=self._ensure_callback is not None)
- # add the document to the request
- size += self._prepare(
- ',', entries,
- id=id, rev=rev, content=content, gen=gen, trans_id=trans_id,
- number_of_docs=number_of_docs, doc_idx=doc_idx)
- entries.append('\r\n]')
- size += len(entries[-1])
- # send headers
- self._init_post_request('put', size)
- # send document
- for entry in entries:
- self._conn.send(entry)
- return self._response()
-
- def _sign_request(self, method, url_query, params):
- """
- Return an authorization header to be included in the HTTP request.
-
- :param method: The HTTP method.
- :type method: str
- :param url_query: The URL query string.
- :type url_query: str
- :param params: A list with encoded query parameters.
- :type params: list
-
- :return: The Authorization header.
- :rtype: list of tuple
- """
- return TokenBasedAuth._sign_request(self, method, url_query, params)
-
- def set_token_credentials(self, uuid, token):
- """
- Store given credentials so we can sign the request later.
-
- :param uuid: The user's uuid.
- :type uuid: str
- :param token: The authentication token.
- :type token: str
- """
- TokenBasedAuth.set_token_credentials(self, uuid, token)
-
-
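Both classes in this hunk delegate request signing to `TokenBasedAuth` (auth.py). As a rough sketch of the scheme, assuming the stored credentials are a uuid/token pair that gets base64-encoded as `uuid:token`, the header amounts to:

    import base64

    def make_auth_header(uuid, token):
        # 'uuid:token', base64-encoded, wrapped in a u1db-style header
        b64_creds = base64.b64encode('%s:%s' % (uuid, token))
        return [('Authorization', 'Token %s' % b64_creds)]

    print(make_auth_header('deadbeef', 's3cr3t'))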
- class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
- """
- A SyncTarget that encrypts data before sending and decrypts data after
- receiving.
-
- Normally, encrypted content will have been written to the sync database
- upon document modification. The sync database is also used to
- temporarily store the parsed documents that the remote sends us, before
- they are decrypted and written to the main database.
- """
-
- # will later keep a reference to the insert-doc callback
- # passed to sync_exchange
- _insert_doc_cb = defaultdict(lambda: ProxyBase(None))
-
- """
- Period of recurrence of the periodic decrypting task, in seconds.
- """
- DECRYPT_LOOP_PERIOD = 0.5
-
- #
- # Modified HTTPSyncTarget methods.
- #
-
- def __init__(self, url, source_replica_uid=None, creds=None, crypto=None,
- sync_db=None, sync_db_write_lock=None):
- """
- Initialize the SoledadSyncTarget.
-
- :param url: The url of the target replica to sync with.
- :type url: str
- :param source_replica_uid: The source replica uid which we use when
- deferring decryption.
- :type source_replica_uid: str
- :param creds: Optional dictionary giving credentials to authorize the
- operation with the server.
- :type creds: dict
- :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
- document contents when syncing.
- :type crypto: soledad.crypto.SoledadCrypto
- :param sync_db: Optional handler for the db with the symmetric
- encryption of the syncing documents. If None,
- encryption will be done inline instead of being
- retrieved from the dedicated database.
- :type sync_db: Sqlite handler
- :param sync_db_write_lock: A write lock for controlling concurrent
- access to the sync_db.
- :type sync_db_write_lock: threading.Lock
- """
- HTTPSyncTarget.__init__(self, url, creds)
- self._raw_url = url
- self._raw_creds = creds
- self._crypto = crypto
- self._stopped = True
- self._stop_lock = threading.Lock()
- self._sync_exchange_lock = threading.Lock()
- self.source_replica_uid = source_replica_uid
- self._defer_decryption = False
- self._syncer_pool = None
-
- # deferred decryption attributes
- self._sync_db = None
- self._sync_db_write_lock = None
- self._decryption_callback = None
- self._sync_decr_pool = None
- self._sync_loop = None
- if sync_db and sync_db_write_lock is not None:
- self._sync_db = sync_db
- self._sync_db_write_lock = sync_db_write_lock
-
- def _setup_sync_decr_pool(self):
- """
- Set up the SyncDecrypterPool for deferred decryption.
- """
- if self._sync_decr_pool is None:
- # initialize syncing queue decryption pool
- self._sync_decr_pool = SyncDecrypterPool(
- self._crypto, self._sync_db,
- self._sync_db_write_lock,
- insert_doc_cb=self._insert_doc_cb)
- self._sync_decr_pool.set_source_replica_uid(
- self.source_replica_uid)
-
- def _teardown_sync_decr_pool(self):
- """
- Tear down the SyncDecrypterPool.
- """
- if self._sync_decr_pool is not None:
- self._sync_decr_pool.close()
- self._sync_decr_pool = None
-
- def _setup_sync_loop(self):
- """
- Set up the sync loop for deferred decryption.
- """
- if self._sync_loop is None:
- self._sync_loop = LoopingCall(
- self._decrypt_syncing_received_docs)
- self._sync_loop.start(self.DECRYPT_LOOP_PERIOD)
-
- def _teardown_sync_loop(self):
- """
- Tear down the sync loop.
- """
- if self._sync_loop is not None:
- self._sync_loop.stop()
- self._sync_loop = None
-
- def _get_replica_uid(self, url):
- """
- Return replica uid from the url, or None.
-
- :param url: the replica url
- :type url: str
-
- :return: The replica uid encoded in the url, or None.
- :rtype: str or None
- """
- replica_uid_match = re.findall("user-([0-9a-fA-F]+)", url)
- return replica_uid_match[0] if len(replica_uid_match) > 0 else None
-
- @staticmethod
- def connect(url, source_replica_uid=None, crypto=None):
- return SoledadSyncTarget(
- url, source_replica_uid=source_replica_uid, crypto=crypto)
-
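`_setup_sync_loop()` relies on `twisted.internet.task.LoopingCall` to run the decryption step every `DECRYPT_LOOP_PERIOD` seconds. A self-contained illustration of that pattern, with a stand-in for the polled method:

    from twisted.internet import reactor
    from twisted.internet.task import LoopingCall

    def poll():
        # stand-in for _decrypt_syncing_received_docs()
        print("checking the sync db for docs to decrypt")

    loop = LoopingCall(poll)
    loop.start(0.5)                      # DECRYPT_LOOP_PERIOD
    reactor.callLater(2, loop.stop)      # _teardown_sync_loop() equivalent
    reactor.callLater(2.1, reactor.stop)
    reactor.run()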
- def _parse_received_doc_response(self, response):
- """
- Parse the response from the server containing the received document.
-
- :param response: The body and headers of the response.
- :type response: tuple(str, dict)
- """
- data, _ = response
- # decode incoming stream
- parts = data.splitlines()
- if not parts or parts[0] != '[' or parts[-1] != ']':
- raise errors.BrokenSyncStream
- data = parts[1:-1]
- # decode metadata
- line, comma = utils.check_and_strip_comma(data[0])
- metadata = None
- try:
- metadata = json.loads(line)
- new_generation = metadata['new_generation']
- new_transaction_id = metadata['new_transaction_id']
- number_of_changes = metadata['number_of_changes']
- except (json.JSONDecodeError, KeyError):
- raise errors.BrokenSyncStream
- # make sure we have replica_uid from fresh new dbs
- if self._ensure_callback and 'replica_uid' in metadata:
- self._ensure_callback(metadata['replica_uid'])
- # parse incoming document info
- doc_id = None
- rev = None
- content = None
- gen = None
- trans_id = None
- if number_of_changes > 0:
- try:
- entry = json.loads(data[1])
- doc_id = entry['id']
- rev = entry['rev']
- content = entry['content']
- gen = entry['gen']
- trans_id = entry['trans_id']
- except (IndexError, KeyError):
- raise errors.BrokenSyncStream
- return new_generation, new_transaction_id, number_of_changes, \
- doc_id, rev, content, gen, trans_id
-
- def _insert_received_doc(self, idx, total, response):
- """
- Insert a received document into the local replica.
-
- :param idx: The index count of the current operation.
- :type idx: int
- :param total: The total number of operations.
- :type total: int
- :param response: The body and headers of the response.
- :type response: tuple(str, dict)
- """
- new_generation, new_transaction_id, number_of_changes, doc_id, \
- rev, content, gen, trans_id = \
- self._parse_received_doc_response(response)
- if doc_id is not None:
- # decrypt incoming document and insert into local database
- # -------------------------------------------------------------
- # symmetric decryption of document's contents
- # -------------------------------------------------------------
- # If the arriving content was symmetrically encrypted, we decrypt
- # it. We do it inline if the defer_decryption flag is False or if
- # no sync_db was defined; otherwise we defer it, writing it to the
- # received docs table.
- doc = SoledadDocument(doc_id, rev, content)
- if is_symmetrically_encrypted(doc):
- if self._queue_for_decrypt:
- self._save_encrypted_received_doc(
- doc, gen, trans_id, idx, total)
- else:
- # defer_decryption is False or no-sync-db fallback
- doc.set_json(decrypt_doc(self._crypto, doc))
- self._return_doc_cb(doc, gen, trans_id)
- else:
- # the doc is not symmetrically encrypted: insert it directly
- # or save it in the decrypted stage.
- if self._queue_for_decrypt:
- self._save_received_doc(doc, gen, trans_id, idx, total)
- else:
- self._return_doc_cb(doc, gen, trans_id)
- # -------------------------------------------------------------
- # end of symmetric decryption
- # -------------------------------------------------------------
- msg = "%d/%d" % (idx + 1, total)
- signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg)
- logger.debug("Soledad sync receive status: %s" % msg)
- return number_of_changes, new_generation, new_transaction_id
-
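For reference, a response body in the shape `_parse_received_doc_response()` accepts can be parsed the same way it does, line by line; all values below are illustrative:

    import json

    response_body = (
        '[\n'
        '{"new_generation": 12, "new_transaction_id": "T-1337",'
        ' "number_of_changes": 1},\n'
        '{"id": "some-doc", "rev": "replica:1", "content": "...",'
        ' "gen": 11, "trans_id": "T-1336"}\n'
        ']'
    )

    parts = response_body.splitlines()
    assert parts[0] == '[' and parts[-1] == ']'
    metadata = json.loads(parts[1].rstrip(','))  # first entry: metadata
    entry = json.loads(parts[2])                 # second entry: the document
    print(metadata['number_of_changes'], entry['id'])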
- def _get_remote_docs(self, url, last_known_generation, last_known_trans_id,
- headers, return_doc_cb, ensure_callback, sync_id,
- defer_decryption=False):
- """
- Fetch sync documents from the remote database and insert them in the
- local database.
-
- If an incoming document's encryption scheme is equal to
- EncryptionSchemes.SYMKEY, then this method will decrypt it with
- Soledad's symmetric key.
-
- :param url: The syncing URL.
- :type url: str
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
- :param headers: The headers of the HTTP request.
- :type headers: dict
- :param return_doc_cb: A callback to insert docs from target.
- :type return_doc_cb: callable
- :param ensure_callback: A callback to ensure we have the correct
- target_replica_uid, if it was just created.
- :type ensure_callback: callable
- :param sync_id: The id for the current sync session.
- :type sync_id: str
- :param defer_decryption: Whether to defer the decryption process using
- the intermediate database. If False,
- decryption will be done inline.
- :type defer_decryption: bool
-
- :raise BrokenSyncStream: If the response stream is malformed.
-
- :return: The new generation and transaction id of the target replica.
- :rtype: tuple
- """
- # we keep a reference to the callback in case we defer the decryption
- self._return_doc_cb = return_doc_cb
- self._queue_for_decrypt = defer_decryption \
- and self._sync_db is not None
-
- new_generation = last_known_generation
- new_transaction_id = last_known_trans_id
-
- if self._queue_for_decrypt:
- logger.debug(
- "Soledad sync: will queue received docs for decrypting.")
-
- idx = 0
- number_of_changes = 1
-
- first_request = True
- last_callback_lock = None
- threads = []
-
- # get incoming documents
- while idx < number_of_changes:
- # bail out if sync process was interrupted
- if self.stopped is True:
- break
-
- # launch a thread to fetch one document from target
- t = self._syncer_pool.new_syncer_thread(
- idx, number_of_changes,
- last_callback_lock=last_callback_lock)
-
- # bail out if the pool failed to provide a thread
- if t is None:
- self.stop()
- break
-
- t.doc_syncer.set_request_method(
- 'get', idx, sync_id, last_known_generation,
- last_known_trans_id)
- t.doc_syncer.set_success_callback(self._insert_received_doc)
-
- def _failure_callback(idx, total, exception):
- _failure_msg = "Soledad sync: error while getting document " \
- "%d/%d: %s" \
- % (idx + 1, total, exception)
- logger.warning(_failure_msg)
- logger.warning("Soledad sync: failing gracefully, will "
- "recover on next sync.")
-
- t.doc_syncer.set_failure_callback(_failure_callback)
- threads.append(t)
- t.start()
- last_callback_lock = t.callback_lock
- idx += 1
-
- # if this is the first request, wait to update the number of
- # changes
- if first_request is True:
- t.join()
- if t.success:
- number_of_changes, _, _ = t.result
- else:
- raise t.exception
- first_request = False
-
- # make sure all threads finished and we have up-to-date info
- last_successful_thread = None
- while threads:
- # check if there are failures
- t = threads.pop(0)
- t.join()
- if t.success:
- last_successful_thread = t
- else:
- raise t.exception
-
- # get information about last successful thread
- if last_successful_thread is not None:
- body, _ = last_successful_thread.response
- parsed_body = json.loads(body)
- # get current target gen and trans id in case no documents were
- # transferred
- if len(parsed_body) == 1:
- metadata = parsed_body[0]
- new_generation = metadata['new_generation']
- new_transaction_id = metadata['new_transaction_id']
- # get current target gen and trans id from last transferred
- # document
- else:
- doc_data = parsed_body[1]
- new_generation = doc_data['gen']
- new_transaction_id = doc_data['trans_id']
-
- return new_generation, new_transaction_id
-
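The loop above only learns the real `number_of_changes` from the first response, so the first thread is joined synchronously before further fetches are launched. The same pattern in isolation, with a hypothetical `fetch()` stub in place of the document request:

    import threading

    def fetch(idx):
        # hypothetical stand-in for one doc-fetching POST
        return {'number_of_changes': 3}

    results = {}

    def worker(idx):
        results[idx] = fetch(idx)

    threads = []
    idx, number_of_changes = 0, 1   # assume one change until told otherwise
    while idx < number_of_changes:
        t = threading.Thread(target=worker, args=(idx,))
        threads.append(t)
        t.start()
        if idx == 0:
            t.join()                # block on the first request only
            number_of_changes = results[0]['number_of_changes']
        idx += 1

    for t in threads:
        t.join()
    print(sorted(results.keys()))   # [0, 1, 2]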
- def sync_exchange(self, docs_by_generations,
- source_replica_uid, last_known_generation,
- last_known_trans_id, return_doc_cb,
- ensure_callback=None, defer_decryption=True,
- sync_id=None):
- """
- Find out which documents the remote database does not know about,
- encrypt and send them.
-
- This does the same as the parent's method but encrypts content before
- syncing.
-
- :param docs_by_generations: A list of (doc, generation, trans_id)
- tuples of local documents that were changed
- since the last local generation the remote
- replica knows about.
- :type docs_by_generations: list of tuples
-
- :param source_replica_uid: The uid of the source replica.
- :type source_replica_uid: str
-
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
-
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
-
- :param return_doc_cb: A callback for inserting received documents from
- target. If not overridden, this will call u1db
- insert_doc_from_target in synchronizer, which
- implements the TAKE OTHER semantics.
- :type return_doc_cb: function
-
- :param ensure_callback: A callback that ensures we know the target
- replica uid if the target replica was just
- created.
- :type ensure_callback: function
-
- :param defer_decryption: Whether to defer the decryption process using
- the intermediate database. If False,
- decryption will be done inline.
- :type defer_decryption: bool
-
- :return: The new generation and transaction id of the target replica.
- :rtype: tuple
- """
- self._ensure_callback = ensure_callback
-
- if defer_decryption and self._sync_db is not None:
- self._sync_exchange_lock.acquire()
- self._setup_sync_decr_pool()
- self._setup_sync_loop()
- self._defer_decryption = True
- else:
- # fall back to inline decryption
- defer_decryption = False
-
- self.start()
-
- if sync_id is None:
- sync_id = str(uuid4())
- self.source_replica_uid = source_replica_uid
- # let the decrypter pool access the passed callback to insert docs
- setProxiedObject(self._insert_doc_cb[source_replica_uid],
- return_doc_cb)
-
- # empty the database before starting a new sync
- if defer_decryption is True and not self.clear_to_sync():
- self._sync_decr_pool.empty()
-
- self._ensure_connection()
- if self._trace_hook: # for tests
- self._trace_hook('sync_exchange')
- url = '%s/sync-from/%s' % (self._url.path, source_replica_uid)
- headers = self._sign_request('POST', url, {})
-
- cur_target_gen = last_known_generation
- cur_target_trans_id = last_known_trans_id
-
- # send docs
- msg = "%d/%d" % (0, len(docs_by_generations))
- signal(SOLEDAD_SYNC_SEND_STATUS, msg)
- logger.debug("Soledad sync send status: %s" % msg)
-
- defer_encryption = self._sync_db is not None
- self._syncer_pool = DocumentSyncerPool(
- self._raw_url, self._raw_creds, url, headers, ensure_callback,
- self.stop_syncer)
- threads = []
- last_callback_lock = None
- sent = 0
- total = len(docs_by_generations)
-
- synced = []
- number_of_docs = len(docs_by_generations)
-
- last_request_lock = None
- for doc, gen, trans_id in docs_by_generations:
- # allow for interrupting the sync process
- if self.stopped is True:
- break
-
- # skip non-syncable docs
- if isinstance(doc, SoledadDocument) and not doc.syncable:
- continue
-
- # -------------------------------------------------------------
- # symmetric encryption of document's contents
- # -------------------------------------------------------------
- doc_json = doc.get_json()
- if not doc.is_tombstone():
- if not defer_encryption:
- # fallback case, for tests
- doc_json = encrypt_doc(self._crypto, doc)
- else:
- try:
- doc_json = self.get_encrypted_doc_from_db(
- doc.doc_id, doc.rev)
- except Exception as exc:
- logger.error("Error while getting "
- "encrypted doc from db")
- logger.exception(exc)
- continue
- if doc_json is None:
- # Not marked as tombstone, but we got nothing
- # from the sync db. As it is not encrypted yet, we
- # force inline encryption.
- # TODO: implement a queue to deal with these cases.
- doc_json = encrypt_doc(self._crypto, doc)
- # -------------------------------------------------------------
- # end of symmetric encryption
- # -------------------------------------------------------------
- t = self._syncer_pool.new_syncer_thread(
- sent + 1, total, last_request_lock=last_request_lock,
- last_callback_lock=last_callback_lock)
-
- # bail out if the pool failed to provide a thread
- if t is None:
- self.stop()
- break
-
- # set the request method
- t.doc_syncer.set_request_method(
- 'put', sync_id, cur_target_gen, cur_target_trans_id,
- id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen,
- trans_id=trans_id, number_of_docs=number_of_docs,
- doc_idx=sent + 1)
-
- # set the success callback
- def _success_callback(idx, total, response):
- _success_msg = "Soledad sync send status: %d/%d" \
- % (idx, total)
- signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg)
- logger.debug(_success_msg)
-
- t.doc_syncer.set_success_callback(_success_callback)
-
- # set the failure callback
- def _failure_callback(idx, total, exception):
- _failure_msg = "Soledad sync: error while sending document " \
- "%d/%d: %s" % (idx, total, exception)
- logger.warning(_failure_msg)
- logger.warning("Soledad sync: failing gracefully, will "
- "recover on next sync.")
-
- t.doc_syncer.set_failure_callback(_failure_callback)
-
- # save thread and append
- t.start()
- threads.append((t, doc))
-
- # update lock references so they can be used in next call to
- # syncer_pool.new_syncer_thread() above
- last_callback_lock = t.callback_lock
- last_request_lock = t.request_lock
-
- sent += 1
-
- # make sure all threads finished and we have up-to-date info
- last_successful_thread = None
- while threads:
- # check if there are failures
- t, doc = threads.pop(0)
- t.join()
- if t.success:
- synced.append((doc.doc_id, doc.rev))
- last_successful_thread = t
- else:
- raise t.exception
-
- # delete documents from the sync database
- if defer_encryption:
- self.delete_encrypted_docs_from_db(synced)
-
- # get target gen and trans_id after docs
- gen_after_send = None
- trans_id_after_send = None
- if last_successful_thread is not None:
- response_dict = json.loads(last_successful_thread.response[0])[0]
- gen_after_send = response_dict['new_generation']
- trans_id_after_send = response_dict['new_transaction_id']
-
- # get docs from target
- if self.stopped is False:
- cur_target_gen, cur_target_trans_id = self._get_remote_docs(
- url,
- last_known_generation, last_known_trans_id, headers,
- return_doc_cb, ensure_callback, sync_id,
- defer_decryption=defer_decryption)
-
- self._syncer_pool.cleanup()
-
- # decrypt docs in case of deferred decryption
- if defer_decryption:
- while not self.clear_to_sync():
- sleep(self.DECRYPT_LOOP_PERIOD)
- self._teardown_sync_loop()
- self._teardown_sync_decr_pool()
- self._sync_exchange_lock.release()
-
- # update gen and trans id info in case we just sent and did not
- # receive docs.
- if gen_after_send is not None and gen_after_send > cur_target_gen:
- cur_target_gen = gen_after_send
- cur_target_trans_id = trans_id_after_send
-
- self.stop()
- self._syncer_pool = None
- return cur_target_gen, cur_target_trans_id
-
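`sync_exchange()` never hands `return_doc_cb` to the decrypter pool directly; it repoints the class-level `ProxyBase` placeholder that the pool already holds a reference to. The mechanism in miniature, using the same zope.proxy helpers:

    from collections import defaultdict
    from zope.proxy import ProxyBase, sameProxiedObjects, setProxiedObject

    # one placeholder per source replica, as in _insert_doc_cb above
    callbacks = defaultdict(lambda: ProxyBase(None))
    proxy = callbacks['some-replica-uid']    # the pool keeps this reference

    def insert_doc(doc, gen, trans_id):
        print("inserting %s at gen %d" % (doc, gen))

    setProxiedObject(proxy, insert_doc)      # done at sync start
    if not sameProxiedObjects(proxy, None):  # guard used by the decrypt loop
        proxy('doc-1', 1, 'T-1')             # calls insert_doc via the proxy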
- def start(self):
- """
- Mark current sync session as running.
- """
- with self._stop_lock:
- self._stopped = False
-
- def stop_syncer(self):
- """
- Set the stopped flag for the current sync session.
- """
- with self._stop_lock:
- self._stopped = True
-
- def stop(self):
- """
- Mark current sync session as stopped.
-
- This will eventually interrupt the sync_exchange() method and return
- enough information to the synchronizer so the sync session can be
- recovered afterwards.
- """
- self.stop_syncer()
- if self._syncer_pool:
- self._syncer_pool.cancel_threads()
-
- @property
- def stopped(self):
- """
- Return whether this sync session is stopped.
-
- :return: Whether this sync session is stopped.
- :rtype: bool
- """
- with self._stop_lock:
- return self._stopped is True
-
- def get_encrypted_doc_from_db(self, doc_id, doc_rev):
- """
- Retrieve encrypted document from the database of encrypted docs for
- sync.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
-
- :return: The encrypted document content, or None if no document was
- found for the given id and revision.
- :rtype: str or None
- """
- encr = SyncEncrypterPool
- sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % (
- encr.TABLE_NAME,))
- res = self._fetchall(sql, (doc_id, doc_rev))
- if res:
- val = res.pop()
- return val[0]
- else:
- # no doc found
- return None
-
- def delete_encrypted_docs_from_db(self, docs_ids):
- """
- Delete several encrypted documents from the database of symmetrically
- encrypted docs to sync.
-
- :param docs_ids: An iterable with (doc_id, doc_rev) for all documents
- to be deleted.
- :type docs_ids: any iterable of tuples of str
- """
- if docs_ids:
- encr = SyncEncrypterPool
- for doc_id, doc_rev in docs_ids:
- sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % (
- encr.TABLE_NAME,))
- self._sync_db.execute(sql, (doc_id, doc_rev))
-
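`get_encrypted_doc_from_db()` and `delete_encrypted_docs_from_db()` are thin wrappers over parameterized SQL against the sync db. A runnable sketch of the same queries against plain sqlite3 (pysqlcipher's dbapi2 exposes the same DB-API; the table name here is illustrative, the real one comes from `SyncEncrypterPool.TABLE_NAME`):

    import sqlite3

    db = sqlite3.connect(':memory:')
    db.execute("CREATE TABLE docs_tosync (doc_id TEXT, rev TEXT, content TEXT)")
    db.execute("INSERT INTO docs_tosync VALUES (?, ?, ?)",
               ('doc-1', 'rev-1', '{"encrypted": "..."}'))

    # SELECT, as in get_encrypted_doc_from_db()
    res = db.execute("SELECT content FROM docs_tosync WHERE doc_id=? and rev=?",
                     ('doc-1', 'rev-1')).fetchall()
    print(res[0][0] if res else None)

    # DELETE, as in delete_encrypted_docs_from_db()
    db.execute("DELETE FROM docs_tosync WHERE doc_id=? and rev=?",
               ('doc-1', 'rev-1'))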
- def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total):
- """
- Save a symmetrically encrypted incoming document into the received
- docs table in the sync db. A decryption task will pick it up
- from here in turn.
-
- :param doc: The document to save.
- :type doc: SoledadDocument
- :param gen: The generation.
- :type gen: str
- :param trans_id: Transaction id.
- :type trans_id: str
- :param idx: The index count of the current operation.
- :type idx: int
- :param total: The total number of operations.
- :type total: int
- """
- logger.debug(
- "Enqueueing doc for decryption: %d/%d."
- % (idx + 1, total))
- self._sync_decr_pool.insert_encrypted_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id)
-
- def _save_received_doc(self, doc, gen, trans_id, idx, total):
- """
- Save an incoming document into the received docs table in the sync db.
-
- :param doc: The document to save.
- :type doc: SoledadDocument
- :param gen: The generation.
- :type gen: str
- :param trans_id: Transaction id.
- :type trans_id: str
- :param idx: The index count of the current operation.
- :type idx: int
- :param total: The total number of operations.
- :type total: int
- """
- logger.debug(
- "Enqueueing doc, no decryption needed: %d/%d."
- % (idx + 1, total))
- self._sync_decr_pool.insert_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id)
-
- #
- # Symmetric decryption of syncing docs
- #
-
- def clear_to_sync(self):
- """
- Return whether sync can proceed (i.e., the received docs table is
- empty).
-
- :return: Whether sync can proceed.
- :rtype: bool
- """
- if self._sync_decr_pool:
- return self._sync_decr_pool.count_docs_in_sync_db() == 0
- return True
-
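`sync_exchange()` polls `clear_to_sync()` every `DECRYPT_LOOP_PERIOD` seconds to drain the decryption pool before returning. Stripped of the pool, the gate reduces to the following sketch (the pending-docs counter is a stand-in for the received-docs row count):

    from time import sleep

    DECRYPT_LOOP_PERIOD = 0.5
    pending_docs = [2, 1, 0]   # stand-in for the received-docs row count

    def clear_to_sync():
        # mirrors the predicate above: proceed once the table is empty
        return pending_docs.pop(0) == 0

    while not clear_to_sync():
        sleep(DECRYPT_LOOP_PERIOD)
    print("all received docs decrypted")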
- def set_decryption_callback(self, cb):
- """
- Set callback to be called when the decryption finishes.
-
- :param cb: The callback to be set.
- :type cb: callable
- """
- self._decryption_callback = cb
-
- def has_decryption_callback(self):
- """
- Return True if there is a decryption callback set.
-
- :rtype: bool
- """
- return self._decryption_callback is not None
-
- def has_syncdb(self):
- """
- Return True if we have an initialized syncdb.
-
- :rtype: bool
- """
- return self._sync_db is not None
-
- def _decrypt_syncing_received_docs(self):
- """
- Decrypt the documents received from the remote replica and insert
- them into the local one.
-
- Called periodically from the LoopingCall self._sync_loop.
- """
- if sameProxiedObjects(
- self._insert_doc_cb.get(self.source_replica_uid),
- None):
- return
-
- decrypter = self._sync_decr_pool
- decrypter.raise_in_case_of_failed_async_calls()
- decrypter.decrypt_received_docs()
- decrypter.process_decrypted()
-
- def _sign_request(self, method, url_query, params):
- """
- Return an authorization header to be included in the HTTP request.
-
- :param method: The HTTP method.
- :type method: str
- :param url_query: The URL query string.
- :type url_query: str
- :param params: A list with encoded query parameters.
- :type params: list
-
- :return: The Authorization header.
- :rtype: list of tuple
- """
- return TokenBasedAuth._sign_request(self, method, url_query, params)
-
- def set_token_credentials(self, uuid, token):
- """
- Store given credentials so we can sign the request later.
-
- :param uuid: The user's uuid.
- :type uuid: str
- :param token: The authentication token.
- :type token: str
- """
- TokenBasedAuth.set_token_credentials(self, uuid, token)
-
- def _fetchall(self, *args, **kwargs):
- """
- Run a query on the sync db and return all results at once.
- """
- with self._sync_db:
- c = self._sync_db.cursor()
- c.execute(*args, **kwargs)
- return c.fetchall()
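`_fetchall()` runs its query inside `with self._sync_db:`, which wraps the statement in a transaction. The behaviour this buys, shown with plain sqlite3 (pysqlcipher inherits the same connection semantics):

    import sqlite3

    db = sqlite3.connect(':memory:')
    db.execute("CREATE TABLE t (x INTEGER)")

    try:
        with db:                # transaction: rolled back on exception...
            db.execute("INSERT INTO t VALUES (1)")
            raise ValueError("boom")
    except ValueError:
        pass

    with db:                    # ...committed on success
        db.execute("INSERT INTO t VALUES (2)")

    print(db.execute("SELECT x FROM t").fetchall())   # [(2,)]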