summaryrefslogtreecommitdiff
path: root/client/src
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2015-05-25 16:17:20 -0400
committerKali Kaneko <kali@leap.se>2015-05-25 16:17:20 -0400
commit745ac11c78455e7487bef88da3c3ffeb4fe4351c (patch)
treecc14a46120a69921ba9eed6e5584012c23fc694f /client/src
parent340b0dcfbc0a819738a28f9c803fdbf848754897 (diff)
parent3e6e51649bb6206125f20ac6773f6744ec8bf175 (diff)
Merge remote-tracking branch 'leapcode/pr/216' into develop
this branch includes many changes that improve the asynchronous retrieval of the sync docs, and the parallel decryption of the encrypted documents.
Diffstat (limited to 'client/src')
-rw-r--r--client/src/leap/soledad/client/adbapi.py3
-rw-r--r--client/src/leap/soledad/client/api.py13
-rw-r--r--client/src/leap/soledad/client/auth.py9
-rw-r--r--client/src/leap/soledad/client/crypto.py530
-rw-r--r--client/src/leap/soledad/client/encdecpool.py745
-rw-r--r--client/src/leap/soledad/client/http_client.py194
-rw-r--r--client/src/leap/soledad/client/http_target.py598
-rw-r--r--client/src/leap/soledad/client/pragmas.py43
-rw-r--r--client/src/leap/soledad/client/secrets.py16
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py292
-rw-r--r--client/src/leap/soledad/client/sync.py171
-rw-r--r--client/src/leap/soledad/client/target.py1517
12 files changed, 1756 insertions, 2375 deletions
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py
index 7ad10db5..5b882bbe 100644
--- a/client/src/leap/soledad/client/adbapi.py
+++ b/client/src/leap/soledad/client/adbapi.py
@@ -32,6 +32,7 @@ from zope.proxy import ProxyBase, setProxiedObject
from pysqlcipher.dbapi2 import OperationalError
from leap.soledad.client import sqlcipher as soledad_sqlcipher
+from leap.soledad.client.pragmas import set_init_pragmas
logger = logging.getLogger(name=__name__)
@@ -72,7 +73,7 @@ def getConnectionPool(opts, openfun=None, driver="pysqlcipher"):
:rtype: U1DBConnectionPool
"""
if openfun is None and driver == "pysqlcipher":
- openfun = partial(soledad_sqlcipher.set_init_pragmas, opts=opts)
+ openfun = partial(set_init_pragmas, opts=opts)
return U1DBConnectionPool(
"%s.dbapi2" % driver, database=opts.path,
check_same_thread=False, cp_openfun=openfun,
diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py
index 0f29503f..91e0a4a0 100644
--- a/client/src/leap/soledad/client/api.py
+++ b/client/src/leap/soledad/client/api.py
@@ -272,7 +272,8 @@ class Soledad(object):
replica_uid = self._dbpool.replica_uid
self._dbsyncer = SQLCipherU1DBSync(
self._sqlcipher_opts, self._crypto, replica_uid,
- self._defer_encryption)
+ SOLEDAD_CERT,
+ defer_encryption=self._defer_encryption)
#
# Closing methods
@@ -630,6 +631,7 @@ class Soledad(object):
Whether to defer decryption of documents, or do it inline while
syncing.
:type defer_decryption: bool
+
:return: A deferred whose callback will be invoked with the local
generation before the synchronization was performed.
:rtype: twisted.internet.defer.Deferred
@@ -650,7 +652,7 @@ class Soledad(object):
sync_url = urlparse.urljoin(self._server_url, 'user-%s' % self.uuid)
d = self._dbsyncer.sync(
sync_url,
- creds=self._creds, autocreate=False,
+ creds=self._creds,
defer_decryption=defer_decryption)
def _sync_callback(local_gen):
@@ -658,21 +660,16 @@ class Soledad(object):
soledad_events.SOLEDAD_DONE_DATA_SYNC, self.uuid)
return local_gen
- # prevent sync failures from crashing the app by adding an errback
- # that logs the failure and does not propagate it down the callback
- # chain
def _sync_errback(failure):
s = StringIO()
failure.printDetailedTraceback(file=s)
msg = "Soledad exception when syncing!\n" + s.getvalue()
logger.error(msg)
+ return failure
d.addCallbacks(_sync_callback, _sync_errback)
return d
- def stop_sync(self):
- self._dbsyncer.stop_sync()
-
@property
def syncing(self):
"""
diff --git a/client/src/leap/soledad/client/auth.py b/client/src/leap/soledad/client/auth.py
index 72ab0008..6dfabeb4 100644
--- a/client/src/leap/soledad/client/auth.py
+++ b/client/src/leap/soledad/client/auth.py
@@ -14,15 +14,13 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
Methods for token-based authentication.
These methods have to be included in all classes that extend HTTPClient so
they can do token-based auth requests to the Soledad server.
"""
-
+import base64
from u1db import errors
@@ -49,7 +47,7 @@ class TokenBasedAuth(object):
Return an authorization header to be included in the HTTP request, in
the form:
- [('Authorization', 'Token <base64 encoded creds')]
+ [('Authorization', 'Token <(base64 encoded) uuid:token>')]
:param method: The HTTP method.
:type method: str
@@ -64,7 +62,8 @@ class TokenBasedAuth(object):
if 'token' in self._creds:
uuid, token = self._creds['token']
auth = '%s:%s' % (uuid, token)
- return [('Authorization', 'Token %s' % auth.encode('base64')[:-1])]
+ b64_token = base64.b64encode(auth)
+ return [('Authorization', 'Token %s' % b64_token)]
else:
raise errors.UnknownAuthMethod(
'Wrong credentials: %s' % self._creds)
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index 107bf7f1..bdbaa8e0 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -23,17 +23,13 @@ import hmac
import hashlib
import json
import logging
-import multiprocessing
-import threading
from pycryptopp.cipher.aes import AES
from pycryptopp.cipher.xsalsa20 import XSalsa20
-from zope.proxy import sameProxiedObjects
from leap.soledad.common import soledad_assert
from leap.soledad.common import soledad_assert_type
from leap.soledad.common import crypto
-from leap.soledad.common.document import SoledadDocument
logger = logging.getLogger(__name__)
@@ -227,7 +223,7 @@ class SoledadCrypto(object):
#
def mac_doc(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, enc_iv,
- mac_method, secret):
+ mac_method, secret):
"""
Calculate a MAC for C{doc} using C{ciphertext}.
@@ -378,7 +374,7 @@ def decrypt_doc(crypto, doc):
def _verify_doc_mac(doc_id, doc_rev, ciphertext, enc_scheme, enc_method,
- enc_iv, mac_method, secret, doc_mac):
+ enc_iv, mac_method, secret, doc_mac):
"""
Verify that C{doc_mac} is a correct MAC for the given document.
@@ -511,525 +507,3 @@ def is_symmetrically_encrypted(doc):
== crypto.EncryptionSchemes.SYMKEY:
return True
return False
-
-
-#
-# Encrypt/decrypt pools of workers
-#
-
-class SyncEncryptDecryptPool(object):
- """
- Base class for encrypter/decrypter pools.
- """
- WORKERS = multiprocessing.cpu_count()
-
- def __init__(self, crypto, sync_db, write_lock):
- """
- Initialize the pool of encryption-workers.
-
- :param crypto: A SoledadCryto instance to perform the encryption.
- :type crypto: leap.soledad.crypto.SoledadCrypto
-
- :param sync_db: A database connection handle
- :type sync_db: pysqlcipher.dbapi2.Connection
-
- :param write_lock: a write lock for controlling concurrent access
- to the sync_db
- :type write_lock: threading.Lock
- """
- self._pool = multiprocessing.Pool(self.WORKERS)
- self._crypto = crypto
- self._sync_db = sync_db
- self._sync_db_write_lock = write_lock
-
- def close(self):
- """
- Cleanly close the pool of workers.
- """
- logger.debug("Closing %s" % (self.__class__.__name__,))
- self._pool.close()
- try:
- self._pool.join()
- except Exception:
- pass
-
- def terminate(self):
- """
- Terminate the pool of workers.
- """
- logger.debug("Terminating %s" % (self.__class__.__name__,))
- self._pool.terminate()
-
-
-def encrypt_doc_task(doc_id, doc_rev, content, key, secret):
- """
- Encrypt the content of the given document.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The serialized content of the document.
- :type content: str
- :param key: The encryption key.
- :type key: str
- :param secret: The Soledad storage secret (used for MAC auth).
- :type secret: str
-
- :return: A tuple containing the doc id, revision and encrypted content.
- :rtype: tuple(str, str, str)
- """
- encrypted_content = encrypt_docstr(
- content, doc_id, doc_rev, key, secret)
- return doc_id, doc_rev, encrypted_content
-
-
-class SyncEncrypterPool(SyncEncryptDecryptPool):
- """
- Pool of workers that spawn subprocesses to execute the symmetric encryption
- of documents to be synced.
- """
- # TODO implement throttling to reduce cpu usage??
- WORKERS = multiprocessing.cpu_count()
- TABLE_NAME = "docs_tosync"
- FIELD_NAMES = "doc_id, rev, content"
-
- def encrypt_doc(self, doc, workers=True):
- """
- Symmetrically encrypt a document.
-
- :param doc: The document with contents to be encrypted.
- :type doc: SoledadDocument
-
- :param workers: Whether to defer the decryption to the multiprocess
- pool of workers. Useful for debugging purposes.
- :type workers: bool
- """
- soledad_assert(self._crypto is not None, "need a crypto object")
- docstr = doc.get_json()
- key = self._crypto.doc_passphrase(doc.doc_id)
- secret = self._crypto.secret
- args = doc.doc_id, doc.rev, docstr, key, secret
-
- try:
- if workers:
- res = self._pool.apply_async(
- encrypt_doc_task, args,
- callback=self.encrypt_doc_cb)
- else:
- # encrypt inline
- res = encrypt_doc_task(*args)
- self.encrypt_doc_cb(res)
-
- except Exception as exc:
- logger.exception(exc)
-
- def encrypt_doc_cb(self, result):
- """
- Insert results of encryption routine into the local sync database.
-
- :param result: A tuple containing the doc id, revision and encrypted
- content.
- :type result: tuple(str, str, str)
- """
- doc_id, doc_rev, content = result
- self.insert_encrypted_local_doc(doc_id, doc_rev, content)
-
- def insert_encrypted_local_doc(self, doc_id, doc_rev, content):
- """
- Insert the contents of the encrypted doc into the local sync
- database.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The serialized content of the document.
- :type content: str
- :param content: The encrypted document.
- :type content: str
- """
- # FIXME --- callback should complete immediately since otherwise the
- # thread which handles the results will get blocked
- # Right now we're blocking the dispatcher with the writes to sqlite.
- sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,)
- sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,)
-
- con = self._sync_db
- with self._sync_db_write_lock:
- con.execute(sql_del, (doc_id, ))
- con.execute(sql_ins, (doc_id, doc_rev, content))
-
-
-def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret):
- """
- Decrypt the content of the given document.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The encrypted content of the document.
- :type content: str
- :param gen: The generation corresponding to the modification of that
- document.
- :type gen: int
- :param trans_id: The transaction id corresponding to the modification of
- that document.
- :type trans_id: str
- :param key: The encryption key.
- :type key: str
- :param secret: The Soledad storage secret (used for MAC auth).
- :type secret: str
-
- :return: A tuple containing the doc id, revision and encrypted content.
- :rtype: tuple(str, str, str)
- """
- decrypted_content = decrypt_doc_dict(
- content, doc_id, doc_rev, key, secret)
- return doc_id, doc_rev, decrypted_content, gen, trans_id
-
-
-class SyncDecrypterPool(SyncEncryptDecryptPool):
- """
- Pool of workers that spawn subprocesses to execute the symmetric decryption
- of documents that were received.
-
- The decryption of the received documents is done in two steps:
-
- 1. All the encrypted docs are collected, together with their generation
- and transaction-id
- 2. The docs are enqueued for decryption. When completed, they are
- inserted following the generation order.
- """
- # TODO implement throttling to reduce cpu usage??
- TABLE_NAME = "docs_received"
- FIELD_NAMES = "doc_id, rev, content, gen, trans_id, encrypted"
-
- write_encrypted_lock = threading.Lock()
-
- def __init__(self, *args, **kwargs):
- """
- Initialize the decrypter pool, and setup a dict for putting the
- results of the decrypted docs until they are picked by the insert
- routine that gets them in order.
-
- :param insert_doc_cb: A callback for inserting received documents from
- target. If not overriden, this will call u1db
- insert_doc_from_target in synchronizer, which
- implements the TAKE OTHER semantics.
- :type insert_doc_cb: function
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
- """
- self._insert_doc_cb = kwargs.pop("insert_doc_cb")
- SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
- self.source_replica_uid = None
- self._async_results = []
-
- def set_source_replica_uid(self, source_replica_uid):
- """
- Set the source replica uid for this decrypter pool instance.
-
- :param source_replica_uid: The uid of the source replica.
- :type source_replica_uid: str
- """
- self.source_replica_uid = source_replica_uid
-
- def insert_encrypted_received_doc(self, doc_id, doc_rev, content,
- gen, trans_id):
- """
- Insert a received message with encrypted content, to be decrypted later
- on.
-
- :param doc_id: The Document ID.
- :type doc_id: str
- :param doc_rev: The Document Revision
- :param doc_rev: str
- :param content: the Content of the document
- :type content: str
- :param gen: the Document Generation
- :type gen: int
- :param trans_id: Transaction ID
- :type trans_id: str
- """
- docstr = json.dumps(content)
- sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,)
- sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
- self.TABLE_NAME,)
-
- con = self._sync_db
- with self._sync_db_write_lock:
- con.execute(sql_del, (doc_id, ))
- con.execute(
- sql_ins,
- (doc_id, doc_rev, docstr, gen, trans_id, 1))
-
- def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id):
- """
- Insert a document that is not symmetrically encrypted.
- We store it in the staging area (the decrypted_docs dictionary) to be
- picked up in order as the preceding documents are decrypted.
-
- :param doc_id: The Document ID.
- :type doc_id: str
- :param doc_rev: The Document Revision
- :param doc_rev: str
- :param content: the Content of the document
- :type content: str
- :param gen: the Document Generation
- :type gen: int
- :param trans_id: Transaction ID
- :type trans_id: str
- """
- if not isinstance(content, str):
- content = json.dumps(content)
- sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (
- self.TABLE_NAME,)
- sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
- self.TABLE_NAME,)
- con = self._sync_db
- with self._sync_db_write_lock:
- con.execute(sql_del, (doc_id,))
- con.execute(
- sql_ins,
- (doc_id, doc_rev, content, gen, trans_id, 0))
-
- def delete_received_doc(self, doc_id, doc_rev):
- """
- Delete a received doc after it was inserted into the local db.
-
- :param doc_id: Document ID.
- :type doc_id: str
- :param doc_rev: Document revision.
- :type doc_rev: str
- """
- sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % (
- self.TABLE_NAME,)
- con = self._sync_db
- with self._sync_db_write_lock:
- con.execute(sql_del, (doc_id, doc_rev))
-
- def decrypt_doc(self, doc_id, rev, content, gen, trans_id,
- source_replica_uid, workers=True):
- """
- Symmetrically decrypt a document.
-
- :param doc_id: The ID for the document with contents to be encrypted.
- :type doc: str
- :param rev: The revision of the document.
- :type rev: str
- :param content: The serialized content of the document.
- :type content: str
- :param gen: The generation corresponding to the modification of that
- document.
- :type gen: int
- :param trans_id: The transaction id corresponding to the modification
- of that document.
- :type trans_id: str
- :param source_replica_uid:
- :type source_replica_uid: str
-
- :param workers: Whether to defer the decryption to the multiprocess
- pool of workers. Useful for debugging purposes.
- :type workers: bool
- """
- self.source_replica_uid = source_replica_uid
-
- # insert_doc_cb is a proxy object that gets updated with the right
- # insert function only when the sync_target invokes the sync_exchange
- # method. so, if we don't still have a non-empty callback, we refuse
- # to proceed.
- if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid),
- None):
- logger.debug("Sync decrypter pool: no insert_doc_cb() yet.")
- return
-
- soledad_assert(self._crypto is not None, "need a crypto object")
-
- if len(content) == 0:
- # not encrypted payload
- return
-
- content = json.loads(content)
- key = self._crypto.doc_passphrase(doc_id)
- secret = self._crypto.secret
- args = doc_id, rev, content, gen, trans_id, key, secret
-
- if workers:
- # save the async result object so we can inspect it for failures
- self._async_results.append(self._pool.apply_async(
- decrypt_doc_task, args,
- callback=self.decrypt_doc_cb))
- else:
- # decrypt inline
- res = decrypt_doc_task(*args)
- self.decrypt_doc_cb(res)
-
- def decrypt_doc_cb(self, result):
- """
- Store the decryption result in the sync db from where it will later be
- picked by process_decrypted.
-
- :param result: A tuple containing the doc id, revision and encrypted
- content.
- :type result: tuple(str, str, str)
- """
- doc_id, rev, content, gen, trans_id = result
- logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s"
- % (doc_id, rev, gen, trans_id))
- self.insert_received_doc(doc_id, rev, content, gen, trans_id)
-
- def get_docs_by_generation(self, encrypted=None):
- """
- Get all documents in the received table from the sync db,
- ordered by generation.
-
- :param encrypted: If not None, only return documents with encrypted
- field equal to given parameter.
- :type encrypted: bool or None
-
- :return: list of doc_id, rev, generation, gen, trans_id
- :rtype: list
- """
- sql = "SELECT doc_id, rev, content, gen, trans_id, encrypted FROM %s" \
- % self.TABLE_NAME
- if encrypted is not None:
- sql += " WHERE encrypted = %d" % int(encrypted)
- sql += " ORDER BY gen ASC"
- return self._fetchall(sql)
-
- def get_insertable_docs_by_gen(self):
- """
- Return a list of non-encrypted documents ready to be inserted.
- """
- # here, we compare the list of all available docs with the list of
- # decrypted docs and find the longest common prefix between these two
- # lists. Note that the order of lists fetch matters: if instead we
- # first fetch the list of decrypted docs and then the list of all
- # docs, then some document might have been decrypted between these two
- # calls, and if it is just the right doc then it might not be caught
- # by the next loop.
- all_docs = self.get_docs_by_generation()
- decrypted_docs = self.get_docs_by_generation(encrypted=False)
- insertable = []
- for doc_id, rev, _, gen, trans_id, encrypted in all_docs:
- for next_doc_id, _, next_content, _, _, _ in decrypted_docs:
- if doc_id == next_doc_id:
- content = next_content
- insertable.append((doc_id, rev, content, gen, trans_id))
- else:
- break
- return insertable
-
- def count_docs_in_sync_db(self, encrypted=None):
- """
- Count how many documents we have in the table for received docs.
-
- :param encrypted: If not None, return count of documents with
- encrypted field equal to given parameter.
- :type encrypted: bool or None
-
- :return: The count of documents.
- :rtype: int
- """
- if self._sync_db is None:
- logger.warning("cannot return count with null sync_db")
- return
- sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
- if encrypted is not None:
- sql += " WHERE encrypted = %d" % int(encrypted)
- res = self._fetchall(sql)
- if res:
- val = res.pop()
- return val[0]
- else:
- return 0
-
- def decrypt_received_docs(self):
- """
- Get all the encrypted documents from the sync database and dispatch a
- decrypt worker to decrypt each one of them.
- """
- docs_by_generation = self.get_docs_by_generation(encrypted=True)
- for doc_id, rev, content, gen, trans_id, _ \
- in filter(None, docs_by_generation):
- self.decrypt_doc(
- doc_id, rev, content, gen, trans_id, self.source_replica_uid)
-
- def process_decrypted(self):
- """
- Process the already decrypted documents, and insert as many documents
- as can be taken from the expected order without finding a gap.
-
- :return: Whether we have processed all the pending docs.
- :rtype: bool
- """
- # Acquire the lock to avoid processing while we're still
- # getting data from the syncing stream, to avoid InvalidGeneration
- # problems.
- with self.write_encrypted_lock:
- for doc_fields in self.get_insertable_docs_by_gen():
- self.insert_decrypted_local_doc(*doc_fields)
- remaining = self.count_docs_in_sync_db()
- return remaining == 0
-
- def insert_decrypted_local_doc(self, doc_id, doc_rev, content,
- gen, trans_id):
- """
- Insert the decrypted document into the local sqlcipher database.
- Makes use of the passed callback `return_doc_cb` passed to the caller
- by u1db sync.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The serialized content of the document.
- :type content: str
- :param gen: The generation corresponding to the modification of that
- document.
- :type gen: int
- :param trans_id: The transaction id corresponding to the modification
- of that document.
- :type trans_id: str
- """
- # could pass source_replica in params for callback chain
- insert_fun = self._insert_doc_cb[self.source_replica_uid]
- logger.debug("Sync decrypter pool: inserting doc in local db: "
- "%s:%s %s" % (doc_id, doc_rev, gen))
-
- # convert deleted documents to avoid error on document creation
- if content == 'null':
- content = None
- doc = SoledadDocument(doc_id, doc_rev, content)
- gen = int(gen)
- insert_fun(doc, gen, trans_id)
-
- # If no errors found, remove it from the received database.
- self.delete_received_doc(doc_id, doc_rev)
-
- def empty(self):
- """
- Empty the received docs table of the sync database.
- """
- sql = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
- self._sync_db.execute(sql)
-
- def _fetchall(self, *args, **kwargs):
- with self._sync_db:
- c = self._sync_db.cursor()
- c.execute(*args, **kwargs)
- return c.fetchall()
-
- def raise_in_case_of_failed_async_calls(self):
- """
- Re-raise any exception raised by an async call.
-
- :raise Exception: Raised if an async call has raised an exception.
- """
- for res in self._async_results:
- if res.ready():
- if not res.successful():
- # re-raise the exception raised by the remote call
- res.get()
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
new file mode 100644
index 00000000..c0a05d38
--- /dev/null
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -0,0 +1,745 @@
+# -*- coding: utf-8 -*-
+# encdecpool.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+A pool of encryption/decryption concurrent and parallel workers for using
+during synchronization.
+"""
+
+
+import multiprocessing
+import json
+import logging
+
+from twisted.internet import reactor
+from twisted.internet import defer
+from twisted.internet.threads import deferToThread
+
+from leap.soledad.common.document import SoledadDocument
+from leap.soledad.common import soledad_assert
+
+from leap.soledad.client.crypto import encrypt_docstr
+from leap.soledad.client.crypto import decrypt_doc_dict
+
+
+logger = logging.getLogger(__name__)
+
+
+#
+# Encrypt/decrypt pools of workers
+#
+
+class SyncEncryptDecryptPool(object):
+ """
+ Base class for encrypter/decrypter pools.
+ """
+
+ # TODO implement throttling to reduce cpu usage??
+ WORKERS = multiprocessing.cpu_count()
+
+ def __init__(self, crypto, sync_db):
+ """
+ Initialize the pool of encryption-workers.
+
+ :param crypto: A SoledadCrypto instance to perform the encryption.
+ :type crypto: leap.soledad.crypto.SoledadCrypto
+
+ :param sync_db: A database connection handle
+ :type sync_db: pysqlcipher.dbapi2.Connection
+ """
+ self._crypto = crypto
+ self._sync_db = sync_db
+ self._pool = multiprocessing.Pool(self.WORKERS)
+
+ def close(self):
+ """
+ Cleanly close the pool of workers.
+ """
+ logger.debug("Closing %s" % (self.__class__.__name__,))
+ self._pool.close()
+ try:
+ self._pool.join()
+ except Exception:
+ pass
+
+ def terminate(self):
+ """
+ Terminate the pool of workers.
+ """
+ logger.debug("Terminating %s" % (self.__class__.__name__,))
+ self._pool.terminate()
+
+ def _runOperation(self, query, *args):
+ """
+ Run an operation on the sync db.
+
+ :param query: The query to be executed.
+ :type query: str
+ :param args: A list of query arguments.
+ :type args: list
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._sync_db.runOperation(query, *args)
+
+ def _runQuery(self, query, *args):
+ """
+ Run a query on the sync db.
+
+ :param query: The query to be executed.
+ :type query: str
+ :param args: A list of query arguments.
+ :type args: list
+
+ :return: A deferred that will fire with the results of the database
+ query.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._sync_db.runQuery(query, *args)
+
+
+def encrypt_doc_task(doc_id, doc_rev, content, key, secret):
+ """
+ Encrypt the content of the given document.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param key: The encryption key.
+ :type key: str
+ :param secret: The Soledad storage secret (used for MAC auth).
+ :type secret: str
+
+ :return: A tuple containing the doc id, revision and encrypted content.
+ :rtype: tuple(str, str, str)
+ """
+ encrypted_content = encrypt_docstr(
+ content, doc_id, doc_rev, key, secret)
+ return doc_id, doc_rev, encrypted_content
+
+
+class SyncEncrypterPool(SyncEncryptDecryptPool):
+ """
+ Pool of workers that spawn subprocesses to execute the symmetric encryption
+ of documents to be synced.
+ """
+ TABLE_NAME = "docs_tosync"
+ FIELD_NAMES = "doc_id PRIMARY KEY, rev, content"
+
+ ENCRYPT_LOOP_PERIOD = 0.5
+
+ def __init__(self, *args, **kwargs):
+ """
+ Initialize the sync encrypter pool.
+ """
+ SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
+
+ self._stopped = False
+ self._sync_queue = multiprocessing.Queue()
+
+ # start the encryption loop
+ self._deferred_loop = deferToThread(self._encrypt_docs_loop)
+ self._deferred_loop.addCallback(
+ lambda _: logger.debug("Finished encrypter thread."))
+
+ def enqueue_doc_for_encryption(self, doc):
+ """
+ Enqueue a document for encryption.
+
+ :param doc: The document to be encrypted.
+ :type doc: SoledadDocument
+ """
+ try:
+ self.sync_queue.put_nowait(doc)
+ except multiprocessing.Queue.Full:
+ # do not asynchronously encrypt this file if the queue is full
+ pass
+
+ def _encrypt_docs_loop(self):
+ """
+ Process the syncing queue and send the documents there to be encrypted
+ in the sync db. They will be read by the SoledadSyncTarget during the
+ sync_exchange.
+ """
+ logger.debug("Starting encrypter thread.")
+ while not self._stopped:
+ try:
+ doc = self._sync_queue.get(True, self.ENCRYPT_LOOP_PERIOD)
+ self._encrypt_doc(doc)
+ except multiprocessing.Queue.Empty:
+ pass
+
+ def _encrypt_doc(self, doc):
+ """
+ Symmetrically encrypt a document.
+
+ :param doc: The document with contents to be encrypted.
+ :type doc: SoledadDocument
+
+ :param workers: Whether to defer the decryption to the multiprocess
+ pool of workers. Useful for debugging purposes.
+ :type workers: bool
+ """
+ soledad_assert(self._crypto is not None, "need a crypto object")
+ docstr = doc.get_json()
+ key = self._crypto.doc_passphrase(doc.doc_id)
+ secret = self._crypto.secret
+ args = doc.doc_id, doc.rev, docstr, key, secret
+ # encrypt asynchronously
+ self._pool.apply_async(
+ encrypt_doc_task, args,
+ callback=self._encrypt_doc_cb)
+
+ def _encrypt_doc_cb(self, result):
+ """
+ Insert results of encryption routine into the local sync database.
+
+ :param result: A tuple containing the doc id, revision and encrypted
+ content.
+ :type result: tuple(str, str, str)
+ """
+ doc_id, doc_rev, content = result
+ return self._insert_encrypted_local_doc(doc_id, doc_rev, content)
+
+ def _insert_encrypted_local_doc(self, doc_id, doc_rev, content):
+ """
+ Insert the contents of the encrypted doc into the local sync
+ database.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ """
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \
+ % (self.TABLE_NAME,)
+ return self._runOperation(query, (doc_id, doc_rev, content))
+
+ @defer.inlineCallbacks
+ def get_encrypted_doc(self, doc_id, doc_rev):
+ """
+ Get an encrypted document from the sync db.
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+ :param doc_rev: The revision of the document.
+ :type doc_rev: str
+
+ :return: A deferred that will fire with the encrypted content of the
+ document or None if the document was not found in the sync
+ db.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ logger.debug("Trying to get encrypted doc from sync db: %s" % doc_id)
+ query = "SELECT content FROM %s WHERE doc_id=? and rev=?" \
+ % self.TABLE_NAME
+ result = yield self._runQuery(query, (doc_id, doc_rev))
+ if result:
+ val = result.pop()
+ defer.returnValue(val[0])
+ defer.returnValue(None)
+
+ def delete_encrypted_doc(self, doc_id, doc_rev):
+ """
+ Delete an encrypted document from the sync db.
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+ :param doc_rev: The revision of the document.
+ :type doc_rev: str
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ query = "DELETE FROM %s WHERE doc_id=? and rev=?" \
+ % self.TABLE_NAME
+ self._runOperation(query, (doc_id, doc_rev))
+
+ def close(self):
+ """
+ Close the encrypter pool.
+ """
+ self._stopped = True
+ self._sync_queue.close()
+ q = self._sync_queue
+ del q
+ self._sync_queue = None
+
+
+def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret,
+ idx):
+ """
+ Decrypt the content of the given document.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The encrypted content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification of
+ that document.
+ :type trans_id: str
+ :param key: The encryption key.
+ :type key: str
+ :param secret: The Soledad storage secret (used for MAC auth).
+ :type secret: str
+ :param idx: The index of this document in the current sync process.
+ :type idx: int
+
+ :return: A tuple (doc_id, doc_rev, decrypted content, gen, trans_id, idx).
+ :rtype: tuple(str, str, str, int, str, int)
+ """
+ decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret)
+ return doc_id, doc_rev, decrypted_content, gen, trans_id, idx
+
+
+class SyncDecrypterPool(SyncEncryptDecryptPool):
+ """
+ Pool of workers that spawn subprocesses to execute the symmetric decryption
+ of documents that were received.
+
+ The decryption of the received documents is done in the following steps:
+
+ 1. Encrypted documents are stored in the sync db by the actual soledad
+ sync loop.
+ 2. The soledad sync loop tells us how many documents we should expect
+ to process.
+ 3. We start a decrypt-and-process loop:
+
+ a. Encrypted documents are fetched.
+ b. Encrypted documents are decrypted.
+ c. The longest possible list of decrypted documents are inserted
+ in the soledad db (this depends on which documents have already
+ arrived and which documents have already been decrypted, because
+ the order of insertion in the local soledad db matters).
+ d. Processed documents are deleted from the database.
+
+ 4. When we have processed as many documents as we should, the loop
+ finishes.
+ """
+ # TODO implement throttling to reduce cpu usage??
+ TABLE_NAME = "docs_received"
+ FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \
+ "trans_id, encrypted, idx"
+
+ """
+ Period of recurrence of the periodic decrypting task, in seconds.
+ """
+ DECRYPT_LOOP_PERIOD = 0.5
+
+ def __init__(self, *args, **kwargs):
+ """
+ Initialize the decrypter pool, and setup a dict for putting the
+ results of the decrypted docs until they are picked by the insert
+ routine that gets them in order.
+
+ :param insert_doc_cb: A callback for inserting received documents from
+ target. If not overriden, this will call u1db
+ insert_doc_from_target in synchronizer, which
+ implements the TAKE OTHER semantics.
+ :type insert_doc_cb: function
+ :param source_replica_uid: The source replica uid, used to find the
+ correct callback for inserting documents.
+ :type source_replica_uid: str
+ """
+ self._insert_doc_cb = kwargs.pop("insert_doc_cb")
+ self.source_replica_uid = kwargs.pop("source_replica_uid")
+ SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
+
+ self._last_inserted_idx = 0
+ self._docs_to_process = None
+ self._processed_docs = 0
+
+ self._async_results = []
+ self._failure = None
+ self._finished = False
+
+ # XXX we want to empty the database before starting, but this is an
+ # asynchronous call, so we have to somehow make sure that it is
+ # executed before any other call to the database, without
+ # blocking.
+ self._empty()
+
+ def _launch_decrypt_and_process(self):
+ d = self._decrypt_and_process_docs()
+ d.addErrback(lambda f: self._set_failure(f))
+
+ def _schedule_decrypt_and_process(self):
+ reactor.callLater(
+ self.DECRYPT_LOOP_PERIOD,
+ self._launch_decrypt_and_process)
+
+ @property
+ def failure(self):
+ return self._failure
+
+ def _set_failure(self, failure):
+ self._failure = failure
+ self._finished = True
+
+ def failed(self):
+ return bool(self._failure)
+
+ def start(self, docs_to_process):
+ """
+ Set the number of documents we expect to process.
+
+ This should be called during the sync exchange process as soon
+ as we know how many documents are arriving from the server.
+
+ :param docs_to_process: The number of documents to process.
+ :type docs_to_process: int
+ """
+ self._docs_to_process = docs_to_process
+ self._schedule_decrypt_and_process()
+
+ def insert_encrypted_received_doc(
+ self, doc_id, doc_rev, content, gen, trans_id, idx):
+ """
+ Insert a received message with encrypted content, to be decrypted later
+ on.
+
+ :param doc_id: The Document ID.
+ :type doc_id: str
+ :param doc_rev: The Document Revision
+ :type doc_rev: str
+ :param content: the Content of the document
+ :type content: str
+ :param gen: the Document Generation
+ :type gen: int
+ :param trans_id: Transaction ID
+ :type trans_id: str
+ :param idx: The index of this document in the current sync process.
+ :type idx: int
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ docstr = json.dumps(content)
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \
+ % self.TABLE_NAME
+ return self._runOperation(
+ query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx))
+
+ def insert_received_doc(
+ self, doc_id, doc_rev, content, gen, trans_id, idx):
+ """
+ Insert a document that is not symmetrically encrypted.
+ We store it in the staging area (the decrypted_docs dictionary) to be
+ picked up in order as the preceding documents are decrypted.
+
+ :param doc_id: The Document ID.
+ :type doc_id: str
+ :param doc_rev: The Document Revision
+ :type doc_rev: str
+ :param content: the Content of the document
+ :type content: str
+ :param gen: the Document Generation
+ :type gen: int
+ :param trans_id: Transaction ID
+ :type trans_id: str
+ :param idx: The index of this document in the current sync process.
+ :type idx: int
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ if not isinstance(content, str):
+ content = json.dumps(content)
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \
+ % self.TABLE_NAME
+ return self._runOperation(
+ query, (doc_id, doc_rev, content, gen, trans_id, 0, idx))
+
+ def _delete_received_doc(self, doc_id):
+ """
+ Delete a received doc after it was inserted into the local db.
+
+ :param doc_id: Document ID.
+ :type doc_id: str
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ query = "DELETE FROM '%s' WHERE doc_id=?" \
+ % self.TABLE_NAME
+ return self._runOperation(query, (doc_id,))
+
+ def _async_decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx):
+ """
+ Dispatch an asynchronous document decrypting routine and save the
+ result object.
+
+ :param doc_id: The ID for the document with contents to be decrypted.
+ :type doc_id: str
+ :param rev: The revision of the document.
+ :type rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification
+ of that document.
+ :type trans_id: str
+ :param idx: The index of this document in the current sync process.
+ :type idx: int
+
+ :return: None. The result object of the asynchronous call is appended
+ to self._async_results so it can be collected later.
+ :rtype: None
+ """
+ soledad_assert(self._crypto is not None, "need a crypto object")
+
+ content = json.loads(content)
+ key = self._crypto.doc_passphrase(doc_id)
+ secret = self._crypto.secret
+ args = doc_id, rev, content, gen, trans_id, key, secret, idx
+ # decrypt asynchronously
+ self._async_results.append(
+ self._pool.apply_async(
+ decrypt_doc_task, args))
+
+ def _decrypt_doc_cb(self, result):
+ """
+ Store the decryption result in the sync db from where it will later be
+ picked by _process_decrypted_docs.
+
+ :param result: A tuple containing the document's id, revision,
+ content, generation, transaction id and sync index.
+ :type result: tuple(str, str, str, int, str, int)
+
+ :return: A deferred that will fire after the document has been
+ inserted in the sync db.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ doc_id, rev, content, gen, trans_id, idx = result
+ logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s"
+ % (doc_id, rev, gen, trans_id))
+ return self.insert_received_doc(
+ doc_id, rev, content, gen, trans_id, idx)
+
+ def _get_docs(self, encrypted=None, order_by='idx', order='ASC'):
+ """
+ Get documents from the received docs table in the sync db.
+
+ :param encrypted: If not None, only return documents with encrypted
+ field equal to given parameter.
+ :type encrypted: bool or None
+ :param order_by: The name of the field to order results.
+ :type order_by: str
+ :param order: Whether the order should be ASC or DESC.
+ :type order: str
+
+ :return: A deferred that will fire with the results of the database
+ query.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \
+ "idx FROM %s" % self.TABLE_NAME
+ if encrypted is not None:
+ query += " WHERE encrypted = %d" % int(encrypted)
+ query += " ORDER BY %s %s" % (order_by, order)
+ return self._runQuery(query)
+
+ @defer.inlineCallbacks
+ def _get_insertable_docs(self):
+ """
+ Return a list of non-encrypted documents ready to be inserted.
+
+ :return: A deferred that will fire with the list of insertable
+ documents.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ # here, we fetch the list of decrypted documents and compare with the
+ # index of the last successfully processed document.
+ decrypted_docs = yield self._get_docs(encrypted=False)
+ insertable = []
+ last_idx = self._last_inserted_idx
+ for doc_id, rev, content, gen, trans_id, encrypted, idx in \
+ decrypted_docs:
+ # XXX for some reason, a document might not have been deleted from
+ # the database. This is a bug. In this point, already
+ # processed documents should have been removed from the sync
+ # database and we should not have to skip them here. We need
+ # to find out why this is happening, fix, and remove the
+ # skipping below.
+ if (idx < last_idx + 1):
+ continue
+ if (idx != last_idx + 1):
+ break
+ insertable.append((doc_id, rev, content, gen, trans_id, idx))
+ last_idx += 1
+ defer.returnValue(insertable)
+
+    @defer.inlineCallbacks
+    def _async_decrypt_received_docs(self):
+        """
+        Get all the encrypted documents from the sync database and dispatch a
+        decrypt worker to decrypt each one of them.
+
+        Each document is dispatched at most once, no matter how many times
+        this method is called.
+
+        :return: A deferred that will fire after a decryption job has been
+                 dispatched for every encrypted document not yet dispatched.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        # This method is run on every iteration of the decrypt-and-process
+        # loop, and a document stays flagged as encrypted in the sync db
+        # until its decryption result has been collected. Remember what was
+        # already handed to the workers, otherwise the same document would be
+        # decrypted (and later re-inserted in the sync db) several times.
+        dispatched = getattr(self, '_dispatched_docs', None)
+        if dispatched is None:
+            dispatched = self._dispatched_docs = set()
+        docs = yield self._get_docs(encrypted=True)
+        for doc_id, rev, content, gen, trans_id, _, idx in docs:
+            if (doc_id, rev) in dispatched:
+                continue
+            dispatched.add((doc_id, rev))
+            self._async_decrypt_doc(
+                doc_id, rev, content, gen, trans_id, idx)
+
+ @defer.inlineCallbacks
+ def _process_decrypted_docs(self):
+ """
+ Fetch as many decrypted documents as can be taken from the expected
+ order and insert them in the local replica.
+
+ :return: A deferred that will fire with the list of inserted
+ documents.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ insertable = yield self._get_insertable_docs()
+ for doc_fields in insertable:
+ self._insert_decrypted_local_doc(*doc_fields)
+ defer.returnValue(insertable)
+
+ def _delete_processed_docs(self, inserted):
+ """
+ Delete from the sync db documents that have been processed.
+
+ :param inserted: List of documents inserted in the previous process
+ step.
+ :type inserted: list
+
+ :return: A list of deferreds that will fire when each operation in the
+ database has finished.
+ :rtype: twisted.internet.defer.DeferredList
+ """
+ deferreds = []
+ for doc_id, doc_rev, _, _, _, _ in inserted:
+ deferreds.append(
+ self._delete_received_doc(doc_id))
+ if not deferreds:
+ return defer.succeed(None)
+ return defer.gatherResults(deferreds)
+
+ def _insert_decrypted_local_doc(self, doc_id, doc_rev, content,
+ gen, trans_id, idx):
+ """
+ Insert the decrypted document into the local replica.
+
+ Make use of the passed callback `insert_doc_cb` passed to the caller
+ by u1db sync.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification
+ of that document.
+ :type trans_id: str
+ """
+ # could pass source_replica in params for callback chain
+ logger.debug("Sync decrypter pool: inserting doc in local db: "
+ "%s:%s %s" % (doc_id, doc_rev, gen))
+
+ # convert deleted documents to avoid error on document creation
+ if content == 'null':
+ content = None
+ doc = SoledadDocument(doc_id, doc_rev, content)
+ gen = int(gen)
+ self._insert_doc_cb(doc, gen, trans_id)
+
+ # store info about processed docs
+ self._last_inserted_idx = idx
+ self._processed_docs += 1
+
+ def _empty(self):
+ """
+ Empty the received docs table of the sync database.
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
+ return self._runOperation(query)
+
+    def _collect_async_decryption_results(self):
+        """
+        Collect the results of the finished asynchronous doc decryptions,
+        store them in the sync db and re-raise any exception raised by a
+        multiprocessing async decryption call.
+
+        :raise Exception: Raised if an async call has raised an exception.
+
+        :return: A deferred that will fire when every collected document has
+                 been inserted in the sync db.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        deferreds = []
+        # iterate over a copy, because finished results are removed from the
+        # original list while looping.
+        for res in self._async_results[:]:
+            if res.ready():
+                # res.get() might raise an exception!
+                deferreds.append(self._decrypt_doc_cb(res.get()))
+                self._async_results.remove(res)
+        # return the pending inserts so the caller can wait for them before
+        # looking for insertable documents, and so insert errors are not lost.
+        return defer.gatherResults(deferreds)
+
+ @defer.inlineCallbacks
+ def _decrypt_and_process_docs(self):
+ """
+ Decrypt the documents received from remote replica and insert them
+ into the local one.
+
+ This method implicitly returns a deferred (see the decorator
+ above). It should only be called by _launch_decrypt_and_process(),
+ because this way any exceptions raised here will be stored by the
+ errback attached to the deferred returned.
+
+ :return: A deferred which will fire after all decrypt, process and
+ delete operations have been executed.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ if not self.failed():
+ if self._processed_docs < self._docs_to_process:
+ yield self._async_decrypt_received_docs()
+ yield self._collect_async_decryption_results()
+ docs = yield self._process_decrypted_docs()
+ yield self._delete_processed_docs(docs)
+ # recurse
+ self._schedule_decrypt_and_process()
+ else:
+ self._finished = True
+
+ def has_finished(self):
+ """
+ Return whether the decrypter has finished its work.
+ """
+ return self._finished
diff --git a/client/src/leap/soledad/client/http_client.py b/client/src/leap/soledad/client/http_client.py
new file mode 100644
index 00000000..b08d199e
--- /dev/null
+++ b/client/src/leap/soledad/client/http_client.py
@@ -0,0 +1,194 @@
+# -*- coding: utf-8 -*-
+# http_client.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Twisted HTTP/HTTPS client.
+"""
+
+import os
+
+from zope.interface import implements
+
+from OpenSSL.crypto import load_certificate
+from OpenSSL.crypto import FILETYPE_PEM
+
+from twisted.internet import reactor
+from twisted.internet.ssl import ClientContextFactory
+from twisted.internet.ssl import CertificateOptions
+from twisted.internet.defer import succeed
+
+from twisted.web.client import Agent
+from twisted.web.client import HTTPConnectionPool
+from twisted.web.client import readBody
+from twisted.web.http_headers import Headers
+from twisted.web.error import Error
+from twisted.web.iweb import IBodyProducer
+
+
+from leap.soledad.common.errors import InvalidAuthTokenError
+
+
+#
+# Setup a pool of connections
+#
+
+_pool = HTTPConnectionPool(reactor, persistent=True)
+_pool.maxPersistentPerHost = 10
+_agent = None
+
+# if we ever want to trust the system's CAs, we should use an agent like this:
+# from twisted.web.client import BrowserLikePolicyForHTTPS
+# _agent = Agent(reactor, BrowserLikePolicyForHTTPS(), pool=_pool)
+
+
+#
+# SSL/TLS certificate configuration
+#
+
+def configure_certificate(cert_file):
+ """
+ Configure an agent that verifies server certificates against a CA cert
+ file.
+
+ :param cert_file: The path to the certificate file.
+ :type cert_file: str
+ """
+ global _agent
+ cert = _load_cert(cert_file)
+ _agent = Agent(
+ reactor,
+ SoledadClientContextFactory(cert),
+ pool=_pool)
+
+
+def _load_cert(cert_file):
+ """
+ Load a X509 certificate from a file.
+
+ :param cert_file: The path to the certificate file.
+ :type cert_file: str
+
+ :return: The X509 certificate.
+ :rtype: OpenSSL.crypto.X509
+ """
+ if os.path.exists(cert_file):
+ with open(cert_file) as f:
+ data = f.read()
+ return load_certificate(FILETYPE_PEM, data)
+
+
+class SoledadClientContextFactory(ClientContextFactory):
+ """
+ A context factory that will verify the server's certificate against a
+ given CA certificate.
+ """
+
+ def __init__(self, cacert):
+ """
+ Initialize the context factory.
+
+ :param cacert: The CA certificate.
+ :type cacert: OpenSSL.crypto.X509
+ """
+ self._cacert = cacert
+
+ def getContext(self, hostname, port):
+ opts = CertificateOptions(verify=True, caCerts=[self._cacert])
+ return opts.getContext()
+
+
+#
+# HTTP request facilities
+#
+
+def _unauth_to_invalid_token_error(failure):
+ """
+ An errback to translate unauthorized errors to our own invalid token
+ class.
+
+ :param failure: The original failure.
+ :type failure: twisted.python.failure.Failure
+
+ :return: Either the original failure or an invalid auth token error.
+ :rtype: twisted.python.failure.Failure
+ """
+ failure.trap(Error)
+ if failure.getErrorMessage() == "401 Unauthorized":
+ raise InvalidAuthTokenError
+ return failure
+
+
+class StringBodyProducer(object):
+ """
+ A producer that writes the body of a request to a consumer.
+ """
+
+ implements(IBodyProducer)
+
+ def __init__(self, body):
+ """
+ Initialize the string producer.
+
+ :param body: The body of the request.
+ :type body: str
+ """
+ self.body = body
+ self.length = len(body)
+
+ def startProducing(self, consumer):
+ """
+ Write the body to the consumer.
+
+ :param consumer: Any IConsumer provider.
+ :type consumer: twisted.internet.interfaces.IConsumer
+
+ :return: A successful deferred.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ consumer.write(self.body)
+ return succeed(None)
+
+ def pauseProducing(self):
+ pass
+
+ def stopProducing(self):
+ pass
+
+
+def httpRequest(url, method='GET', body=None, headers=None):
+    """
+    Perform an HTTP request.
+
+    The request is made with the module-level agent, which is only created
+    by configure_certificate().
+
+    :param url: The URL for the request.
+    :type url: str
+    :param method: The HTTP method of the request.
+    :type method: str
+    :param body: The body of the request, if any.
+    :type body: str
+    :param headers: The headers of the request. If None, no headers are
+                    sent.
+    :type headers: dict or None
+
+    :return: A deferred that fires with the body of the response.
+    :rtype: twisted.internet.defer.Deferred
+    """
+    # use None as the default instead of a dict shared between all calls.
+    if headers is None:
+        headers = {}
+    if body:
+        body = StringBodyProducer(body)
+    d = _agent.request(
+        method, url, headers=Headers(headers), bodyProducer=body)
+    d.addCallbacks(readBody, _unauth_to_invalid_token_error)
+    return d
diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py
new file mode 100644
index 00000000..dc6c0e0a
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target.py
@@ -0,0 +1,598 @@
+# -*- coding: utf-8 -*-
+# http_target.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+A U1DB backend for encrypting data before sending to server and decrypting
+after receiving.
+"""
+
+
+import json
+import base64
+import logging
+
+from uuid import uuid4
+from functools import partial
+
+from twisted.internet import defer
+from twisted.internet import reactor
+
+from u1db import errors
+from u1db import SyncTarget
+from u1db.remote import utils
+
+from leap.soledad.common.document import SoledadDocument
+
+from leap.soledad.client.crypto import is_symmetrically_encrypted
+from leap.soledad.client.crypto import encrypt_doc
+from leap.soledad.client.crypto import decrypt_doc
+from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
+from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
+from leap.soledad.client.events import signal
+from leap.soledad.client.encdecpool import SyncDecrypterPool
+from leap.soledad.client.http_client import httpRequest
+from leap.soledad.client.http_client import configure_certificate
+
+
+logger = logging.getLogger(__name__)
+
+
+class SoledadHTTPSyncTarget(SyncTarget):
+ """
+ A SyncTarget that encrypts data before sending and decrypts data after
+ receiving.
+
+ Normally encryption will have been written to the sync database upon
+ document modification. The sync database is also used to write temporarily
+ the parsed documents that the remote send us, before being decrypted and
+ written to the main database.
+ """
+
+ def __init__(self, url, source_replica_uid, creds, crypto, cert_file,
+ sync_db=None, sync_enc_pool=None):
+ """
+ Initialize the sync target.
+
+ :param url: The server sync url.
+ :type url: str
+ :param source_replica_uid: The source replica uid which we use when
+ deferring decryption.
+ :type source_replica_uid: str
+ :param url: The url of the target replica to sync with.
+ :type url: str
+ :param creds: A dictionary containing the uuid and token.
+ :type creds: dict
+ :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
+ document contents when syncing.
+ :type crypto: soledad.crypto.SoledadCrypto
+ :param cert_file: Path to the certificate of the ca used to validate
+ the SSL certificate used by the remote soledad
+ server.
+ :type cert_file: str
+ :param sync_db: Optional. handler for the db with the symmetric
+ encryption of the syncing documents. If
+ None, encryption will be done in-place,
+ instead of retrieving it from the dedicated
+ database.
+ :type sync_db: Sqlite handler
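+ :param sync_enc_pool: Optional. The pool from which already encrypted
+ documents are fetched when sending. If None,
+ documents are encrypted inline.
+ :type sync_enc_pool: an object providing get_encrypted_doc() and
+ delete_encrypted_doc(), or None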
+ """
+ if url.endswith("/"):
+ url = url[:-1]
+ self._url = str(url) + "/sync-from/" + source_replica_uid
+ self.source_replica_uid = source_replica_uid
+ self._auth_header = None
+ self.set_creds(creds)
+ self._crypto = crypto
+ self._sync_db = sync_db
+ self._sync_enc_pool = sync_enc_pool
+ self._insert_doc_cb = None
+ # asynchronous encryption/decryption attributes
+ self._decryption_callback = None
+ self._sync_decr_pool = None
+ configure_certificate(cert_file)
+
+ def set_creds(self, creds):
+ """
+ Update credentials.
+
+ :param creds: A dictionary containing the uuid and token.
+ :type creds: dict
+ """
+ uuid = creds['token']['uuid']
+ token = creds['token']['token']
+ auth = '%s:%s' % (uuid, token)
+ b64_token = base64.b64encode(auth)
+ self._auth_header = {'Authorization': ['Token %s' % b64_token]}
+
+ @property
+ def _defer_encryption(self):
+ return self._sync_enc_pool is not None
+
+ #
+ # SyncTarget API
+ #
+
+ @defer.inlineCallbacks
+ def get_sync_info(self, source_replica_uid):
+ """
+ Return information about known state of remote database.
+
+ Return the replica_uid and the current database generation of the
+ remote database, and its last-seen database generation for the client
+ replica.
+
+ :param source_replica_uid: The client-side replica uid.
+ :type source_replica_uid: str
+
+ :return: A deferred which fires with (target_replica_uid,
+ target_replica_generation, target_trans_id,
+ source_replica_last_known_generation,
+ source_replica_last_known_transaction_id)
+ :rtype: twisted.internet.defer.Deferred
+ """
+ raw = yield httpRequest(self._url, headers=self._auth_header)
+ res = json.loads(raw)
+ defer.returnValue([
+ res['target_replica_uid'],
+ res['target_replica_generation'],
+ res['target_replica_transaction_id'],
+ res['source_replica_generation'],
+ res['source_transaction_id']
+ ])
+
+ def record_sync_info(
+ self, source_replica_uid, source_replica_generation,
+ source_replica_transaction_id):
+ """
+ Record tip information for another replica.
+
+ After sync_exchange has been processed, the caller will have
+ received new content from this replica. This call allows the
+ source replica instigating the sync to inform us what their
+ generation became after applying the documents we returned.
+
+ This is used to allow future sync operations to not need to repeat data
+ that we just talked about. It also means that if this is called at the
+ wrong time, there can be database records that will never be
+ synchronized.
+
+ :param source_replica_uid: The identifier for the source replica.
+ :type source_replica_uid: str
+ :param source_replica_generation: The database generation for the
+ source replica.
+ :type source_replica_generation: int
+ :param source_replica_transaction_id: The transaction id associated
+ with the source replica
+ generation.
+ :type source_replica_transaction_id: str
+
+ :return: A deferred which fires with the result of the query.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ data = json.dumps({
+ 'generation': source_replica_generation,
+ 'transaction_id': source_replica_transaction_id
+ })
+ headers = self._auth_header.copy()
+ headers.update({'content-type': ['application/json']})
+ return httpRequest(
+ self._url,
+ method='PUT',
+ headers=headers,
+ body=data)
+
+ @defer.inlineCallbacks
+ def sync_exchange(self, docs_by_generation, source_replica_uid,
+ last_known_generation, last_known_trans_id,
+ insert_doc_cb, ensure_callback=None,
+ defer_decryption=True, sync_id=None):
+ """
+ Find out which documents the remote database does not know about,
+ encrypt and send them. After that, receive documents from the remote
+ database.
+
+ :param docs_by_generation: A list of (doc, generation, trans_id)
+ of local documents that were changed since
+ the last local generation the remote
+ replica knows about.
+ :type docs_by_generation: list of tuples
+
+ :param source_replica_uid: The uid of the source replica.
+ :type source_replica_uid: str
+
+ :param last_known_generation: Target's last known generation.
+ :type last_known_generation: int
+
+ :param last_known_trans_id: Target's last known transaction id.
+ :type last_known_trans_id: str
+
+ :param insert_doc_cb: A callback for inserting received documents from
+ target. If not overriden, this will call u1db
+ insert_doc_from_target in synchronizer, which
+ implements the TAKE OTHER semantics.
+ :type insert_doc_cb: function
+
+ :param ensure_callback: A callback that ensures we know the target
+ replica uid if the target replica was just
+ created.
+ :type ensure_callback: function
+
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
+
+ :return: A deferred which fires with the new generation and
+ transaction id of the target replica.
+ :rtype: twisted.internet.defer.Deferred
+ """
+
+ self._ensure_callback = ensure_callback
+
+ if sync_id is None:
+ sync_id = str(uuid4())
+ self.source_replica_uid = source_replica_uid
+
+ # save a reference to the callback so we can use it after decrypting
+ self._insert_doc_cb = insert_doc_cb
+
+ gen_after_send, trans_id_after_send = yield self._send_docs(
+ docs_by_generation,
+ last_known_generation,
+ last_known_trans_id,
+ sync_id)
+
+ cur_target_gen, cur_target_trans_id = yield self._receive_docs(
+ last_known_generation, last_known_trans_id,
+ ensure_callback, sync_id,
+ defer_decryption=defer_decryption)
+
+ # update gen and trans id info in case we just sent and did not
+ # receive docs.
+ if gen_after_send is not None and gen_after_send > cur_target_gen:
+ cur_target_gen = gen_after_send
+ cur_target_trans_id = trans_id_after_send
+
+ defer.returnValue([cur_target_gen, cur_target_trans_id])
+
+ #
+ # methods to send docs
+ #
+
+ def _prepare(self, comma, entries, **dic):
+ entry = comma + '\r\n' + json.dumps(dic)
+ entries.append(entry)
+ return len(entry)
+
+ @defer.inlineCallbacks
+ def _send_docs(self, docs_by_generation, last_known_generation,
+ last_known_trans_id, sync_id):
+
+ if not docs_by_generation:
+ defer.returnValue([None, None])
+
+ headers = self._auth_header.copy()
+ headers.update({'content-type': ['application/x-soledad-sync-put']})
+ # add remote replica metadata to the request
+ first_entries = ['[']
+ self._prepare(
+ '', first_entries,
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
+ ensure=self._ensure_callback is not None)
+ idx = 0
+ total = len(docs_by_generation)
+ for doc, gen, trans_id in docs_by_generation:
+ idx += 1
+ result = yield self._send_one_doc(
+ headers, first_entries, doc,
+ gen, trans_id, total, idx)
+ if self._defer_encryption:
+ self._sync_enc_pool.delete_encrypted_doc(
+ doc.doc_id, doc.rev)
+ signal(SOLEDAD_SYNC_SEND_STATUS,
+ "Soledad sync send status: %d/%d"
+ % (idx, total))
+ response_dict = json.loads(result)[0]
+ gen_after_send = response_dict['new_generation']
+ trans_id_after_send = response_dict['new_transaction_id']
+ defer.returnValue([gen_after_send, trans_id_after_send])
+
+ @defer.inlineCallbacks
+ def _send_one_doc(self, headers, first_entries, doc, gen, trans_id,
+ number_of_docs, doc_idx):
+ entries = first_entries[:]
+ # add the document to the request
+ content = yield self._encrypt_doc(doc)
+ self._prepare(
+ ',', entries,
+ id=doc.doc_id, rev=doc.rev, content=content, gen=gen,
+ trans_id=trans_id, number_of_docs=number_of_docs,
+ doc_idx=doc_idx)
+ entries.append('\r\n]')
+ data = ''.join(entries)
+ result = yield httpRequest(
+ self._url,
+ method='POST',
+ headers=headers,
+ body=data)
+ defer.returnValue(result)
+
+ def _encrypt_doc(self, doc):
+ d = None
+ if doc.is_tombstone():
+ d = defer.succeed(None)
+ elif not self._defer_encryption:
+ # fallback case, for tests
+ d = defer.succeed(encrypt_doc(self._crypto, doc))
+ else:
+
+ def _maybe_encrypt_doc_inline(doc_json):
+ if doc_json is None:
+ # the document is not marked as tombstone, but we got
+ # nothing from the sync db. As it is not encrypted
+ # yet, we force inline encryption.
+ return encrypt_doc(self._crypto, doc)
+ return doc_json
+
+ d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev)
+ d.addCallback(_maybe_encrypt_doc_inline)
+ return d
+
+ #
+ # methods to receive doc
+ #
+
+    @defer.inlineCallbacks
+    def _receive_docs(self, last_known_generation, last_known_trans_id,
+                      ensure_callback, sync_id, defer_decryption):
+        """
+        Fetch from the target all documents changed since the last sync.
+
+        The first document is requested alone, because the total number of
+        documents to be received comes as metadata of each response. The
+        remaining documents are then requested concurrently.
+
+        :param last_known_generation: Target generation known to the source.
+        :param last_known_trans_id: Target transaction id known to the
+                                    source.
+        :param ensure_callback: Not used directly by this method.
+        :param sync_id: Identifier of the current sync session, sent along
+                        with every request.
+        :param defer_decryption: Whether received documents should be queued
+                                 in the sync db for asynchronous decryption.
+                                 Only honoured when a sync db exists;
+                                 otherwise documents are handled inline.
+        :type defer_decryption: bool
+
+        :return: A deferred which fires with the list
+                 [new_generation, new_transaction_id] of the target.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        self._queue_for_decrypt = defer_decryption \
+            and self._sync_db is not None
+
+        new_generation = last_known_generation
+        new_transaction_id = last_known_trans_id
+
+        # The decrypter pool can only exist when there is a sync db, so every
+        # use of the pool below is gated on the same flag that decides
+        # whether documents are queued.
+        if self._queue_for_decrypt:
+            logger.debug(
+                "Soledad sync: will queue received docs for decrypting.")
+            self._setup_sync_decr_pool()
+
+        headers = self._auth_header.copy()
+        headers.update({'content-type': ['application/x-soledad-sync-get']})
+
+        # -------------------------------------------------------------------
+        # maybe receive the first document
+        # -------------------------------------------------------------------
+
+        # we fetch the first document before fetching the rest because we
+        # need to know the total number of documents to be received, and
+        # this information comes as metadata to each request.
+
+        d = self._receive_one_doc(
+            headers, last_known_generation, last_known_trans_id,
+            sync_id, 0)
+        d.addCallback(partial(self._insert_received_doc, 1, 1))
+        number_of_changes, ngen, ntrans = yield d
+
+        # the first response already carries the target generation info;
+        # keep it so that a sync which receives exactly one document does
+        # not hand back the stale last-known values.
+        if number_of_changes > 0:
+            new_generation, new_transaction_id = ngen, ntrans
+
+        if self._queue_for_decrypt:
+            self._sync_decr_pool.start(number_of_changes)
+
+        # -------------------------------------------------------------------
+        # maybe receive the rest of the documents
+        # -------------------------------------------------------------------
+
+        # launch many asynchronous fetches and inserts of received documents
+        # in the temporary sync db. Will wait for all results before
+        # continuing.
+
+        deferreds = []
+        for received in range(1, number_of_changes):
+            d = self._receive_one_doc(
+                headers, last_known_generation,
+                last_known_trans_id, sync_id, received)
+            d.addCallback(
+                partial(
+                    self._insert_received_doc,
+                    received + 1,  # the index of the current received doc
+                    number_of_changes))
+            deferreds.append(d)
+        results = yield defer.gatherResults(deferreds)
+
+        # get generation and transaction id of target after insertions
+        if results:
+            _, new_generation, new_transaction_id = results[-1]
+
+        # -------------------------------------------------------------------
+        # wait for async decryption to finish
+        # -------------------------------------------------------------------
+
+        # below we do a trick so we can wait for the SyncDecrypterPool to
+        # finish its work before finally returning the new generation and
+        # transaction id of the remote replica. We create a Deferred that
+        # will return the results of the sync and, if we are decrypting
+        # asynchronously, we use reactor.callLater() to periodically poll
+        # the decrypter and check if it has finished its work. When it has
+        # finished, we either call the callback or errback of that deferred.
+        # In case we are not asynchronously decrypting, we just fire the
+        # deferred.
+
+        def _shutdown_and_finish(res):
+            # there is no pool to close when decryption was done inline and
+            # no pool was ever set up
+            if self._sync_decr_pool is not None:
+                self._sync_decr_pool.close()
+            return new_generation, new_transaction_id
+
+        d = defer.Deferred()
+        d.addCallback(_shutdown_and_finish)
+
+        def _wait_or_finish():
+            if not self._sync_decr_pool.has_finished():
+                reactor.callLater(
+                    SyncDecrypterPool.DECRYPT_LOOP_PERIOD,
+                    _wait_or_finish)
+            elif not self._sync_decr_pool.failed():
+                d.callback(None)
+            else:
+                d.errback(self._sync_decr_pool.failure)
+
+        if self._queue_for_decrypt:
+            _wait_or_finish()
+        else:
+            d.callback(None)
+
+        new_generation, new_transaction_id = yield d
+        defer.returnValue([new_generation, new_transaction_id])
+
+    def _receive_one_doc(self, headers, last_known_generation,
+                         last_known_trans_id, sync_id, received):
+        """
+        Send the POST request that asks the target for one document.
+
+        :param headers: The headers to send with the request.
+        :param last_known_generation: Target generation known to the source.
+        :param last_known_trans_id: Target transaction id known to the
+                                    source.
+        :param sync_id: Identifier of the current sync session.
+        :param received: How many documents have already been received.
+
+        :return: The result of ``httpRequest`` for this request.
+        """
+        wants_ensure = self._ensure_callback is not None
+        body_parts = ['[']
+        # add remote replica metadata to the request
+        self._prepare(
+            '', body_parts,
+            last_known_generation=last_known_generation,
+            last_known_trans_id=last_known_trans_id,
+            sync_id=sync_id,
+            ensure=wants_ensure)
+        # inform server of how many documents have already been received
+        self._prepare(',', body_parts, received=received)
+        body_parts.append('\r\n]')
+        body = ''.join(body_parts)
+        # send the request
+        return httpRequest(
+            self._url,
+            method='POST',
+            headers=headers,
+            body=body)
+
+ def _insert_received_doc(self, idx, total, response):
+ """
+ Insert a received document into the local replica.
+
+ The document is either handed to the sync decrypter pool (when
+ self._queue_for_decrypt is set) or inserted right away through
+ self._insert_doc_cb, decrypting it inline first if needed.
+
+ :param idx: The index count of the current operation.
+ :type idx: int
+ :param total: The total number of operations.
+ :type total: int
+ :param response: The response received from the server, passed as is
+ to _parse_received_doc_response (which splits it into
+ lines, so it is expected to be a string).
+ :type response: str
+
+ :return: A tuple (number_of_changes, new_generation,
+ new_transaction_id) taken from the parsed response.
+ :rtype: tuple
+ """
+ new_generation, new_transaction_id, number_of_changes, doc_id, \
+ rev, content, gen, trans_id = \
+ self._parse_received_doc_response(response)
+ if doc_id is not None:
+ # decrypt incoming document and insert into local database
+ # -------------------------------------------------------------
+ # symmetric decryption of document's contents
+ # -------------------------------------------------------------
+ # If arriving content was symmetrically encrypted, we decrypt it.
+ # We do it inline if defer_decryption flag is False or no sync_db
+ # was defined, otherwise we defer it by writing it to the received
+ # docs table.
+ doc = SoledadDocument(doc_id, rev, content)
+ if is_symmetrically_encrypted(doc):
+ if self._queue_for_decrypt:
+ self._sync_decr_pool.insert_encrypted_received_doc(
+ doc.doc_id, doc.rev, doc.content, gen, trans_id,
+ idx)
+ else:
+ # defer_decryption is False or no-sync-db fallback
+ doc.set_json(decrypt_doc(self._crypto, doc))
+ self._insert_doc_cb(doc, gen, trans_id)
+ else:
+ # not symmetrically encrypted doc, insert it directly
+ # or save it in the decrypted stage.
+ if self._queue_for_decrypt:
+ self._sync_decr_pool.insert_received_doc(
+ doc.doc_id, doc.rev, doc.content, gen, trans_id,
+ idx)
+ else:
+ self._insert_doc_cb(doc, gen, trans_id)
+ # -------------------------------------------------------------
+ # end of symmetric decryption
+ # -------------------------------------------------------------
+ # report progress as "<idx>/<total>" through the receive status signal
+ msg = "%d/%d" % (idx, total)
+ signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg)
+ logger.debug("Soledad sync receive status: %s" % msg)
+ return number_of_changes, new_generation, new_transaction_id
+
+    def _parse_received_doc_response(self, response):
+        """
+        Parse the response from the server containing the received document.
+
+        The response is expected to be a '[' line, a JSON metadata line, an
+        optional JSON document line, and a closing ']' line.
+
+        :param response: The body of the response.
+        :type response: str
+
+        :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev,
+                 content, gen, trans_id). The last five items are None when
+                 the metadata reports no changes.
+        :rtype: tuple
+
+        :raise errors.BrokenSyncStream: if the response is not delimited by
+            '[' and ']', or if the metadata or the document entry are
+            missing or malformed.
+        """
+        # decode incoming stream
+        parts = response.splitlines()
+        if not parts or parts[0] != '[' or parts[-1] != ']':
+            raise errors.BrokenSyncStream
+        data = parts[1:-1]
+        # decode metadata
+        metadata = None
+        try:
+            # IndexError: the stream has no metadata line at all.
+            # ValueError: the line is not valid JSON (the decode errors of
+            # both the stdlib json module and simplejson are ValueErrors).
+            # KeyError/TypeError: the metadata lacks a field or is not a
+            # JSON object.
+            line, comma = utils.check_and_strip_comma(data[0])
+            metadata = json.loads(line)
+            new_generation = metadata['new_generation']
+            new_transaction_id = metadata['new_transaction_id']
+            number_of_changes = metadata['number_of_changes']
+        except (IndexError, ValueError, KeyError, TypeError):
+            raise errors.BrokenSyncStream
+        # make sure we have replica_uid from fresh new dbs
+        if self._ensure_callback and 'replica_uid' in metadata:
+            self._ensure_callback(metadata['replica_uid'])
+        # parse incoming document info
+        doc_id = None
+        rev = None
+        content = None
+        gen = None
+        trans_id = None
+        if number_of_changes > 0:
+            try:
+                entry = json.loads(data[1])
+                doc_id = entry['id']
+                rev = entry['rev']
+                content = entry['content']
+                gen = entry['gen']
+                trans_id = entry['trans_id']
+            except (IndexError, ValueError, KeyError, TypeError):
+                # a missing or malformed document entry is a broken stream
+                # too, not an unrelated decoding error
+                raise errors.BrokenSyncStream
+        return new_generation, new_transaction_id, number_of_changes, \
+            doc_id, rev, content, gen, trans_id
+
+    def _setup_sync_decr_pool(self):
+        """
+        Create the SyncDecrypterPool used for deferred decryption.
+
+        Nothing is done if a pool already exists or if there is no sync db.
+        """
+        if self._sync_decr_pool is not None or self._sync_db is None:
+            return
+        # initialize syncing queue decryption pool
+        decr_pool = SyncDecrypterPool(
+            self._crypto,
+            self._sync_db,
+            insert_doc_cb=self._insert_doc_cb,
+            source_replica_uid=self.source_replica_uid)
+        self._sync_decr_pool = decr_pool
diff --git a/client/src/leap/soledad/client/pragmas.py b/client/src/leap/soledad/client/pragmas.py
index 2e9c53a3..55397d10 100644
--- a/client/src/leap/soledad/client/pragmas.py
+++ b/client/src/leap/soledad/client/pragmas.py
@@ -19,10 +19,53 @@ Different pragmas used in the initialization of the SQLCipher database.
"""
import logging
import string
+import threading
+import os
+
+from leap.soledad.common import soledad_assert
+
logger = logging.getLogger(__name__)
+_db_init_lock = threading.Lock()
+
+
+def set_init_pragmas(conn, opts=None, extra_queries=None):
+    """
+    Set the initialization pragmas on a database connection.
+
+    This includes the crypto pragmas, and any other options that must
+    be passed early to sqlcipher db. The whole initialization runs while
+    holding a module-level lock.
+
+    :param conn: The connection on which the pragmas are set.
+    :param opts: The options handed to the crypto pragmas. Must not be None.
+    :param extra_queries: Optional queries executed on the connection after
+                          the pragmas have been set.
+    """
+    soledad_assert(opts is not None)
+    if extra_queries is None:
+        extra_queries = []
+    # only one execution path should initialize the db
+    with _db_init_lock:
+        _set_init_pragmas(conn, opts, extra_queries)
+
+
+def _set_init_pragmas(conn, opts, extra_queries):
+    """
+    Set the crypto pragmas and the optional tuning pragmas, then run the
+    extra queries.
+
+    Three environment variables change what is applied; each one counts as
+    set when it has a non-empty value:
+
+    - LEAP_SQLITE_NOWAL: skip the call to set_write_ahead_logging.
+    - LEAP_SQLITE_NOSYNC: call set_synchronous_off instead of
+      set_synchronous_normal.
+    - LEAP_SQLITE_MEMSTORE: additionally call set_mem_temp_store.
+
+    :param conn: The connection on which the pragmas are set.
+    :param opts: The options handed to set_crypto_pragmas.
+    :param extra_queries: Queries executed, each on a fresh cursor, after
+                          the pragmas have been set.
+    """
+    env = os.environ
+    disable_sync = env.get('LEAP_SQLITE_NOSYNC')
+    use_memstore = env.get('LEAP_SQLITE_MEMSTORE')
+    disable_wal = env.get('LEAP_SQLITE_NOWAL')
+
+    # the crypto pragmas go first, before any other pragma or query
+    set_crypto_pragmas(conn, opts)
+
+    if not disable_wal:
+        set_write_ahead_logging(conn)
+
+    set_synchronous = set_synchronous_off if disable_sync \
+        else set_synchronous_normal
+    set_synchronous(conn)
+
+    if use_memstore:
+        set_mem_temp_store(conn)
+
+    for extra_query in extra_queries:
+        conn.cursor().execute(extra_query)
+
+
def set_crypto_pragmas(db_handle, sqlcipher_opts):
"""
Set cryptographic params (key, cipher, KDF number of iterations and
diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py
index af781a26..96f7e906 100644
--- a/client/src/leap/soledad/client/secrets.py
+++ b/client/src/leap/soledad/client/secrets.py
@@ -246,22 +246,26 @@ class SoledadSecrets(object):
:return: Whether there's a storage secret for symmetric encryption.
:rtype: bool
"""
- if self._secret_id is None or self._secret_id not in self._secrets:
+ logger.info("Checking if there's a secret in local storage...")
+ if (self._secret_id is None or self._secret_id not in self._secrets) \
+ and os.path.isfile(self._secrets_path):
try:
self._load_secrets() # try to load from disk
except IOError as e:
logger.warning(
'IOError while loading secrets from disk: %s' % str(e))
- return False
- return self.storage_secret is not None
+
+ if self.storage_secret is not None:
+ logger.info("Found a secret in local storage.")
+ return True
+
+ logger.info("Could not find a secret in local storage.")
+ return False
def _load_secrets(self):
"""
Load storage secrets from local file.
"""
- # does the file exist in disk?
- if not os.path.isfile(self._secrets_path):
- raise IOError('File does not exist: %s' % self._secrets_path)
# read storage secrets from file
content = None
with open(self._secrets_path, 'r') as f:
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index db3cb5cb..8e7d39c2 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -42,7 +42,6 @@ SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases
handled by Soledad should be created by SQLCipher >= 2.0.
"""
import logging
-import multiprocessing
import os
import threading
import json
@@ -54,19 +53,17 @@ from u1db.backends import sqlite_backend
from hashlib import sha256
from contextlib import contextmanager
from collections import defaultdict
-from httplib import CannotSendRequest
+from functools import partial
from pysqlcipher import dbapi2 as sqlcipher_dbapi2
from twisted.internet import reactor
-from twisted.internet.task import LoopingCall
from twisted.internet.threads import deferToThreadPool
from twisted.python.threadpool import ThreadPool
-from twisted.python import log
+from twisted.enterprise import adbapi
-from leap.soledad.client import crypto
-from leap.soledad.client.target import SoledadSyncTarget
-from leap.soledad.client.target import PendingReceivedDocsSyncError
+from leap.soledad.client import encdecpool
+from leap.soledad.client.http_target import SoledadHTTPSyncTarget
from leap.soledad.client.sync import SoledadSynchronizer
from leap.soledad.client import pragmas
@@ -102,46 +99,14 @@ def initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True):
conn = sqlcipher_dbapi2.connect(
opts.path, check_same_thread=check_same_thread)
- set_init_pragmas(conn, opts, extra_queries=on_init)
+ pragmas.set_init_pragmas(conn, opts, extra_queries=on_init)
return conn
-_db_init_lock = threading.Lock()
-
-
-def set_init_pragmas(conn, opts=None, extra_queries=None):
- """
- Set the initialization pragmas.
-
- This includes the crypto pragmas, and any other options that must
- be passed early to sqlcipher db.
- """
- soledad_assert(opts is not None)
- extra_queries = [] if extra_queries is None else extra_queries
- with _db_init_lock:
- # only one execution path should initialize the db
- _set_init_pragmas(conn, opts, extra_queries)
-
-
-def _set_init_pragmas(conn, opts, extra_queries):
-
- sync_off = os.environ.get('LEAP_SQLITE_NOSYNC')
- memstore = os.environ.get('LEAP_SQLITE_MEMSTORE')
- nowal = os.environ.get('LEAP_SQLITE_NOWAL')
-
- pragmas.set_crypto_pragmas(conn, opts)
-
- if not nowal:
- pragmas.set_write_ahead_logging(conn)
- if sync_off:
- pragmas.set_synchronous_off(conn)
- else:
- pragmas.set_synchronous_normal(conn)
- if memstore:
- pragmas.set_mem_temp_store(conn)
-
- for query in extra_queries:
- conn.cursor().execute(query)
+def initialize_sqlcipher_adbapi_db(opts, extra_queries=None):
+    """
+    Get a connection pool from the sqlcipher_adbapi module.
+
+    :param opts: The options handed to sqlcipher_adbapi.getConnectionPool.
+    :param extra_queries: Optional extra queries handed to the same call.
+
+    :return: Whatever sqlcipher_adbapi.getConnectionPool returns.
+    """
+    # imported here, at call time, rather than at module level
+    from leap.soledad.client import sqlcipher_adbapi
+    pool = sqlcipher_adbapi.getConnectionPool(
+        opts, extra_queries=extra_queries)
+    return pool
class SQLCipherOptions(object):
@@ -151,22 +116,32 @@ class SQLCipherOptions(object):
@classmethod
def copy(cls, source, path=None, key=None, create=None,
- is_raw_key=None, cipher=None, kdf_iter=None, cipher_page_size=None,
- defer_encryption=None, sync_db_key=None):
+ is_raw_key=None, cipher=None, kdf_iter=None,
+ cipher_page_size=None, defer_encryption=None, sync_db_key=None):
"""
Return a copy of C{source} with parameters different than None
replaced by new values.
"""
- return SQLCipherOptions(
- path if path else source.path,
- key if key else source.key,
- create=create if create else source.create,
- is_raw_key=is_raw_key if is_raw_key else source.is_raw_key,
- cipher=cipher if cipher else source.cipher,
- kdf_iter=kdf_iter if kdf_iter else source.kdf_iter,
- cipher_page_size=cipher_page_size if cipher_page_size else source.cipher_page_size,
- defer_encryption=defer_encryption if defer_encryption else source.defer_encryption,
- sync_db_key=sync_db_key if sync_db_key else source.sync_db_key)
+ local_vars = locals()
+ args = []
+ kwargs = {}
+
+ for name in ["path", "key"]:
+ val = local_vars[name]
+ if val is not None:
+ args.append(val)
+ else:
+ args.append(getattr(source, name))
+
+ for name in ["create", "is_raw_key", "cipher", "kdf_iter",
+ "cipher_page_size", "defer_encryption", "sync_db_key"]:
+ val = local_vars[name]
+ if val is not None:
+ kwargs[name] = val
+ else:
+ kwargs[name] = getattr(source, name)
+
+ return SQLCipherOptions(*args, **kwargs)
def __init__(self, path, key, create=True, is_raw_key=False,
cipher='aes-256-cbc', kdf_iter=4000, cipher_page_size=1024,
@@ -307,10 +282,9 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
:rtype: str
"""
doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc)
-
- # TODO XXX move to API XXX
if self.defer_encryption:
- self.sync_queue.put_nowait(doc)
+ # TODO move to api?
+ self._sync_enc_pool.enqueue_doc_for_encryption(doc)
return doc_rev
#
@@ -440,7 +414,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
Soledad syncer implementation.
"""
- _sync_loop = None
_sync_enc_pool = None
"""
@@ -450,13 +423,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db'
"""
- A dictionary that hold locks which avoid multiple sync attempts from the
- same database replica.
- """
- # XXX We do not need the lock here now. Remove.
- encrypting_lock = threading.Lock()
-
- """
Period or recurrence of the Looping Call that will do the encryption to the
syncdb (in seconds).
"""
@@ -468,19 +434,18 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
"""
syncing_lock = defaultdict(threading.Lock)
- def __init__(self, opts, soledad_crypto, replica_uid,
+ def __init__(self, opts, soledad_crypto, replica_uid, cert_file,
defer_encryption=False):
self._opts = opts
self._path = opts.path
self._crypto = soledad_crypto
self.__replica_uid = replica_uid
+ self._cert_file = cert_file
self._sync_db_key = opts.sync_db_key
self._sync_db = None
- self._sync_db_write_lock = None
self._sync_enc_pool = None
- self.sync_queue = None
# we store syncers in a dictionary indexed by the target URL. We also
# store a hash of the auth info in case auth info expires and we need
@@ -490,8 +455,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
# self._syncers = {'<url>': ('<auth_hash>', syncer), ...}
self._syncers = {}
- self._sync_db_write_lock = threading.Lock()
- self.sync_queue = multiprocessing.Queue()
self.running = False
self._sync_threadpool = None
@@ -503,25 +466,15 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
self._db_handle = None
self._initialize_main_db()
- if defer_encryption:
- self._initialize_sync_db(opts)
+ # the sync_db is used both for deferred encryption and decryption, so
+ # we want to initialize it anyway to allow for all combinations of
+ # deferred encryption and decryption configurations.
+ self._initialize_sync_db(opts)
+ if defer_encryption:
# initialize syncing queue encryption pool
- self._sync_enc_pool = crypto.SyncEncrypterPool(
- self._crypto, self._sync_db, self._sync_db_write_lock)
-
- # -----------------------------------------------------------------
- # From the documentation: If f returns a deferred, rescheduling
- # will not take place until the deferred has fired. The result
- # value is ignored.
-
- # TODO use this to avoid multiple sync attempts if the sync has not
- # finished!
- # -----------------------------------------------------------------
-
- # XXX this was called sync_watcher --- trace any remnants
- self._sync_loop = LoopingCall(self._encrypt_syncing_docs)
- self._sync_loop.start(self.ENCRYPT_LOOP_PERIOD)
+ self._sync_enc_pool = encdecpool.SyncEncrypterPool(
+ self._crypto, self._sync_db)
self.shutdownID = None
@@ -584,11 +537,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
# somewhere else
sync_opts = SQLCipherOptions.copy(
opts, path=sync_db_path, create=True)
- self._sync_db = initialize_sqlcipher_db(
- sync_opts, on_init=self._sync_db_extra_init,
- check_same_thread=False)
- pragmas.set_crypto_pragmas(self._sync_db, opts)
- # ---------------------------------------------------------
+ self._sync_db = getConnectionPool(
+ sync_opts, extra_queries=self._sync_db_extra_init)
@property
def _sync_db_extra_init(self):
@@ -599,15 +549,15 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
:rtype: tuple of strings
"""
maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)"
- encr = crypto.SyncEncrypterPool
- decr = crypto.SyncDecrypterPool
+ encr = encdecpool.SyncEncrypterPool
+ decr = encdecpool.SyncDecrypterPool
sql_encr_table_query = (maybe_create % (
encr.TABLE_NAME, encr.FIELD_NAMES))
sql_decr_table_query = (maybe_create % (
decr.TABLE_NAME, decr.FIELD_NAMES))
return (sql_encr_table_query, sql_decr_table_query)
- def sync(self, url, creds=None, autocreate=True, defer_decryption=True):
+ def sync(self, url, creds=None, defer_decryption=True):
"""
Synchronize documents with remote replica exposed at url.
@@ -621,12 +571,9 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
:param url: The url of the target replica to sync with.
:type url: str
- :param creds:
- optional dictionary giving credentials.
- to authorize the operation with the server.
+ :param creds: optional dictionary giving credentials to authorize the
+ operation with the server.
:type creds: dict
- :param autocreate: Ask the target to create the db if non-existent.
- :type autocreate: bool
:param defer_decryption:
Whether to defer the decryption process using the intermediate
database. If False, decryption will be done inline.
@@ -637,49 +584,11 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
before the synchronisation was performed.
:rtype: Deferred
"""
- kwargs = {'creds': creds, 'autocreate': autocreate,
- 'defer_decryption': defer_decryption}
- return self._defer_to_sync_threadpool(self._sync, url, **kwargs)
-
- def _sync(self, url, creds=None, autocreate=True, defer_decryption=True):
- res = None
-
# the following context manager blocks until the syncing lock can be
# acquired.
- # TODO review, I think this is no longer needed with a 1-thread
- # threadpool.
-
- log.msg("in _sync")
- self.__url = url
with self._syncer(url, creds=creds) as syncer:
# XXX could mark the critical section here...
- try:
- log.msg('syncer sync...')
- res = syncer.sync(autocreate=autocreate,
- defer_decryption=defer_decryption)
-
- except PendingReceivedDocsSyncError:
- logger.warning("Local sync db is not clear, skipping sync...")
- return
- except CannotSendRequest:
- logger.warning("Connection with sync target couldn't be "
- "established. Resetting connection...")
- # closing the connection it will be recreated in the next try
- syncer.sync_target.close()
- return
-
- return res
-
- def stop_sync(self):
- """
- Interrupt all ongoing syncs.
- """
- self._stop_sync()
-
- def _stop_sync(self):
- for url in self._syncers:
- _, syncer = self._syncers[url]
- syncer.stop()
+ return syncer.sync(defer_decryption=defer_decryption)
@contextmanager
def _syncer(self, url, creds=None):
@@ -690,6 +599,10 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
one instance synchronizing the same database replica at the same time.
Because of that, this method blocks until the syncing lock can be
acquired.
+
+ :param creds: optional dictionary giving credentials to authorize the
+ operation with the server.
+ :type creds: dict
"""
with self.syncing_lock[self._path]:
syncer = self._get_syncer(url, creds=creds)
@@ -723,16 +636,17 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
h = sha256(json.dumps([url, creds])).hexdigest()
cur_h, syncer = self._syncers.get(url, (None, None))
if syncer is None or h != cur_h:
- wlock = self._sync_db_write_lock
syncer = SoledadSynchronizer(
self,
- SoledadSyncTarget(url,
- # XXX is the replica_uid ready?
- self._replica_uid,
- creds=creds,
- crypto=self._crypto,
- sync_db=self._sync_db,
- sync_db_write_lock=wlock))
+ SoledadHTTPSyncTarget(
+ url,
+ # XXX is the replica_uid ready?
+ self._replica_uid,
+ creds=creds,
+ crypto=self._crypto,
+ cert_file=self._cert_file,
+ sync_db=self._sync_db,
+ sync_enc_pool=self._sync_enc_pool))
self._syncers[url] = (h, syncer)
# in order to reuse the same synchronizer multiple times we have to
# reset its state (i.e. the number of documents received from target
@@ -744,34 +658,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
# Symmetric encryption of syncing docs
#
- def _encrypt_syncing_docs(self):
- """
- Process the syncing queue and send the documents there
- to be encrypted in the sync db. They will be read by the
- SoledadSyncTarget during the sync_exchange.
-
- Called periodically from the LoopingCall self._sync_loop.
- """
- # TODO should return a deferred that would firewhen the encryption is
- # done. See note on __init__
-
- lock = self.encrypting_lock
- # optional wait flag used to avoid blocking
- if not lock.acquire(False):
- return
- else:
- queue = self.sync_queue
- try:
- while not queue.empty():
- doc = queue.get_nowait()
- self._sync_enc_pool.encrypt_doc(doc)
-
- except Exception as exc:
- logger.error("Error while encrypting docs to sync")
- logger.exception(exc)
- finally:
- lock.release()
-
def get_generation(self):
# FIXME
# XXX this SHOULD BE a callback
@@ -789,16 +675,10 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
"""
Close the syncer and syncdb orderly
"""
- # stop the sync loop for deferred encryption
- if self._sync_loop is not None:
- self._sync_loop.reset()
- self._sync_loop.stop()
- self._sync_loop = None
# close all open syncers
for url in self._syncers:
- _, syncer = self._syncers[url]
- syncer.close()
- self._syncers = []
+ del self._syncers[url]
+
# stop the encryption pool
if self._sync_enc_pool is not None:
self._sync_enc_pool.close()
@@ -808,11 +688,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
if self._sync_db is not None:
self._sync_db.close()
self._sync_db = None
- # close the sync queue
- if self.sync_queue is not None:
- self.sync_queue.close()
- del self.sync_queue
- self.sync_queue = None
class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase):
@@ -903,3 +778,40 @@ def soledad_doc_factory(doc_id=None, rev=None, json='{}', has_conflicts=False,
has_conflicts=has_conflicts, syncable=syncable)
sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase)
+
+
+#
+# twisted.enterprise.adbapi SQLCipher implementation
+#
+
+SQLCIPHER_CONNECTION_TIMEOUT = 10
+
+
+def getConnectionPool(opts, extra_queries=None):
+    """
+    Build a SQLCipherConnectionPool for the database at ``opts.path``.
+
+    :param opts: The database options; ``opts.path`` is used as the
+                 database and the whole object is handed to
+                 pragmas.set_init_pragmas.
+    :param extra_queries: Optional extra queries handed to
+                          pragmas.set_init_pragmas.
+
+    :return: The new connection pool.
+    :rtype: SQLCipherConnectionPool
+    """
+    def _init_connection(conn):
+        # passed to the pool as ``cp_openfun``
+        return pragmas.set_init_pragmas(
+            conn, opts=opts, extra_queries=extra_queries)
+
+    pool = SQLCipherConnectionPool(
+        database=opts.path,
+        check_same_thread=False,
+        cp_openfun=_init_connection,
+        timeout=SQLCIPHER_CONNECTION_TIMEOUT)
+    return pool
+
+
+class SQLCipherConnection(adbapi.Connection):
+    """
+    Connection class used by SQLCipherConnectionPool.
+
+    Adds nothing to adbapi.Connection.
+    """
+
+
+class SQLCipherTransaction(adbapi.Transaction):
+    """
+    Transaction class used by SQLCipherConnectionPool.
+
+    Adds nothing to adbapi.Transaction.
+    """
+
+
+class SQLCipherConnectionPool(adbapi.ConnectionPool):
+    """
+    An adbapi connection pool that always uses the "pysqlcipher.dbapi2"
+    module.
+    """
+
+    connectionFactory = SQLCipherConnection
+    transactionFactory = SQLCipherTransaction
+
+    def __init__(self, *args, **kwargs):
+        """
+        Initialize the pool.
+
+        All arguments are forwarded to adbapi.ConnectionPool after the name
+        of the DB-API module, which callers must therefore not pass.
+        """
+        dbapi_name = "pysqlcipher.dbapi2"
+        adbapi.ConnectionPool.__init__(self, dbapi_name, *args, **kwargs)
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index d3f106da..53172f31 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -16,17 +16,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Soledad synchronization utilities.
-
-Extend u1db Synchronizer with the ability to:
-
- * Postpone the update of the known replica uid until all the decryption of
- the incoming messages has been processed.
-
- * Be interrupted and recovered.
"""
import logging
-import traceback
-from threading import Lock
+
+from twisted.internet import defer
from u1db import errors
from u1db.sync import Synchronizer
@@ -47,17 +40,8 @@ class SoledadSynchronizer(Synchronizer):
Also modified to allow for interrupting the synchronization process.
"""
- # TODO can delegate the syncing to the api object, living in the reactor
- # thread, and use a simple flag.
- syncing_lock = Lock()
-
- def stop(self):
- """
- Stop the current sync in progress.
- """
- self.sync_target.stop()
-
- def sync(self, autocreate=False, defer_decryption=True):
+ @defer.inlineCallbacks
+ def sync(self, defer_decryption=True):
"""
Synchronize documents between source and target.
@@ -69,48 +53,22 @@ class SoledadSynchronizer(Synchronizer):
This is done to allow the ongoing parallel decryption of the incoming
docs to proceed without `InvalidGeneration` conflicts.
- :param autocreate: Whether the target replica should be created or not.
- :type autocreate: bool
:param defer_decryption: Whether to defer the decryption process using
the intermediate database. If False,
decryption will be done inline.
:type defer_decryption: bool
- """
- self.syncing_lock.acquire()
- try:
- return self._sync(autocreate=autocreate,
- defer_decryption=defer_decryption)
- except Exception:
- # we want this exception to reach either SQLCipherU1DBSync.sync or
- # the Solead api object itself, so it is poperly handled and/or
- # logged...
- raise
- finally:
- # ... but we also want to release the syncing lock so this
- # Synchronizer may be reused later.
- self.release_syncing_lock()
-
- def _sync(self, autocreate=False, defer_decryption=True):
- """
- Helper function, called from the main `sync` method.
- See `sync` docstring.
+
+ :return: A deferred which will fire after the sync has finished.
+ :rtype: twisted.internet.defer.Deferred
"""
sync_target = self.sync_target
# get target identifier, its current generation,
# and its last-seen database generation for this source
ensure_callback = None
- try:
- (self.target_replica_uid, target_gen, target_trans_id,
- target_my_gen, target_my_trans_id) = \
- sync_target.get_sync_info(self.source._replica_uid)
- except errors.DatabaseDoesNotExist:
- if not autocreate:
- raise
- # will try to ask sync_exchange() to create the db
- self.target_replica_uid = None
- target_gen, target_trans_id = (0, '')
- target_my_gen, target_my_trans_id = (0, '')
+ (self.target_replica_uid, target_gen, target_trans_id,
+ target_my_gen, target_my_trans_id) = yield \
+ sync_target.get_sync_info(self.source._replica_uid)
logger.debug(
"Soledad target sync info:\n"
@@ -151,15 +109,15 @@ class SoledadSynchronizer(Synchronizer):
self.target_replica_uid)
logger.debug(
"Soledad source sync info:\n"
- " source target gen: %d\n"
- " source target trans_id: %s"
+ " last target gen known to source: %d\n"
+ " last target trans_id known to source: %s"
% (target_last_known_gen, target_last_known_trans_id))
# validate transaction ids
if not changes and target_last_known_gen == target_gen:
if target_trans_id != target_last_known_trans_id:
raise errors.InvalidTransactionId
- return my_gen
+ defer.returnValue(my_gen)
# prepare to send all the changed docs
changed_doc_ids = [doc_id for doc_id, _, _ in changes]
@@ -174,40 +132,26 @@ class SoledadSynchronizer(Synchronizer):
# exchange documents and try to insert the returned ones with
# the target, return target synced-up-to gen.
- #
- # The sync_exchange method may be interrupted, in which case it will
- # return a tuple of Nones.
- try:
- new_gen, new_trans_id = sync_target.sync_exchange(
- docs_by_generation, self.source._replica_uid,
- target_last_known_gen, target_last_known_trans_id,
- self._insert_doc_from_target, ensure_callback=ensure_callback,
- defer_decryption=defer_decryption)
- logger.debug(
- "Soledad source sync info after sync exchange:\n"
- " source target gen: %d\n"
- " source target trans_id: %s"
- % (new_gen, new_trans_id))
- info = {
- "target_replica_uid": self.target_replica_uid,
- "new_gen": new_gen,
- "new_trans_id": new_trans_id,
- "my_gen": my_gen
- }
- self._syncing_info = info
- if defer_decryption and not sync_target.has_syncdb():
- logger.debug("Sync target has no valid sync db, "
- "aborting defer_decryption")
- defer_decryption = False
- self.complete_sync()
- except Exception as e:
- logger.error("Soledad sync error: %s" % str(e))
- logger.error(traceback.format_exc())
- sync_target.stop()
- finally:
- sync_target.close()
-
- return my_gen
+ new_gen, new_trans_id = yield sync_target.sync_exchange(
+ docs_by_generation, self.source._replica_uid,
+ target_last_known_gen, target_last_known_trans_id,
+ self._insert_doc_from_target, ensure_callback=ensure_callback,
+ defer_decryption=defer_decryption)
+ logger.debug(
+ "Soledad source sync info after sync exchange:\n"
+ " source known target gen: %d\n"
+ " source known target trans_id: %s"
+ % (new_gen, new_trans_id))
+ info = {
+ "target_replica_uid": self.target_replica_uid,
+ "new_gen": new_gen,
+ "new_trans_id": new_trans_id,
+ "my_gen": my_gen
+ }
+ self._syncing_info = info
+ yield self.complete_sync()
+
+ defer.returnValue(my_gen)
def complete_sync(self):
"""
@@ -215,6 +159,9 @@ class SoledadSynchronizer(Synchronizer):
(a) record last known generation and transaction uid for the remote
replica, and
(b) make target aware of our current reached generation.
+
+ :return: A deferred which will fire when the sync has been completed.
+ :rtype: twisted.internet.defer.Deferred
"""
logger.debug("Completing deferred last step in SYNC...")
@@ -225,39 +172,23 @@ class SoledadSynchronizer(Synchronizer):
info["target_replica_uid"], info["new_gen"], info["new_trans_id"])
# if gapless record current reached generation with target
- self._record_sync_info_with_the_target(info["my_gen"])
-
- @property
- def syncing(self):
- """
- Return True if a sync is ongoing, False otherwise.
- :rtype: bool
- """
- # XXX FIXME we need some mechanism for timeout: should cleanup and
- # release if something in the syncdb-decrypt goes wrong. we could keep
- # track of the release date and cleanup unrealistic sync entries after
- # some time.
+ return self._record_sync_info_with_the_target(info["my_gen"])
- # TODO use cancellable deferreds instead
- locked = self.syncing_lock.locked()
- return locked
-
- def release_syncing_lock(self):
- """
- Release syncing lock if it's locked.
+ def _record_sync_info_with_the_target(self, start_generation):
"""
- if self.syncing_lock.locked():
- self.syncing_lock.release()
+ Store local replica metadata in server.
- def close(self):
- """
- Close sync target pool of workers.
- """
- self.release_syncing_lock()
- self.sync_target.close()
+ :param start_generation: The local generation when the sync was
+ started.
+ :type start_generation: int
- def __del__(self):
- """
- Cleanup: release lock.
+ :return: A deferred which will fire when the operation has been
+ completed.
+ :rtype: twisted.internet.defer.Deferred
"""
- self.release_syncing_lock()
+ cur_gen, trans_id = self.source._get_generation_info()
+ if (cur_gen == start_generation + self.num_inserted
+ and self.num_inserted > 0):
+ return self.sync_target.record_sync_info(
+ self.source._replica_uid, cur_gen, trans_id)
+ return defer.succeed(None)
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
deleted file mode 100644
index 986bd991..00000000
--- a/client/src/leap/soledad/client/target.py
+++ /dev/null
@@ -1,1517 +0,0 @@
-# -*- coding: utf-8 -*-
-# target.py
-# Copyright (C) 2013, 2014 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-A U1DB backend for encrypting data before sending to server and decrypting
-after receiving.
-"""
-import cStringIO
-import gzip
-import logging
-import re
-import urllib
-import threading
-
-from collections import defaultdict
-from time import sleep
-from uuid import uuid4
-
-import simplejson as json
-
-from u1db import errors
-from u1db.remote import utils, http_errors
-from u1db.remote.http_target import HTTPSyncTarget
-from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase
-from zope.proxy import ProxyBase
-from zope.proxy import sameProxiedObjects, setProxiedObject
-
-from twisted.internet.task import LoopingCall
-
-from leap.soledad.common.document import SoledadDocument
-from leap.soledad.client.auth import TokenBasedAuth
-from leap.soledad.client.crypto import is_symmetrically_encrypted
-from leap.soledad.client.crypto import encrypt_doc, decrypt_doc
-from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool
-from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
-from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
-from leap.soledad.client.events import signal
-
-
-logger = logging.getLogger(__name__)
-
-
-def _gunzip(data):
- """
- Uncompress data that is gzipped.
-
- :param data: gzipped data
- :type data: basestring
- """
- buffer = cStringIO.StringIO()
- buffer.write(data)
- buffer.seek(0)
- try:
- data = gzip.GzipFile(mode='r', fileobj=buffer).read()
- except Exception:
- logger.warning("Error while decrypting gzipped data")
- buffer.close()
- return data
-
-
-class PendingReceivedDocsSyncError(Exception):
- pass
-
-
-class DocumentSyncerThread(threading.Thread):
- """
- A thread that knowns how to either send or receive a document during the
- sync process.
- """
-
- def __init__(self, doc_syncer, release_method, failed_method,
- idx, total, last_request_lock=None, last_callback_lock=None):
- """
- Initialize a new syncer thread.
-
- :param doc_syncer: A document syncer.
- :type doc_syncer: HTTPDocumentSyncer
- :param release_method: A method to be called when finished running.
- :type release_method: callable(DocumentSyncerThread)
- :param failed_method: A method to be called when we failed.
- :type failed_method: callable(DocumentSyncerThread)
- :param idx: The index count of the current operation.
- :type idx: int
- :param total: The total number of operations.
- :type total: int
- :param last_request_lock: A lock to wait for before actually performing
- the request.
- :type last_request_lock: threading.Lock
- :param last_callback_lock: A lock to wait for before actually running
- the success callback.
- :type last_callback_lock: threading.Lock
- """
- threading.Thread.__init__(self)
- self._doc_syncer = doc_syncer
- self._release_method = release_method
- self._failed_method = failed_method
- self._idx = idx
- self._total = total
- self._last_request_lock = last_request_lock
- self._last_callback_lock = last_callback_lock
- self._response = None
- self._exception = None
- self._result = None
- self._success = False
- # a lock so we can signal when we're finished
- self._request_lock = threading.Lock()
- self._request_lock.acquire()
- self._callback_lock = threading.Lock()
- self._callback_lock.acquire()
- # make thread interruptable
- self._stopped = None
- self._stop_lock = threading.Lock()
-
- def run(self):
- """
- Run the HTTP request and store results.
-
- This method will block and wait for an eventual previous operation to
- finish before actually performing the request. It also traps any
- exception and register any failure with the request.
- """
- with self._stop_lock:
- if self._stopped is None:
- self._stopped = False
- else:
- return
-
- # eventually wait for the previous thread to finish
- if self._last_request_lock is not None:
- self._last_request_lock.acquire()
-
- # bail out in case we've been interrupted
- if self.stopped is True:
- return
-
- try:
- self._response = self._doc_syncer.do_request()
- self._request_lock.release()
-
- # run success callback
- if self._doc_syncer.success_callback is not None:
-
- # eventually wait for callback lock release
- if self._last_callback_lock is not None:
- self._last_callback_lock.acquire()
-
- # bail out in case we've been interrupted
- if self._stopped is True:
- return
-
- self._result = self._doc_syncer.success_callback(
- self._idx, self._total, self._response)
- self._success = True
- doc_syncer = self._doc_syncer
- self._release_method(self, doc_syncer)
- self._doc_syncer = None
- # let next thread executed its callback
- self._callback_lock.release()
-
- # trap any exception and signal failure
- except Exception as e:
- self._exception = e
- self._success = False
- # run failure callback
- if self._doc_syncer.failure_callback is not None:
-
- # eventually wait for callback lock release
- if self._last_callback_lock is not None:
- self._last_callback_lock.acquire()
-
- # bail out in case we've been interrupted
- if self.stopped is True:
- return
-
- self._doc_syncer.failure_callback(
- self._idx, self._total, self._exception)
-
- self._failed_method()
- # we do not release the callback lock here because we
- # failed and so we don't want other threads to succeed.
-
- @property
- def doc_syncer(self):
- return self._doc_syncer
-
- @property
- def response(self):
- return self._response
-
- @property
- def exception(self):
- return self._exception
-
- @property
- def callback_lock(self):
- return self._callback_lock
-
- @property
- def request_lock(self):
- return self._request_lock
-
- @property
- def success(self):
- return self._success
-
- def stop(self):
- with self._stop_lock:
- self._stopped = True
-
- @property
- def stopped(self):
- with self._stop_lock:
- return self._stopped
-
- @property
- def result(self):
- return self._result
-
-
-class DocumentSyncerPool(object):
- """
- A pool of reusable document syncers.
- """
-
- POOL_SIZE = 10
- """
- The maximum amount of syncer threads running at the same time.
- """
-
- def __init__(self, raw_url, raw_creds, query_string, headers,
- ensure_callback, stop_method):
- """
- Initialize the document syncer pool.
-
- :param raw_url: The complete raw URL for the HTTP request.
- :type raw_url: str
- :param raw_creds: The credentials for the HTTP request.
- :type raw_creds: dict
- :param query_string: The query string for the HTTP request.
- :type query_string: str
- :param headers: The headers for the HTTP request.
- :type headers: dict
- :param ensure_callback: A callback to ensure we have the correct
- target_replica_uid, if it was just created.
- :type ensure_callback: callable
-
- """
- # save syncer params
- self._raw_url = raw_url
- self._raw_creds = raw_creds
- self._query_string = query_string
- self._headers = headers
- self._ensure_callback = ensure_callback
- self._stop_method = stop_method
- # pool attributes
- self._failures = False
- self._semaphore_pool = threading.BoundedSemaphore(
- DocumentSyncerPool.POOL_SIZE)
- self._pool_access_lock = threading.Lock()
- self._doc_syncers = []
- self._threads = []
-
- def new_syncer_thread(self, idx, total, last_request_lock=None,
- last_callback_lock=None):
- """
- Yield a new document syncer thread.
-
- :param idx: The index count of the current operation.
- :type idx: int
- :param total: The total number of operations.
- :type total: int
- :param last_request_lock: A lock to wait for before actually performing
- the request.
- :type last_request_lock: threading.Lock
- :param last_callback_lock: A lock to wait for before actually running
- the success callback.
- :type last_callback_lock: threading.Lock
- """
- t = None
- # wait for available threads
- self._semaphore_pool.acquire()
- with self._pool_access_lock:
- if self._failures is True:
- return None
- # get a syncer
- doc_syncer = self._get_syncer()
- # we rely on DocumentSyncerThread.run() to release the lock using
- # self.release_syncer so we can launch a new thread.
- t = DocumentSyncerThread(
- doc_syncer, self.release_syncer, self.cancel_threads,
- idx, total,
- last_request_lock=last_request_lock,
- last_callback_lock=last_callback_lock)
- self._threads.append(t)
- return t
-
- def _failed(self):
- with self._pool_access_lock:
- self._failures = True
-
- @property
- def failures(self):
- return self._failures
-
- def _get_syncer(self):
- """
- Get a document syncer from the pool.
-
- This method will create a new syncer whenever there is no syncer
- available in the pool.
-
- :return: A syncer.
- :rtype: HTTPDocumentSyncer
- """
- syncer = None
- # get an available syncer or create a new one
- try:
- syncer = self._doc_syncers.pop()
- except IndexError:
- syncer = HTTPDocumentSyncer(
- self._raw_url, self._raw_creds, self._query_string,
- self._headers, self._ensure_callback)
- return syncer
-
- def release_syncer(self, syncer_thread, doc_syncer):
- """
- Return a syncer to the pool after use and check for any failures.
-
- :param syncer: The syncer to be returned to the pool.
- :type syncer: HTTPDocumentSyncer
- """
- with self._pool_access_lock:
- self._doc_syncers.append(doc_syncer)
- if syncer_thread.success is True:
- self._threads.remove(syncer_thread)
- self._semaphore_pool.release()
-
- def cancel_threads(self):
- """
- Stop all threads in the pool.
- """
- # stop sync
- self._stop_method()
- stopped = []
- # stop all threads
- logger.warning("Soledad sync: cancelling sync threads...")
- with self._pool_access_lock:
- self._failures = True
- while self._threads:
- t = self._threads.pop(0)
- t.stop()
- self._doc_syncers.append(t.doc_syncer)
- stopped.append(t)
- # release locks and join
- while stopped:
- t = stopped.pop(0)
- t.request_lock.acquire(False) # just in case
- t.request_lock.release()
- t.callback_lock.acquire(False) # just in case
- t.callback_lock.release()
- # release any blocking semaphores
- for i in xrange(DocumentSyncerPool.POOL_SIZE):
- try:
- self._semaphore_pool.release()
- except ValueError:
- break
- logger.warning("Soledad sync: cancelled sync threads.")
-
- def cleanup(self):
- """
- Close and remove any syncers from the pool.
- """
- with self._pool_access_lock:
- while self._doc_syncers:
- syncer = self._doc_syncers.pop()
- syncer.close()
- del syncer
-
-
-class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth):
-
- def __init__(self, raw_url, creds, query_string, headers, ensure_callback):
- """
- Initialize the client.
-
- :param raw_url: The raw URL of the target HTTP server.
- :type raw_url: str
- :param creds: Authentication credentials.
- :type creds: dict
- :param query_string: The query string for the HTTP request.
- :type query_string: str
- :param headers: The headers for the HTTP request.
- :type headers: dict
- :param ensure_callback: A callback to ensure we have the correct
- target_replica_uid, if it was just created.
- :type ensure_callback: callable
- """
- HTTPClientBase.__init__(self, raw_url, creds=creds)
- # info needed to perform the request
- self._query_string = query_string
- self._headers = headers
- self._ensure_callback = ensure_callback
- # the actual request method
- self._request_method = None
- self._success_callback = None
- self._failure_callback = None
-
- def _reset(self):
- """
- Reset this document syncer so we can reuse it.
- """
- self._request_method = None
- self._success_callback = None
- self._failure_callback = None
- self._request_method = None
-
- def set_request_method(self, method, *args, **kwargs):
- """
- Set the actual method to perform the request.
-
- :param method: Either 'get' or 'put'.
- :type method: str
- :param args: Arguments for the request method.
- :type args: list
- :param kwargs: Keyworded arguments for the request method.
- :type kwargs: dict
- """
- self._reset()
- # resolve request method
- if method is 'get':
- self._request_method = self._get_doc
- elif method is 'put':
- self._request_method = self._put_doc
- else:
- raise Exception
- # store request method args
- self._args = args
- self._kwargs = kwargs
-
- def set_success_callback(self, callback):
- self._success_callback = callback
-
- def set_failure_callback(self, callback):
- self._failure_callback = callback
-
- @property
- def success_callback(self):
- return self._success_callback
-
- @property
- def failure_callback(self):
- return self._failure_callback
-
- def do_request(self):
- """
- Actually perform the request.
-
- :return: The body and headers of the response.
- :rtype: tuple
- """
- self._ensure_connection()
- args = self._args
- kwargs = self._kwargs
- return self._request_method(*args, **kwargs)
-
- def _request(self, method, url_parts, params=None, body=None,
- content_type=None):
- """
- Perform an HTTP request.
-
- :param method: The HTTP request method.
- :type method: str
- :param url_parts: A list representing the request path.
- :type url_parts: list
- :param params: Parameters for the URL query string.
- :type params: dict
- :param body: The body of the request.
- :type body: str
- :param content-type: The content-type of the request.
- :type content-type: str
-
- :return: The body and headers of the response.
- :rtype: tuple
-
- :raise errors.Unavailable: Raised after a number of unsuccesful
- request attempts.
- :raise Exception: Raised for any other exception ocurring during the
- request.
- """
-
- self._ensure_connection()
- unquoted_url = url_query = self._url.path
- if url_parts:
- if not url_query.endswith('/'):
- url_query += '/'
- unquoted_url = url_query
- url_query += '/'.join(urllib.quote(part, safe='')
- for part in url_parts)
- # oauth performs its own quoting
- unquoted_url += '/'.join(url_parts)
- encoded_params = {}
- if params:
- for key, value in params.items():
- key = unicode(key).encode('utf-8')
- encoded_params[key] = _encode_query_parameter(value)
- url_query += ('?' + urllib.urlencode(encoded_params))
- if body is not None and not isinstance(body, basestring):
- body = json.dumps(body)
- content_type = 'application/json'
- headers = {}
- if content_type:
- headers['content-type'] = content_type
-
- # Patched: We would like to receive gzip pretty please
- # ----------------------------------------------------
- headers['accept-encoding'] = "gzip"
- # ----------------------------------------------------
-
- headers.update(
- self._sign_request(method, unquoted_url, encoded_params))
-
- for delay in self._delays:
- try:
- self._conn.request(method, url_query, body, headers)
- return self._response()
- except errors.Unavailable, e:
- sleep(delay)
- raise e
-
- def _response(self):
- """
- Return the response of the (possibly gzipped) HTTP request.
-
- :return: The body and headers of the response.
- :rtype: tuple
- """
- resp = self._conn.getresponse()
- body = resp.read()
- headers = dict(resp.getheaders())
-
- # Patched: We would like to decode gzip
- # ----------------------------------------------------
- encoding = headers.get('content-encoding', '')
- if "gzip" in encoding:
- body = _gunzip(body)
- # ----------------------------------------------------
-
- if resp.status in (200, 201):
- return body, headers
- elif resp.status in http_errors.ERROR_STATUSES:
- try:
- respdic = json.loads(body)
- except ValueError:
- pass
- else:
- self._error(respdic)
- # special case
- if resp.status == 503:
- raise errors.Unavailable(body, headers)
- raise errors.HTTPError(resp.status, body, headers)
-
- def _prepare(self, comma, entries, **dic):
- """
- Prepare an entry to be sent through a syncing POST request.
-
- :param comma: A string to be prepended to the current entry.
- :type comma: str
- :param entries: A list of entries accumulated to be sent on the
- request.
- :type entries: list
- :param dic: The data to be included in this entry.
- :type dic: dict
-
- :return: The size of the prepared entry.
- :rtype: int
- """
- entry = comma + '\r\n' + json.dumps(dic)
- entries.append(entry)
- return len(entry)
-
- def _init_post_request(self, action, content_length):
- """
- Initiate a syncing POST request.
-
- :param url: The syncing URL.
- :type url: str
- :param action: The syncing action, either 'get' or 'receive'.
- :type action: str
- :param headers: The initial headers to be sent on this request.
- :type headers: dict
- :param content_length: The content-length of the request.
- :type content_length: int
- """
- self._conn.putrequest('POST', self._query_string)
- self._conn.putheader(
- 'content-type', 'application/x-soledad-sync-%s' % action)
- for header_name, header_value in self._headers:
- self._conn.putheader(header_name, header_value)
- self._conn.putheader('accept-encoding', 'gzip')
- self._conn.putheader('content-length', str(content_length))
- self._conn.endheaders()
-
- def _get_doc(self, received, sync_id, last_known_generation,
- last_known_trans_id):
- """
- Get a sync document from server by means of a POST request.
-
- :param received: The number of documents already received in the
- current sync session.
- :type received: int
- :param sync_id: The id for the current sync session.
- :type sync_id: str
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
-
- :return: The body and headers of the response.
- :rtype: tuple
- """
- entries = ['[']
- size = 1
- # add remote replica metadata to the request
- size += self._prepare(
- '', entries,
- last_known_generation=last_known_generation,
- last_known_trans_id=last_known_trans_id,
- sync_id=sync_id,
- ensure=self._ensure_callback is not None)
- # inform server of how many documents have already been received
- size += self._prepare(
- ',', entries, received=received)
- entries.append('\r\n]')
- size += len(entries[-1])
- # send headers
- self._init_post_request('get', size)
- # get document
- for entry in entries:
- self._conn.send(entry)
- return self._response()
-
- def _put_doc(self, sync_id, last_known_generation, last_known_trans_id,
- id, rev, content, gen, trans_id, number_of_docs, doc_idx):
- """
- Put a sync document on server by means of a POST request.
-
- :param sync_id: The id for the current sync session.
- :type sync_id: str
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
- :param id: The document id.
- :type id: str
- :param rev: The document revision.
- :type rev: str
- :param content: The serialized document content.
- :type content: str
- :param gen: The generation of the modification of the document.
- :type gen: int
- :param trans_id: The transaction id of the modification of the
- document.
- :type trans_id: str
- :param number_of_docs: The total amount of documents sent on this sync
- session.
- :type number_of_docs: int
- :param doc_idx: The index of the current document being sent.
- :type doc_idx: int
-
- :return: The body and headers of the response.
- :rtype: tuple
- """
- # prepare to send the document
- entries = ['[']
- size = 1
- # add remote replica metadata to the request
- size += self._prepare(
- '', entries,
- last_known_generation=last_known_generation,
- last_known_trans_id=last_known_trans_id,
- sync_id=sync_id,
- ensure=self._ensure_callback is not None)
- # add the document to the request
- size += self._prepare(
- ',', entries,
- id=id, rev=rev, content=content, gen=gen, trans_id=trans_id,
- number_of_docs=number_of_docs, doc_idx=doc_idx)
- entries.append('\r\n]')
- size += len(entries[-1])
- # send headers
- self._init_post_request('put', size)
- # send document
- for entry in entries:
- self._conn.send(entry)
- return self._response()
-
- def _sign_request(self, method, url_query, params):
- """
- Return an authorization header to be included in the HTTP request.
-
- :param method: The HTTP method.
- :type method: str
- :param url_query: The URL query string.
- :type url_query: str
- :param params: A list with encoded query parameters.
- :type param: list
-
- :return: The Authorization header.
- :rtype: list of tuple
- """
- return TokenBasedAuth._sign_request(self, method, url_query, params)
-
- def set_token_credentials(self, uuid, token):
- """
- Store given credentials so we can sign the request later.
-
- :param uuid: The user's uuid.
- :type uuid: str
- :param token: The authentication token.
- :type token: str
- """
- TokenBasedAuth.set_token_credentials(self, uuid, token)
-
-
-class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
- """
- A SyncTarget that encrypts data before sending and decrypts data after
- receiving.
-
- Normally encryption will have been written to the sync database upon
- document modification. The sync database is also used to write temporarily
- the parsed documents that the remote send us, before being decrypted and
- written to the main database.
- """
-
- # will later keep a reference to the insert-doc callback
- # passed to sync_exchange
- _insert_doc_cb = defaultdict(lambda: ProxyBase(None))
-
- """
- Period of recurrence of the periodic decrypting task, in seconds.
- """
- DECRYPT_LOOP_PERIOD = 0.5
-
- #
- # Modified HTTPSyncTarget methods.
- #
-
- def __init__(self, url, source_replica_uid=None, creds=None, crypto=None,
- sync_db=None, sync_db_write_lock=None):
- """
- Initialize the SoledadSyncTarget.
-
- :param source_replica_uid: The source replica uid which we use when
- deferring decryption.
- :type source_replica_uid: str
- :param url: The url of the target replica to sync with.
- :type url: str
- :param creds: Optional dictionary giving credentials.
- to authorize the operation with the server.
- :type creds: dict
- :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
- document contents when syncing.
- :type crypto: soledad.crypto.SoledadCrypto
- :param sync_db: Optional. handler for the db with the symmetric
- encryption of the syncing documents. If
- None, encryption will be done in-place,
- instead of retreiving it from the dedicated
- database.
- :type sync_db: Sqlite handler
- :param sync_db_write_lock: a write lock for controlling concurrent
- access to the sync_db
- :type sync_db_write_lock: threading.Lock
- """
- HTTPSyncTarget.__init__(self, url, creds)
- self._raw_url = url
- self._raw_creds = creds
- self._crypto = crypto
- self._stopped = True
- self._stop_lock = threading.Lock()
- self._sync_exchange_lock = threading.Lock()
- self.source_replica_uid = source_replica_uid
- self._defer_decryption = False
- self._syncer_pool = None
-
- # deferred decryption attributes
- self._sync_db = None
- self._sync_db_write_lock = None
- self._decryption_callback = None
- self._sync_decr_pool = None
- self._sync_loop = None
- if sync_db and sync_db_write_lock is not None:
- self._sync_db = sync_db
- self._sync_db_write_lock = sync_db_write_lock
-
- def _setup_sync_decr_pool(self):
- """
- Set up the SyncDecrypterPool for deferred decryption.
- """
- if self._sync_decr_pool is None:
- # initialize syncing queue decryption pool
- self._sync_decr_pool = SyncDecrypterPool(
- self._crypto, self._sync_db,
- self._sync_db_write_lock,
- insert_doc_cb=self._insert_doc_cb)
- self._sync_decr_pool.set_source_replica_uid(
- self.source_replica_uid)
-
- def _teardown_sync_decr_pool(self):
- """
- Tear down the SyncDecrypterPool.
- """
- if self._sync_decr_pool is not None:
- self._sync_decr_pool.close()
- self._sync_decr_pool = None
-
- def _setup_sync_loop(self):
- """
- Set up the sync loop for deferred decryption.
- """
- if self._sync_loop is None:
- self._sync_loop = LoopingCall(
- self._decrypt_syncing_received_docs)
- self._sync_loop.start(self.DECRYPT_LOOP_PERIOD)
-
- def _teardown_sync_loop(self):
- """
- Tear down the sync loop.
- """
- if self._sync_loop is not None:
- self._sync_loop.stop()
- self._sync_loop = None
-
- def _get_replica_uid(self, url):
- """
- Return replica uid from the url, or None.
-
- :param url: the replica url
- :type url: str
- """
- replica_uid_match = re.findall("user-([0-9a-fA-F]+)", url)
- return replica_uid_match[0] if len(replica_uid_match) > 0 else None
-
- @staticmethod
- def connect(url, source_replica_uid=None, crypto=None):
- return SoledadSyncTarget(
- url, source_replica_uid=source_replica_uid, crypto=crypto)
-
- def _parse_received_doc_response(self, response):
- """
- Parse the response from the server containing the received document.
-
- :param response: The body and headers of the response.
- :type response: tuple(str, dict)
- """
- data, _ = response
- # decode incoming stream
- parts = data.splitlines()
- if not parts or parts[0] != '[' or parts[-1] != ']':
- raise errors.BrokenSyncStream
- data = parts[1:-1]
- # decode metadata
- line, comma = utils.check_and_strip_comma(data[0])
- metadata = None
- try:
- metadata = json.loads(line)
- new_generation = metadata['new_generation']
- new_transaction_id = metadata['new_transaction_id']
- number_of_changes = metadata['number_of_changes']
- except (json.JSONDecodeError, KeyError):
- raise errors.BrokenSyncStream
- # make sure we have replica_uid from fresh new dbs
- if self._ensure_callback and 'replica_uid' in metadata:
- self._ensure_callback(metadata['replica_uid'])
- # parse incoming document info
- doc_id = None
- rev = None
- content = None
- gen = None
- trans_id = None
- if number_of_changes > 0:
- try:
- entry = json.loads(data[1])
- doc_id = entry['id']
- rev = entry['rev']
- content = entry['content']
- gen = entry['gen']
- trans_id = entry['trans_id']
- except (IndexError, KeyError):
- raise errors.BrokenSyncStream
- return new_generation, new_transaction_id, number_of_changes, \
- doc_id, rev, content, gen, trans_id
-
- def _insert_received_doc(self, idx, total, response):
- """
- Insert a received document into the local replica.
-
- :param idx: The index count of the current operation.
- :type idx: int
- :param total: The total number of operations.
- :type total: int
- :param response: The body and headers of the response.
- :type response: tuple(str, dict)
- """
- new_generation, new_transaction_id, number_of_changes, doc_id, \
- rev, content, gen, trans_id = \
- self._parse_received_doc_response(response)
- if doc_id is not None:
- # decrypt incoming document and insert into local database
- # -------------------------------------------------------------
- # symmetric decryption of document's contents
- # -------------------------------------------------------------
- # If arriving content was symmetrically encrypted, we decrypt it.
- # We do it inline if defer_decryption flag is False or no sync_db
- # was defined, otherwise we defer it writing it to the received
- # docs table.
- doc = SoledadDocument(doc_id, rev, content)
- if is_symmetrically_encrypted(doc):
- if self._queue_for_decrypt:
- self._save_encrypted_received_doc(
- doc, gen, trans_id, idx, total)
- else:
- # defer_decryption is False or no-sync-db fallback
- doc.set_json(decrypt_doc(self._crypto, doc))
- self._return_doc_cb(doc, gen, trans_id)
- else:
- # not symmetrically encrypted doc, insert it directly
- # or save it in the decrypted stage.
- if self._queue_for_decrypt:
- self._save_received_doc(doc, gen, trans_id, idx, total)
- else:
- self._return_doc_cb(doc, gen, trans_id)
- # -------------------------------------------------------------
- # end of symmetric decryption
- # -------------------------------------------------------------
- msg = "%d/%d" % (idx + 1, total)
- signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg)
- logger.debug("Soledad sync receive status: %s" % msg)
- return number_of_changes, new_generation, new_transaction_id
-
- def _get_remote_docs(self, url, last_known_generation, last_known_trans_id,
- headers, return_doc_cb, ensure_callback, sync_id,
- defer_decryption=False):
- """
- Fetch sync documents from the remote database and insert them in the
- local database.
-
- If an incoming document's encryption scheme is equal to
- EncryptionSchemes.SYMKEY, then this method will decrypt it with
- Soledad's symmetric key.
-
- :param url: The syncing URL.
- :type url: str
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
- :param headers: The headers of the HTTP request.
- :type headers: dict
- :param return_doc_cb: A callback to insert docs from target.
- :type return_doc_cb: callable
- :param ensure_callback: A callback to ensure we have the correct
- target_replica_uid, if it was just created.
- :type ensure_callback: callable
- :param sync_id: The id for the current sync session.
- :type sync_id: str
- :param defer_decryption: Whether to defer the decryption process using
- the intermediate database. If False,
- decryption will be done inline.
- :type defer_decryption: bool
-
- :raise BrokenSyncStream: If `data` is malformed.
-
- :return: A dictionary representing the first line of the response got
- from remote replica.
- :rtype: dict
- """
- # we keep a reference to the callback in case we defer the decryption
- self._return_doc_cb = return_doc_cb
- self._queue_for_decrypt = defer_decryption \
- and self._sync_db is not None
-
- new_generation = last_known_generation
- new_transaction_id = last_known_trans_id
-
- if self._queue_for_decrypt:
- logger.debug(
- "Soledad sync: will queue received docs for decrypting.")
-
- idx = 0
- number_of_changes = 1
-
- first_request = True
- last_callback_lock = None
- threads = []
-
- # get incoming documents
- while idx < number_of_changes:
- # bail out if sync process was interrupted
- if self.stopped is True:
- break
-
- # launch a thread to fetch one document from target
- t = self._syncer_pool.new_syncer_thread(
- idx, number_of_changes,
- last_callback_lock=last_callback_lock)
-
- # bail out if any thread failed
- if t is None:
- self.stop()
- break
-
- t.doc_syncer.set_request_method(
- 'get', idx, sync_id, last_known_generation,
- last_known_trans_id)
- t.doc_syncer.set_success_callback(self._insert_received_doc)
-
- def _failure_callback(idx, total, exception):
- _failure_msg = "Soledad sync: error while getting document " \
- "%d/%d: %s" \
- % (idx + 1, total, exception)
- logger.warning("%s" % _failure_msg)
- logger.warning("Soledad sync: failing gracefully, will "
- "recover on next sync.")
-
- t.doc_syncer.set_failure_callback(_failure_callback)
- threads.append(t)
- t.start()
- last_callback_lock = t.callback_lock
- idx += 1
-
- # if this is the first request, wait to update the number of
- # changes
- if first_request is True:
- t.join()
- if t.success:
- number_of_changes, _, _ = t.result
- else:
- raise t.exception
- first_request = False
-
- # make sure all threads finished and we have up-to-date info
- last_successful_thread = None
- while threads:
- # check if there are failures
- t = threads.pop(0)
- t.join()
- if t.success:
- last_successful_thread = t
- else:
- raise t.exception
-
- # get information about last successful thread
- if last_successful_thread is not None:
- body, _ = last_successful_thread.response
- parsed_body = json.loads(body)
- # get current target gen and trans id in case no documents were
- # transferred
- if len(parsed_body) == 1:
- metadata = parsed_body[0]
- new_generation = metadata['new_generation']
- new_transaction_id = metadata['new_transaction_id']
- # get current target gen and trans id from last transferred
- # document
- else:
- doc_data = parsed_body[1]
- new_generation = doc_data['gen']
- new_transaction_id = doc_data['trans_id']
-
- return new_generation, new_transaction_id
-
- def sync_exchange(self, docs_by_generations,
- source_replica_uid, last_known_generation,
- last_known_trans_id, return_doc_cb,
- ensure_callback=None, defer_decryption=True,
- sync_id=None):
- """
- Find out which documents the remote database does not know about,
- encrypt and send them.
-
- This does the same as the parent's method but encrypts content before
- syncing.
-
- :param docs_by_generations: A list of (doc_id, generation, trans_id)
- of local documents that were changed since
- the last local generation the remote
- replica knows about.
- :type docs_by_generations: list of tuples
-
- :param source_replica_uid: The uid of the source replica.
- :type source_replica_uid: str
-
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
-
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
-
- :param return_doc_cb: A callback for inserting received documents from
- target. If not overriden, this will call u1db
- insert_doc_from_target in synchronizer, which
- implements the TAKE OTHER semantics.
- :type return_doc_cb: function
-
- :param ensure_callback: A callback that ensures we know the target
- replica uid if the target replica was just
- created.
- :type ensure_callback: function
-
- :param defer_decryption: Whether to defer the decryption process using
- the intermediate database. If False,
- decryption will be done inline.
- :type defer_decryption: bool
-
- :return: The new generation and transaction id of the target replica.
- :rtype: tuple
- """
- self._ensure_callback = ensure_callback
-
- if defer_decryption and self._sync_db is not None:
- self._sync_exchange_lock.acquire()
- self._setup_sync_decr_pool()
- self._setup_sync_loop()
- self._defer_decryption = True
- else:
- # fall back
- defer_decryption = False
-
- self.start()
-
- if sync_id is None:
- sync_id = str(uuid4())
- self.source_replica_uid = source_replica_uid
- # let the decrypter pool access the passed callback to insert docs
- setProxiedObject(self._insert_doc_cb[source_replica_uid],
- return_doc_cb)
-
- # empty the database before starting a new sync
- if defer_decryption is True and not self.clear_to_sync():
- self._sync_decr_pool.empty()
-
- self._ensure_connection()
- if self._trace_hook: # for tests
- self._trace_hook('sync_exchange')
- url = '%s/sync-from/%s' % (self._url.path, source_replica_uid)
- headers = self._sign_request('POST', url, {})
-
- cur_target_gen = last_known_generation
- cur_target_trans_id = last_known_trans_id
-
- # send docs
- msg = "%d/%d" % (0, len(docs_by_generations))
- signal(SOLEDAD_SYNC_SEND_STATUS, msg)
- logger.debug("Soledad sync send status: %s" % msg)
-
- defer_encryption = self._sync_db is not None
- self._syncer_pool = DocumentSyncerPool(
- self._raw_url, self._raw_creds, url, headers, ensure_callback,
- self.stop_syncer)
- threads = []
- last_callback_lock = None
- sent = 0
- total = len(docs_by_generations)
-
- synced = []
- number_of_docs = len(docs_by_generations)
-
- last_request_lock = None
- for doc, gen, trans_id in docs_by_generations:
- # allow for interrupting the sync process
- if self.stopped is True:
- break
-
- # skip non-syncable docs
- if isinstance(doc, SoledadDocument) and not doc.syncable:
- continue
-
- # -------------------------------------------------------------
- # symmetric encryption of document's contents
- # -------------------------------------------------------------
- doc_json = doc.get_json()
- if not doc.is_tombstone():
- if not defer_encryption:
- # fallback case, for tests
- doc_json = encrypt_doc(self._crypto, doc)
- else:
- try:
- doc_json = self.get_encrypted_doc_from_db(
- doc.doc_id, doc.rev)
- except Exception as exc:
- logger.error("Error while getting "
- "encrypted doc from db")
- logger.exception(exc)
- continue
- if doc_json is None:
- # Not marked as tombstone, but we got nothing
- # from the sync db. As it is not encrypted yet, we
- # force inline encryption.
- # TODO: implement a queue to deal with these cases.
- doc_json = encrypt_doc(self._crypto, doc)
- # -------------------------------------------------------------
- # end of symmetric encryption
- # -------------------------------------------------------------
- t = self._syncer_pool.new_syncer_thread(
- sent + 1, total, last_request_lock=last_request_lock,
- last_callback_lock=last_callback_lock)
-
- # bail out if any thread failed
- if t is None:
- self.stop()
- break
-
- # set the request method
- t.doc_syncer.set_request_method(
- 'put', sync_id, cur_target_gen, cur_target_trans_id,
- id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen,
- trans_id=trans_id, number_of_docs=number_of_docs,
- doc_idx=sent + 1)
- # set the success calback
-
- def _success_callback(idx, total, response):
- _success_msg = "Soledad sync send status: %d/%d" \
- % (idx, total)
- signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg)
- logger.debug(_success_msg)
-
- t.doc_syncer.set_success_callback(_success_callback)
-
- # set the failure callback
- def _failure_callback(idx, total, exception):
- _failure_msg = "Soledad sync: error while sending document " \
- "%d/%d: %s" % (idx, total, exception)
- logger.warning("%s" % _failure_msg)
- logger.warning("Soledad sync: failing gracefully, will "
- "recover on next sync.")
-
- t.doc_syncer.set_failure_callback(_failure_callback)
-
- # save thread and append
- t.start()
- threads.append((t, doc))
-
- # update lock references so they can be used in next call to
- # syncer_pool.new_syncer_thread() above
- last_callback_lock = t.callback_lock
- last_request_lock = t.request_lock
-
- sent += 1
-
- # make sure all threads finished and we have up-to-date info
- last_successful_thread = None
- while threads:
- # check if there are failures
- t, doc = threads.pop(0)
- t.join()
- if t.success:
- synced.append((doc.doc_id, doc.rev))
- last_successful_thread = t
- else:
- raise t.exception
-
- # delete documents from the sync database
- if defer_encryption:
- self.delete_encrypted_docs_from_db(synced)
-
- # get target gen and trans_id after docs
- gen_after_send = None
- trans_id_after_send = None
- if last_successful_thread is not None:
- response_dict = json.loads(last_successful_thread.response[0])[0]
- gen_after_send = response_dict['new_generation']
- trans_id_after_send = response_dict['new_transaction_id']
-
- # get docs from target
- if self.stopped is False:
- cur_target_gen, cur_target_trans_id = self._get_remote_docs(
- url,
- last_known_generation, last_known_trans_id, headers,
- return_doc_cb, ensure_callback, sync_id,
- defer_decryption=defer_decryption)
-
- self._syncer_pool.cleanup()
-
- # decrypt docs in case of deferred decryption
- if defer_decryption:
- while not self.clear_to_sync():
- sleep(self.DECRYPT_LOOP_PERIOD)
- self._teardown_sync_loop()
- self._teardown_sync_decr_pool()
- self._sync_exchange_lock.release()
-
- # update gen and trans id info in case we just sent and did not
- # receive docs.
- if gen_after_send is not None and gen_after_send > cur_target_gen:
- cur_target_gen = gen_after_send
- cur_target_trans_id = trans_id_after_send
-
- self.stop()
- self._syncer_pool = None
- return cur_target_gen, cur_target_trans_id
-
- def start(self):
- """
- Mark current sync session as running.
- """
- with self._stop_lock:
- self._stopped = False
-
-
- def stop_syncer(self):
- with self._stop_lock:
- self._stopped = True
-
- def stop(self):
- """
- Mark current sync session as stopped.
-
- This will eventually interrupt the sync_exchange() method and return
- enough information to the synchronizer so the sync session can be
- recovered afterwards.
- """
- self.stop_syncer()
- if self._syncer_pool:
- self._syncer_pool.cancel_threads()
-
- @property
- def stopped(self):
- """
- Return whether this sync session is stopped.
-
- :return: Whether this sync session is stopped.
- :rtype: bool
- """
- with self._stop_lock:
- return self._stopped is True
-
- def get_encrypted_doc_from_db(self, doc_id, doc_rev):
- """
- Retrieve encrypted document from the database of encrypted docs for
- sync.
-
- :param doc_id: The Document id.
- :type doc_id: str
-
- :param doc_rev: The document revision
- :type doc_rev: str
- """
- encr = SyncEncrypterPool
- sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % (
- encr.TABLE_NAME,))
- res = self._fetchall(sql, (doc_id, doc_rev))
- if res:
- val = res.pop()
- return val[0]
- else:
- # no doc found
- return None
-
- def delete_encrypted_docs_from_db(self, docs_ids):
- """
- Delete several encrypted documents from the database of symmetrically
- encrypted docs to sync.
-
- :param docs_ids: an iterable with (doc_id, doc_rev) for all documents
- to be deleted.
- :type docs_ids: any iterable of tuples of str
- """
- if docs_ids:
- encr = SyncEncrypterPool
- for doc_id, doc_rev in docs_ids:
- sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % (
- encr.TABLE_NAME,))
- self._sync_db.execute(sql, (doc_id, doc_rev))
-
- def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total):
- """
- Save a symmetrically encrypted incoming document into the received
- docs table in the sync db. A decryption task will pick it up
- from here in turn.
-
- :param doc: The document to save.
- :type doc: SoledadDocument
- :param gen: The generation.
- :type gen: str
- :param trans_id: Transacion id.
- :type gen: str
- :param idx: The index count of the current operation.
- :type idx: int
- :param total: The total number of operations.
- :type total: int
- """
- logger.debug(
- "Enqueueing doc for decryption: %d/%d."
- % (idx + 1, total))
- self._sync_decr_pool.insert_encrypted_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id)
-
- def _save_received_doc(self, doc, gen, trans_id, idx, total):
- """
- Save any incoming document into the received docs table in the sync db.
-
- :param doc: The document to save.
- :type doc: SoledadDocument
- :param gen: The generation.
- :type gen: str
- :param trans_id: Transacion id.
- :type gen: str
- :param idx: The index count of the current operation.
- :type idx: int
- :param total: The total number of operations.
- :type total: int
- """
- logger.debug(
- "Enqueueing doc, no decryption needed: %d/%d."
- % (idx + 1, total))
- self._sync_decr_pool.insert_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id)
-
- #
- # Symmetric decryption of syncing docs
- #
-
- def clear_to_sync(self):
- """
- Return whether sync can proceed (ie, the received db table is empty).
-
- :return: Whether sync can proceed.
- :rtype: bool
- """
- if self._sync_decr_pool:
- return self._sync_decr_pool.count_docs_in_sync_db() == 0
- return True
-
- def set_decryption_callback(self, cb):
- """
- Set callback to be called when the decryption finishes.
-
- :param cb: The callback to be set.
- :type cb: callable
- """
- self._decryption_callback = cb
-
- def has_decryption_callback(self):
- """
- Return True if there is a decryption callback set.
- :rtype: bool
- """
- return self._decryption_callback is not None
-
- def has_syncdb(self):
- """
- Return True if we have an initialized syncdb.
- """
- return self._sync_db is not None
-
- def _decrypt_syncing_received_docs(self):
- """
- Decrypt the documents received from remote replica and insert them
- into the local one.
-
- Called periodically from LoopingCall self._sync_loop.
- """
- if sameProxiedObjects(
- self._insert_doc_cb.get(self.source_replica_uid),
- None):
- return
-
- decrypter = self._sync_decr_pool
- decrypter.raise_in_case_of_failed_async_calls()
- decrypter.decrypt_received_docs()
- decrypter.process_decrypted()
-
- def _sign_request(self, method, url_query, params):
- """
- Return an authorization header to be included in the HTTP request.
-
- :param method: The HTTP method.
- :type method: str
- :param url_query: The URL query string.
- :type url_query: str
- :param params: A list with encoded query parameters.
- :type param: list
-
- :return: The Authorization header.
- :rtype: list of tuple
- """
- return TokenBasedAuth._sign_request(self, method, url_query, params)
-
- def set_token_credentials(self, uuid, token):
- """
- Store given credentials so we can sign the request later.
-
- :param uuid: The user's uuid.
- :type uuid: str
- :param token: The authentication token.
- :type token: str
- """
- TokenBasedAuth.set_token_credentials(self, uuid, token)
-
- def _fetchall(self, *args, **kwargs):
- with self._sync_db:
- c = self._sync_db.cursor()
- c.execute(*args, **kwargs)
- return c.fetchall()