summaryrefslogtreecommitdiff
path: root/client/src/leap
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap')
-rw-r--r--client/src/leap/soledad/client/__init__.py73
-rw-r--r--client/src/leap/soledad/client/crypto.py1006
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py249
-rw-r--r--client/src/leap/soledad/client/sync.py163
-rw-r--r--client/src/leap/soledad/client/target.py1622
5 files changed, 2507 insertions, 606 deletions
diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py
index 0d3a21fd..586e3389 100644
--- a/client/src/leap/soledad/client/__init__.py
+++ b/client/src/leap/soledad/client/__init__.py
@@ -223,33 +223,48 @@ class Soledad(object):
"""
def __init__(self, uuid, passphrase, secrets_path, local_db_path,
- server_url, cert_file, auth_token=None, secret_id=None):
+ server_url, cert_file,
+ auth_token=None, secret_id=None, defer_encryption=False):
"""
Initialize configuration, cryptographic keys and dbs.
:param uuid: User's uuid.
:type uuid: str
+
:param passphrase: The passphrase for locking and unlocking encryption
secrets for local and remote storage.
:type passphrase: unicode
+
:param secrets_path: Path for storing encrypted key used for
symmetric encryption.
:type secrets_path: str
+
:param local_db_path: Path for local encrypted storage db.
:type local_db_path: str
+
:param server_url: URL for Soledad server. This is used either to sync
- with the user's remote db and to interact with the shared recovery
- database.
+ with the user's remote db and to interact with the
+ shared recovery database.
:type server_url: str
+
:param cert_file: Path to the certificate of the ca used
to validate the SSL certificate used by the remote
soledad server.
:type cert_file: str
+
:param auth_token: Authorization token for accessing remote databases.
:type auth_token: str
+ :param secret_id: The id of the storage secret to be used.
+ :type secret_id: str
+
+ :param defer_encryption: Whether to defer encryption/decryption of
+ documents, or do it inline while syncing.
+ :type defer_encryption: bool
+
:raise BootstrapSequenceError: Raised when the secret generation and
- storage on server sequence has failed for some reason.
+ storage on server sequence has failed
+ for some reason.
"""
# get config params
self._uuid = uuid
@@ -258,8 +273,10 @@ class Soledad(object):
# init crypto variables
self._secrets = {}
self._secret_id = secret_id
- # init config (possibly with default values)
+ self._defer_encryption = defer_encryption
+
self._init_config(secrets_path, local_db_path, server_url)
+
self._set_token(auth_token)
self._shared_db_instance = None
# configure SSL certificate
@@ -390,6 +407,7 @@ class Soledad(object):
# release the lock on shared db
try:
self._shared_db.unlock(token)
+ self._shared_db.close()
except NotLockedError:
# for some reason the lock expired. Despite that, secret
# loading or generation/storage must have been executed
@@ -469,24 +487,20 @@ class Soledad(object):
create=True,
document_factory=SoledadDocument,
crypto=self._crypto,
- raw_key=True)
+ raw_key=True,
+ defer_encryption=self._defer_encryption)
def close(self):
"""
Close underlying U1DB database.
"""
+ logger.debug("Closing soledad")
if hasattr(self, '_db') and isinstance(
self._db,
SQLCipherDatabase):
+ self._db.stop_sync()
self._db.close()
- def __del__(self):
- """
- Make sure local database is closed when object is destroyed.
- """
- # Watch out! We have no guarantees that this is properly called.
- self.close()
-
#
# Management of secret for symmetric encryption.
#
@@ -520,6 +534,9 @@ class Soledad(object):
Define the id of the storage secret to be used.
This method will also replace the secret in the crypto object.
+
+ :param secret_id: The id of the storage secret to be used.
+ :type secret_id: str
"""
self._secret_id = secret_id
@@ -881,7 +898,7 @@ class Soledad(object):
:type json: str
:param doc_id: An optional identifier specifying the document id.
:type doc_id:
- :return: The new cocument
+ :return: The new document
:rtype: SoledadDocument
"""
return self._db.create_doc_from_json(json, doc_id=doc_id)
@@ -1041,7 +1058,7 @@ class Soledad(object):
if self._db:
return self._db.resolve_doc(doc, conflicted_doc_revs)
- def sync(self):
+ def sync(self, defer_decryption=True):
"""
Synchronize the local encrypted replica with a remote replica.
@@ -1051,16 +1068,25 @@ class Soledad(object):
:param url: the url of the target replica to sync with
:type url: str
- :return: the local generation before the synchronisation was
- performed.
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
+
+ :return: The local generation before the synchronisation was
+ performed.
:rtype: str
"""
if self._db:
+ try:
local_gen = self._db.sync(
urlparse.urljoin(self.server_url, 'user-%s' % self._uuid),
- creds=self._creds, autocreate=False)
+ creds=self._creds, autocreate=False,
+ defer_decryption=defer_decryption)
signal(SOLEDAD_DONE_DATA_SYNC, self._uuid)
return local_gen
+ except Exception as e:
+ logger.error("Soledad exception when syncing: %s" % str(e))
def stop_sync(self):
"""
@@ -1079,7 +1105,9 @@ class Soledad(object):
:return: Whether remote replica and local replica differ.
:rtype: bool
"""
- target = SoledadSyncTarget(url, creds=self._creds, crypto=self._crypto)
+ target = SoledadSyncTarget(
+ url, self._db._get_replica_uid(), creds=self._creds,
+ crypto=self._crypto)
info = target.get_sync_info(self._db._get_replica_uid())
# compare source generation with target's last known source generation
if self._db._get_generation() != info[4]:
@@ -1087,6 +1115,13 @@ class Soledad(object):
return True
return False
+ @property
+ def syncing(self):
+ """
+ Property, True if the syncer is syncing.
+ """
+ return self._db.syncing
+
def _set_token(self, token):
"""
Set the authentication token for remote database access.
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index a6372107..7133f804 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# crypto.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013, 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -14,27 +14,45 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
Cryptographic utilities for Soledad.
"""
-
-
import os
import binascii
import hmac
import hashlib
-
+import json
+import logging
+import multiprocessing
+import threading
from pycryptopp.cipher.aes import AES
from pycryptopp.cipher.xsalsa20 import XSalsa20
+from zope.proxy import sameProxiedObjects
+
+from leap.soledad.common import soledad_assert
+from leap.soledad.common import soledad_assert_type
+from leap.soledad.common.document import SoledadDocument
+
+
+from leap.soledad.common.crypto import (
+ EncryptionSchemes,
+ UnknownEncryptionScheme,
+ MacMethods,
+ UnknownMacMethod,
+ WrongMac,
+ ENC_JSON_KEY,
+ ENC_SCHEME_KEY,
+ ENC_METHOD_KEY,
+ ENC_IV_KEY,
+ MAC_KEY,
+ MAC_METHOD_KEY,
+)
+logger = logging.getLogger(__name__)
-from leap.soledad.common import (
- soledad_assert,
- soledad_assert_type,
-)
+
+MAC_KEY_LENGTH = 64
class EncryptionMethods(object):
@@ -45,6 +63,17 @@ class EncryptionMethods(object):
AES_256_CTR = 'aes-256-ctr'
XSALSA20 = 'xsalsa20'
+#
+# Exceptions
+#
+
+
+class DocumentNotEncrypted(Exception):
+ """
+ Raised for failures in document encryption.
+ """
+ pass
+
class UnknownEncryptionMethod(Exception):
"""
@@ -59,13 +88,116 @@ class NoSymmetricSecret(Exception):
"""
-class SoledadCrypto(object):
+def encrypt_sym(data, key, method):
"""
- General cryptographic functionality.
+ Encrypt C{data} using a {password}.
+
+ Currently, the only encryption methods supported are AES-256 in CTR
+ mode and XSalsa20.
+
+ :param data: The data to be encrypted.
+ :type data: str
+ :param key: The key used to encrypt C{data} (must be 256 bits long).
+ :type key: str
+ :param method: The encryption method to use.
+ :type method: str
+
+ :return: A tuple with the initial value and the encrypted data.
+ :rtype: (long, str)
+ """
+ soledad_assert_type(key, str)
+
+ soledad_assert(
+ len(key) == 32, # 32 x 8 = 256 bits.
+ 'Wrong key size: %s bits (must be 256 bits long).' %
+ (len(key) * 8))
+ iv = None
+ # AES-256 in CTR mode
+ if method == EncryptionMethods.AES_256_CTR:
+ iv = os.urandom(16)
+ ciphertext = AES(key=key, iv=iv).process(data)
+ # XSalsa20
+ elif method == EncryptionMethods.XSALSA20:
+ iv = os.urandom(24)
+ ciphertext = XSalsa20(key=key, iv=iv).process(data)
+ else:
+ # raise if method is unknown
+ raise UnknownEncryptionMethod('Unkwnown method: %s' % method)
+ return binascii.b2a_base64(iv), ciphertext
+
+
+def decrypt_sym(data, key, method, **kwargs):
"""
+ Decrypt data using symmetric secret.
+
+ Currently, the only encryption method supported is AES-256 CTR mode.
- MAC_KEY_LENGTH = 64
+ :param data: The data to be decrypted.
+ :type data: str
+ :param key: The key used to decrypt C{data} (must be 256 bits long).
+ :type key: str
+ :param method: The encryption method to use.
+ :type method: str
+ :param kwargs: Other parameters specific to each encryption method.
+ :type kwargs: dict
+
+ :return: The decrypted data.
+ :rtype: str
+ """
+ soledad_assert_type(key, str)
+ # assert params
+ soledad_assert(
+ len(key) == 32, # 32 x 8 = 256 bits.
+ 'Wrong key size: %s (must be 256 bits long).' % len(key))
+ soledad_assert(
+ 'iv' in kwargs,
+ '%s needs an initial value.' % method)
+ # AES-256 in CTR mode
+ if method == EncryptionMethods.AES_256_CTR:
+ return AES(
+ key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data)
+ elif method == EncryptionMethods.XSALSA20:
+ return XSalsa20(
+ key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data)
+
+ # raise if method is unknown
+ raise UnknownEncryptionMethod('Unkwnown method: %s' % method)
+
+
+def doc_mac_key(doc_id, secret):
+ """
+ Generate a key for calculating a MAC for a document whose id is
+ C{doc_id}.
+ The key is derived using HMAC having sha256 as underlying hash
+ function. The key used for HMAC is the first MAC_KEY_LENGTH characters
+ of Soledad's storage secret. The HMAC message is C{doc_id}.
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+
+ :param secret: soledad secret storage
+ :type secret: Soledad.storage_secret
+
+ :return: The key.
+ :rtype: str
+
+ :raise NoSymmetricSecret: if no symmetric secret was supplied.
+ """
+ if secret is None:
+ raise NoSymmetricSecret()
+
+ return hmac.new(
+ secret[:MAC_KEY_LENGTH],
+ doc_id,
+ hashlib.sha256).digest()
+
+
+class SoledadCrypto(object):
+ """
+ General cryptographic functionality encapsulated in a
+ object that can be passed along.
+ """
def __init__(self, soledad):
"""
Initialize the crypto object.
@@ -77,78 +209,14 @@ class SoledadCrypto(object):
def encrypt_sym(self, data, key,
method=EncryptionMethods.AES_256_CTR):
- """
- Encrypt C{data} using a {password}.
-
- Currently, the only encryption method supported is AES-256 CTR mode.
-
- :param data: The data to be encrypted.
- :type data: str
- :param key: The key used to encrypt C{data} (must be 256 bits long).
- :type key: str
- :param method: The encryption method to use.
- :type method: str
-
- :return: A tuple with the initial value and the encrypted data.
- :rtype: (long, str)
- """
- soledad_assert_type(key, str)
-
- soledad_assert(
- len(key) == 32, # 32 x 8 = 256 bits.
- 'Wrong key size: %s bits (must be 256 bits long).' %
- (len(key) * 8))
- iv = None
- # AES-256 in CTR mode
- if method == EncryptionMethods.AES_256_CTR:
- iv = os.urandom(16)
- ciphertext = AES(key=key, iv=iv).process(data)
- # XSalsa20
- elif method == EncryptionMethods.XSALSA20:
- iv = os.urandom(24)
- ciphertext = XSalsa20(key=key, iv=iv).process(data)
- else:
- # raise if method is unknown
- raise UnknownEncryptionMethod('Unkwnown method: %s' % method)
- return binascii.b2a_base64(iv), ciphertext
+ return encrypt_sym(data, key, method)
def decrypt_sym(self, data, key,
method=EncryptionMethods.AES_256_CTR, **kwargs):
- """
- Decrypt data using symmetric secret.
+ return decrypt_sym(data, key, method, **kwargs)
- Currently, the only encryption method supported is AES-256 CTR mode.
-
- :param data: The data to be decrypted.
- :type data: str
- :param key: The key used to decrypt C{data} (must be 256 bits long).
- :type key: str
- :param method: The encryption method to use.
- :type method: str
- :param kwargs: Other parameters specific to each encryption method.
- :type kwargs: dict
-
- :return: The decrypted data.
- :rtype: str
- """
- soledad_assert_type(key, str)
- # assert params
- soledad_assert(
- len(key) == 32, # 32 x 8 = 256 bits.
- 'Wrong key size: %s (must be 256 bits long).' % len(key))
- soledad_assert(
- 'iv' in kwargs,
- '%s needs an initial value.' % method)
- # AES-256 in CTR mode
- if method == EncryptionMethods.AES_256_CTR:
- return AES(
- key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data)
- elif method == EncryptionMethods.XSALSA20:
- return XSalsa20(
- key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data)
-
- # raise if method is unknown
- raise UnknownEncryptionMethod('Unkwnown method: %s' % method)
+ def doc_mac_key(self, doc_id, secret):
+ return doc_mac_key(doc_id, self.secret)
def doc_passphrase(self, doc_id):
"""
@@ -173,41 +241,769 @@ class SoledadCrypto(object):
raise NoSymmetricSecret()
return hmac.new(
self.secret[
- self.MAC_KEY_LENGTH:
+ MAC_KEY_LENGTH:
self._soledad.REMOTE_STORAGE_SECRET_LENGTH],
doc_id,
hashlib.sha256).digest()
- def doc_mac_key(self, doc_id):
+ #
+ # secret setters/getters
+ #
+
+ def _get_secret(self):
+ return self._soledad.storage_secret
+
+ secret = property(
+ _get_secret, doc='The secret used for symmetric encryption')
+
+#
+# Crypto utilities for a SoledadDocument.
+#
+
+
+def mac_doc(doc_id, doc_rev, ciphertext, mac_method, secret):
+ """
+ Calculate a MAC for C{doc} using C{ciphertext}.
+
+ Current MAC method used is HMAC, with the following parameters:
+
+ * key: sha256(storage_secret, doc_id)
+ * msg: doc_id + doc_rev + ciphertext
+ * digestmod: sha256
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+ :param doc_rev: The revision of the document.
+ :type doc_rev: str
+ :param ciphertext: The content of the document.
+ :type ciphertext: str
+ :param mac_method: The MAC method to use.
+ :type mac_method: str
+ :param secret: soledad secret
+ :type secret: Soledad.secret_storage
+
+ :return: The calculated MAC.
+ :rtype: str
+ """
+ if mac_method == MacMethods.HMAC:
+ return hmac.new(
+ doc_mac_key(doc_id, secret),
+ str(doc_id) + str(doc_rev) + ciphertext,
+ hashlib.sha256).digest()
+ # raise if we do not know how to handle this MAC method
+ raise UnknownMacMethod('Unknown MAC method: %s.' % mac_method)
+
+
+def encrypt_doc(crypto, doc):
+ """
+ Wrapper around encrypt_docstr that accepts a crypto object and the document
+ as arguments.
+
+ :param crypto: a soledad crypto object.
+ :type crypto: SoledadCrypto
+ :param doc: the document.
+ :type doc: SoledadDocument
+ """
+ key = crypto.doc_passphrase(doc.doc_id)
+ secret = crypto.secret
+
+ return encrypt_docstr(
+ doc.get_json(), doc.doc_id, doc.rev, key, secret)
+
+
+def encrypt_docstr(docstr, doc_id, doc_rev, key, secret):
+ """
+ Encrypt C{doc}'s content.
+
+ Encrypt doc's contents using AES-256 CTR mode and return a valid JSON
+ string representing the following:
+
+ {
+ ENC_JSON_KEY: '<encrypted doc JSON string>',
+ ENC_SCHEME_KEY: 'symkey',
+ ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR,
+ ENC_IV_KEY: '<the initial value used to encrypt>',
+ MAC_KEY: '<mac>'
+ MAC_METHOD_KEY: 'hmac'
+ }
+
+ :param docstr: A representation of the document to be encrypted.
+ :type docstr: str or unicode.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+
+ :param key: The key used to encrypt ``data`` (must be 256 bits long).
+ :type key: str
+
+ :param secret: The Soledad secret (used for MAC auth).
+ :type secret: str
+
+ :return: The JSON serialization of the dict representing the encrypted
+ content.
+ :rtype: str
+ """
+ # encrypt content using AES-256 CTR mode
+ iv, ciphertext = encrypt_sym(
+ str(docstr), # encryption/decryption routines expect str
+ key, method=EncryptionMethods.AES_256_CTR)
+ # Return a representation for the encrypted content. In the following, we
+ # convert binary data to hexadecimal representation so the JSON
+ # serialization does not complain about what it tries to serialize.
+ hex_ciphertext = binascii.b2a_hex(ciphertext)
+ return json.dumps({
+ ENC_JSON_KEY: hex_ciphertext,
+ ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY,
+ ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR,
+ ENC_IV_KEY: iv,
+ MAC_KEY: binascii.b2a_hex(mac_doc( # store the mac as hex.
+ doc_id, doc_rev, ciphertext,
+ MacMethods.HMAC, secret)),
+ MAC_METHOD_KEY: MacMethods.HMAC,
+ })
+
+
+def decrypt_doc(crypto, doc):
+ """
+ Wrapper around decrypt_doc_dict that accepts a crypto object and the
+ document as arguments.
+
+ :param crypto: a soledad crypto object.
+ :type crypto: SoledadCrypto
+ :param doc: the document.
+ :type doc: SoledadDocument
+
+ :return: json string with the decrypted document
+ :rtype: str
+ """
+ key = crypto.doc_passphrase(doc.doc_id)
+ secret = crypto.secret
+ return decrypt_doc_dict(doc.content, doc.doc_id, doc.rev, key, secret)
+
+
+def decrypt_doc_dict(doc_dict, doc_id, doc_rev, key, secret):
+ """
+ Decrypt C{doc}'s content.
+
+ Return the JSON string representation of the document's decrypted content.
+
+ The passed doc_dict argument should have the following structure:
+
+ {
+ ENC_JSON_KEY: '<enc_blob>',
+ ENC_SCHEME_KEY: '<enc_scheme>',
+ ENC_METHOD_KEY: '<enc_method>',
+ ENC_IV_KEY: '<initial value used to encrypt>', # (optional)
+ MAC_KEY: '<mac>'
+ MAC_METHOD_KEY: 'hmac'
+ }
+
+ C{enc_blob} is the encryption of the JSON serialization of the document's
+ content. For now Soledad just deals with documents whose C{enc_scheme} is
+ EncryptionSchemes.SYMKEY and C{enc_method} is
+ EncryptionMethods.AES_256_CTR.
+
+ :param doc_dict: The content of the document to be decrypted.
+ :type doc_dict: dict
+
+ :param doc_id: The document id.
+ :type doc_id: str
+
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+
+ :param key: The key used to encrypt ``data`` (must be 256 bits long).
+ :type key: str
+
+ :param secret:
+ :type secret:
+
+ :return: The JSON serialization of the decrypted content.
+ :rtype: str
+ """
+ soledad_assert(ENC_JSON_KEY in doc_dict)
+ soledad_assert(ENC_SCHEME_KEY in doc_dict)
+ soledad_assert(ENC_METHOD_KEY in doc_dict)
+ soledad_assert(MAC_KEY in doc_dict)
+ soledad_assert(MAC_METHOD_KEY in doc_dict)
+
+ # verify MAC
+ ciphertext = binascii.a2b_hex( # content is stored as hex.
+ doc_dict[ENC_JSON_KEY])
+ mac = mac_doc(
+ doc_id, doc_rev,
+ ciphertext,
+ doc_dict[MAC_METHOD_KEY], secret)
+ # we compare mac's hashes to avoid possible timing attacks that might
+ # exploit python's builtin comparison operator behaviour, which fails
+ # immediatelly when non-matching bytes are found.
+ doc_mac_hash = hashlib.sha256(
+ binascii.a2b_hex( # the mac is stored as hex
+ doc_dict[MAC_KEY])).digest()
+ calculated_mac_hash = hashlib.sha256(mac).digest()
+
+ if doc_mac_hash != calculated_mac_hash:
+ logger.warning("Wrong MAC while decrypting doc...")
+ raise WrongMac('Could not authenticate document\'s contents.')
+ # decrypt doc's content
+ enc_scheme = doc_dict[ENC_SCHEME_KEY]
+ plainjson = None
+ if enc_scheme == EncryptionSchemes.SYMKEY:
+ enc_method = doc_dict[ENC_METHOD_KEY]
+ if enc_method == EncryptionMethods.AES_256_CTR:
+ soledad_assert(ENC_IV_KEY in doc_dict)
+ plainjson = decrypt_sym(
+ ciphertext, key,
+ method=enc_method,
+ iv=doc_dict[ENC_IV_KEY])
+ else:
+ raise UnknownEncryptionMethod(enc_method)
+ else:
+ raise UnknownEncryptionScheme(enc_scheme)
+
+ return plainjson
+
+
+def is_symmetrically_encrypted(doc):
+ """
+ Return True if the document was symmetrically encrypted.
+
+ :param doc: The document to check.
+ :type doc: SoledadDocument
+
+ :rtype: bool
+ """
+ if doc.content and ENC_SCHEME_KEY in doc.content:
+ if doc.content[ENC_SCHEME_KEY] == EncryptionSchemes.SYMKEY:
+ return True
+ return False
+
+
+#
+# Encrypt/decrypt pools of workers
+#
+
+class SyncEncryptDecryptPool(object):
+ """
+ Base class for encrypter/decrypter pools.
+ """
+ WORKERS = 5
+
+ def __init__(self, crypto, sync_db, write_lock):
+ """
+ Initialize the pool of encryption-workers.
+
+ :param crypto: A SoledadCryto instance to perform the encryption.
+ :type crypto: leap.soledad.crypto.SoledadCrypto
+
+ :param sync_db: a database connection handle
+ :type sync_db: handle
+
+ :param write_lock: a write lock for controlling concurrent access
+ to the sync_db
+ :type write_lock: threading.Lock
+ """
+ self._pool = multiprocessing.Pool(self.WORKERS)
+ self._crypto = crypto
+ self._sync_db = sync_db
+ self._sync_db_write_lock = write_lock
+
+ def close(self):
+ """
+ Cleanly close the pool of workers.
+ """
+ logger.debug("Closing %s" % (self.__class__.__name__,))
+ self._pool.close()
+ try:
+ self._pool.join()
+ except Exception:
+ pass
+
+ def terminate(self):
+ """
+ Terminate the pool of workers.
+ """
+ logger.debug("Terminating %s" % (self.__class__.__name__,))
+ self._pool.terminate()
+
+
+def encrypt_doc_task(doc_id, doc_rev, content, key, secret):
+ """
+ Encrypt the content of the given document.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param key: The encryption key.
+ :type key: str
+ :param secret: The Soledad secret (used for MAC auth).
+ :type secret: str
+
+ :return: A tuple containing the doc id, revision and encrypted content.
+ :rtype: tuple(str, str, str)
+ """
+ encrypted_content = encrypt_docstr(
+ content, doc_id, doc_rev, key, secret)
+ return doc_id, doc_rev, encrypted_content
+
+
+class SyncEncrypterPool(SyncEncryptDecryptPool):
+ """
+ Pool of workers that spawn subprocesses to execute the symmetric encryption
+ of documents to be synced.
+ """
+ # TODO implement throttling to reduce cpu usage??
+ WORKERS = 5
+ TABLE_NAME = "docs_tosync"
+ FIELD_NAMES = "doc_id, rev, content"
+
+ def encrypt_doc(self, doc, workers=True):
+ """
+ Symmetrically encrypt a document.
+
+ :param doc: The document with contents to be encrypted.
+ :type doc: SoledadDocument
+
+ :param workers: Whether to defer the decryption to the multiprocess
+ pool of workers. Useful for debugging purposes.
+ :type workers: bool
+ """
+ soledad_assert(self._crypto is not None, "need a crypto object")
+ docstr = doc.get_json()
+ key = self._crypto.doc_passphrase(doc.doc_id)
+ secret = self._crypto.secret
+ args = doc.doc_id, doc.rev, docstr, key, secret
+
+ try:
+ if workers:
+ res = self._pool.apply_async(
+ encrypt_doc_task, args,
+ callback=self.encrypt_doc_cb)
+ else:
+ # encrypt inline
+ res = encrypt_doc_task(*args)
+ self.encrypt_doc_cb(res)
+
+ except Exception as exc:
+ logger.exception(exc)
+
+ def encrypt_doc_cb(self, result):
"""
- Generate a key for calculating a MAC for a document whose id is
- C{doc_id}.
+ Insert results of encryption routine into the local sync database.
- The key is derived using HMAC having sha256 as underlying hash
- function. The key used for HMAC is the first MAC_KEY_LENGTH characters
- of Soledad's storage secret. The HMAC message is C{doc_id}.
+ :param result: A tuple containing the doc id, revision and encrypted
+ content.
+ :type result: tuple(str, str, str)
+ """
+ doc_id, doc_rev, content = result
+ self.insert_encrypted_local_doc(doc_id, doc_rev, content)
- :param doc_id: The id of the document.
+ def insert_encrypted_local_doc(self, doc_id, doc_rev, content):
+ """
+ Insert the contents of the encrypted doc into the local sync
+ database.
+
+ :param doc_id: The document id.
:type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param content: The encrypted document.
+ :type content: str
+ """
+ sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,)
+ sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,)
- :return: The key.
- :rtype: str
+ con = self._sync_db
+ with self._sync_db_write_lock:
+ with con:
+ con.execute(sql_del, (doc_id, ))
+ con.execute(sql_ins, (doc_id, doc_rev, content))
- :raise NoSymmetricSecret: if no symmetric secret was supplied.
+
+def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret):
+ """
+ Decrypt the content of the given document.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The encrypted content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification of
+ that document.
+ :type trans_id: str
+ :param key: The encryption key.
+ :type key: str
+ :param secret: The Soledad secret (used for MAC auth).
+ :type secret: str
+
+ :return: A tuple containing the doc id, revision and encrypted content.
+ :rtype: tuple(str, str, str)
+ """
+ decrypted_content = decrypt_doc_dict(
+ content, doc_id, doc_rev, key, secret)
+ return doc_id, doc_rev, decrypted_content, gen, trans_id
+
+
+def get_insertable_docs_by_gen(expected, got):
+ """
+ Return a list of documents ready to be inserted. This list is computed
+ by aligning the expected list with the already gotten docs, and returning
+ the maximum number of docs that can be processed in the expected order
+ before finding a gap.
+
+ :param expected: A list of generations to be inserted.
+ :type expected: list
+
+ :param got: A dictionary whose values are the docs to be inserted.
+ :type got: dict
+ """
+ ordered = [got.get(i) for i in expected]
+ if None in ordered:
+ return ordered[:ordered.index(None)]
+ else:
+ return ordered
+
+
+class SyncDecrypterPool(SyncEncryptDecryptPool):
+ """
+ Pool of workers that spawn subprocesses to execute the symmetric decryption
+ of documents that were received.
+
+ The decryption of the received documents is done in two steps:
+
+ 1. All the encrypted docs are collected, together with their generation
+ and transaction-id
+ 2. The docs are enqueued for decryption. When completed, they are
+ inserted following the generation order.
+ """
+ # TODO implement throttling to reduce cpu usage??
+ TABLE_NAME = "docs_received"
+ FIELD_NAMES = "doc_id, rev, content, gen, trans_id"
+
+ write_encrypted_lock = threading.Lock()
+
+ def __init__(self, *args, **kwargs):
"""
- if self.secret is None:
- raise NoSymmetricSecret()
- return hmac.new(
- self.secret[:self.MAC_KEY_LENGTH],
- doc_id,
- hashlib.sha256).digest()
+ Initialize the decrypter pool, and setup a dict for putting the
+ results of the decrypted docs until they are picked by the insert
+ routine that gets them in order.
+ """
+ self._insert_doc_cb = kwargs.pop("insert_doc_cb")
+ SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
+ self.decrypted_docs = {}
+ self.source_replica_uid = None
- #
- # secret setters/getters
- #
+ def set_source_replica_uid(self, source_replica_uid):
+ """
+ Set the source replica uid for this decrypter pool instance.
- def _get_secret(self):
- return self._soledad.storage_secret
+ :param source_replica_uid: The uid of the source replica.
+ :type source_replica_uid: str
+ """
+ self.source_replica_uid = source_replica_uid
- secret = property(
- _get_secret, doc='The secret used for symmetric encryption')
+ def insert_encrypted_received_doc(self, doc_id, doc_rev, content,
+ gen, trans_id):
+ """
+ Insert a received message with encrypted content, to be decrypted later
+ on.
+
+ :param doc_id: The Document ID.
+ :type doc_id: str
+ :param doc_rev: The Document Revision
+ :param doc_rev: str
+ :param content: the Content of the document
+ :type content: str
+ :param gen: the Document Generation
+ :type gen: int
+ :param trans_id: Transaction ID
+ :type trans_id: str
+ """
+ docstr = json.dumps(content)
+ sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % (
+ self.TABLE_NAME,)
+
+ con = self._sync_db
+ with self._sync_db_write_lock:
+ with con:
+ con.execute(sql_ins, (doc_id, doc_rev, docstr, gen, trans_id))
+
+ def insert_marker_for_received_doc(self, doc_id, doc_rev, gen):
+ """
+ Insert a marker with the document id, revision and generation on the
+ sync db. This document does not have an encrypted payload, so the
+ content has already been inserted into the decrypted_docs dictionary
+ from where it can be picked following generation order.
+ We need to leave here the marker to be able to calculate the expected
+ insertion order for a synchronization batch.
+
+ :param doc_id: The Document ID.
+ :type doc_id: str
+ :param doc_rev: The Document Revision
+ :param doc_rev: str
+ :param gen: the Document Generation
+ :type gen: int
+ """
+ sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % (
+ self.TABLE_NAME,)
+ con = self._sync_db
+ with self._sync_db_write_lock:
+ with con:
+ con.execute(sql_ins, (doc_id, doc_rev, '', gen, ''))
+
+ def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id):
+ """
+ Insert a document that is not symmetrically encrypted.
+ We store it in the staging area (the decrypted_docs dictionary) to be
+ picked up in order as the preceding documents are decrypted.
+
+ :param doc_id: The Document ID.
+ :type doc_id: str
+ :param doc_rev: The Document Revision
+ :param doc_rev: str
+ :param content: the Content of the document
+ :type content: str
+ :param gen: the Document Generation
+ :type gen: int
+ :param trans_id: Transaction ID
+ :type trans_id: str
+ """
+ # XXX this need a deeper review / testing.
+ # I believe that what I'm doing here is prone to problems
+ # if the sync is interrupted (ie, client crash) in the worst possible
+ # moment. We would need a recover strategy in that case
+ # (or, insert the document in the table all the same, but with a flag
+ # saying if the document is sym-encrypted or not),
+ content = json.dumps(content)
+ result = doc_id, doc_rev, content, gen, trans_id
+ self.decrypted_docs[gen] = result
+ self.insert_marker_for_received_doc(doc_id, doc_rev, gen)
+
+ def delete_encrypted_received_doc(self, doc_id, doc_rev):
+ """
+ Delete a encrypted received doc after it was inserted into the local
+ db.
+
+ :param doc_id: Document ID.
+ :type doc_id: str
+ :param doc_rev: Document revision.
+ :type doc_rev: str
+ """
+ sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % (
+ self.TABLE_NAME,)
+ con = self._sync_db
+ with self._sync_db_write_lock:
+ with con:
+ con.execute(sql_del, (doc_id, doc_rev))
+
+ def decrypt_doc(self, doc_id, rev, source_replica_uid, workers=True):
+ """
+ Symmetrically decrypt a document.
+
+ :param doc_id: The ID for the document with contents to be encrypted.
+ :type doc: str
+ :param rev: The revision of the document.
+ :type rev: str
+ :param source_replica_uid:
+ :type source_replica_uid: str
+
+ :param workers: Whether to defer the decryption to the multiprocess
+ pool of workers. Useful for debugging purposes.
+ :type workers: bool
+ """
+ self.source_replica_uid = source_replica_uid
+
+ # insert_doc_cb is a proxy object that gets updated with the right
+ # insert function only when the sync_target invokes the sync_exchange
+ # method. so, if we don't still have a non-empty callback, we refuse
+ # to proceed.
+ if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid),
+ None):
+ logger.debug("Sync decrypter pool: no insert_doc_cb() yet.")
+ return
+
+ # XXX move to get_doc function...
+ c = self._sync_db.cursor()
+ sql = "SELECT * FROM '%s' WHERE doc_id=? AND rev=?" % (
+ self.TABLE_NAME,)
+ try:
+ c.execute(sql, (doc_id, rev))
+ res = c.fetchone()
+ except Exception as exc:
+ logger.warning("Error getting docs from syncdb: %r" % (exc,))
+ return
+ if res is None:
+ logger.debug("Doc %s:%s does not exist in sync db" % (doc_id, rev))
+ return
+
+ soledad_assert(self._crypto is not None, "need a crypto object")
+ try:
+ doc_id, rev, docstr, gen, trans_id = res
+ except ValueError:
+ logger.warning("Wrong entry in sync db")
+ return
+
+ if len(docstr) == 0:
+ # not encrypted payload
+ return
+
+ try:
+ content = json.loads(docstr)
+ except TypeError:
+ logger.warning("Wrong type while decoding json: %s" % repr(docstr))
+ return
+
+ key = self._crypto.doc_passphrase(doc_id)
+ secret = self._crypto.secret
+ args = doc_id, rev, content, gen, trans_id, key, secret
+
+ try:
+ if workers:
+ # Ouch. This is sent to the workers asynchronously, so
+ # we have no way of logging errors. We'd have to inspect
+ # lingering results by querying successful / get() over them...
+ # Or move the heck out of it to twisted.
+ res = self._pool.apply_async(
+ decrypt_doc_task, args,
+ callback=self.decrypt_doc_cb)
+ else:
+ # decrypt inline
+ res = decrypt_doc_task(*args)
+ self.decrypt_doc_cb(res)
+
+ except Exception as exc:
+ logger.exception(exc)
+
+ def decrypt_doc_cb(self, result):
+ """
+ Temporarily store the decryption result in a dictionary where it will
+ be picked by process_decrypted.
+
+ :param result: A tuple containing the doc id, revision and encrypted
+ content.
+ :type result: tuple(str, str, str)
+ """
+ doc_id, rev, content, gen, trans_id = result
+ logger.debug("Sync decrypter pool: decrypted doc %s: %s %s" % (doc_id, rev, gen))
+ self.decrypted_docs[gen] = result
+
+ def get_docs_by_generation(self):
+ """
+ Get all documents in the received table from the sync db,
+ ordered by generation.
+
+ :return: list of doc_id, rev, generation
+ """
+ c = self._sync_db.cursor()
+ sql = "SELECT doc_id, rev, gen FROM %s ORDER BY gen" % (
+ self.TABLE_NAME,)
+ c.execute(sql)
+ return c.fetchall()
+
+ def count_received_encrypted_docs(self):
+ """
+ Count how many documents we have in the table for received and
+ encrypted docs.
+
+ :return: The count of documents.
+ :rtype: int
+ """
+ if self._sync_db is None:
+ logger.warning("cannot return count with null sync_db")
+ return
+ c = self._sync_db.cursor()
+ sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
+ c.execute(sql)
+ res = c.fetchone()
+ if res is not None:
+ return res[0]
+ else:
+ return 0
+
+ def decrypt_received_docs(self):
+ """
+ Get all the encrypted documents from the sync database and dispatch a
+ decrypt worker to decrypt each one of them.
+ """
+ docs_by_generation = self.get_docs_by_generation()
+ logger.debug("Sync decrypter pool: There are %d documents to " \
+ "decrypt." % len(docs_by_generation))
+ for doc_id, rev, gen in filter(None, docs_by_generation):
+ self.decrypt_doc(doc_id, rev, self.source_replica_uid)
+
+ def process_decrypted(self):
+ """
+ Process the already decrypted documents, and insert as many documents
+ as can be taken from the expected order without finding a gap.
+
+ :return: Whether we have processed all the pending docs.
+ :rtype: bool
+ """
+ # Acquire the lock to avoid processing while we're still
+ # getting data from the syncing stream, to avoid InvalidGeneration
+ # problems.
+ with self.write_encrypted_lock:
+ already_decrypted = self.decrypted_docs
+ docs = self.get_docs_by_generation()
+ docs = filter(lambda entry: len(entry) > 0, docs)
+ expected = [gen for doc_id, rev, gen in docs]
+ docs_to_insert = get_insertable_docs_by_gen(
+ expected, already_decrypted)
+ for doc_fields in docs_to_insert:
+ self.insert_decrypted_local_doc(*doc_fields)
+ remaining = self.count_received_encrypted_docs()
+ return remaining == 0
+
+ def insert_decrypted_local_doc(self, doc_id, doc_rev, content,
+ gen, trans_id):
+ """
+ Insert the decrypted document into the local sqlcipher database.
+ Makes use of the passed callback `return_doc_cb` passed to the caller
+ by u1db sync.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification
+ of that document.
+ :type trans_id: str
+ """
+ # could pass source_replica in params for callback chain
+ insert_fun = self._insert_doc_cb[self.source_replica_uid]
+ logger.debug("Sync decrypter pool: inserting doc in local db: " \
+ "%s:%s %s" % (doc_id, doc_rev, gen))
+ try:
+ # convert deleted documents to avoid error on document creation
+ if content == 'null':
+ content = None
+ doc = SoledadDocument(doc_id, doc_rev, content)
+ insert_fun(doc, int(gen), trans_id)
+ except Exception as exc:
+ logger.error("Sync decrypter pool: error while inserting "
+ "decrypted doc into local db.")
+ logger.exception(exc)
+
+ else:
+ # If no errors found, remove it from the local temporary dict
+ # and from the received database.
+ self.decrypted_docs.pop(gen)
+ self.delete_encrypted_received_doc(doc_id, doc_rev)
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 5ffa9c7e..2df9606e 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# sqlcipher.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013, 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -14,8 +14,6 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
A U1DB backend that uses SQLCipher as its persistence layer.
@@ -40,11 +38,13 @@ cipher_default_use_hmac can be used to globally alter the default use of HMAC
when opening a database.
So, as the statements above were introduced for backwards compatibility with
-SLCipher 1.1 databases, we do not implement them as all SQLCipher databases
+SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases
handled by Soledad should be created by SQLCipher >= 2.0.
"""
import logging
+import multiprocessing
import os
+import sqlite3
import string
import threading
import time
@@ -57,9 +57,12 @@ from collections import defaultdict
from pysqlcipher import dbapi2
from u1db.backends import sqlite_backend
from u1db import errors as u1db_errors
+from taskthread import TimerTask
-from leap.soledad.client.sync import Synchronizer
+from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool
from leap.soledad.client.target import SoledadSyncTarget
+from leap.soledad.client.target import PendingReceivedDocsSyncError
+from leap.soledad.client.sync import SoledadSynchronizer
from leap.soledad.common.document import SoledadDocument
@@ -88,7 +91,7 @@ SQLITE_ISOLATION_LEVEL = None
def open(path, password, create=True, document_factory=None, crypto=None,
raw_key=False, cipher='aes-256-cbc', kdf_iter=4000,
- cipher_page_size=1024):
+ cipher_page_size=1024, defer_encryption=False):
"""Open a database at the given location.
Will raise u1db.errors.DatabaseDoesNotExist if create=False and the
@@ -114,6 +117,9 @@ def open(path, password, create=True, document_factory=None, crypto=None,
:type kdf_iter: int
:param cipher_page_size: The page size.
:type cipher_page_size: int
+ :param defer_encryption: Whether to defer encryption/decryption of
+ documents, or do it inline while syncing.
+ :type defer_encryption: bool
:return: An instance of Database.
:rtype SQLCipherDatabase
@@ -121,7 +127,7 @@ def open(path, password, create=True, document_factory=None, crypto=None,
return SQLCipherDatabase.open_database(
path, password, create=create, document_factory=document_factory,
crypto=crypto, raw_key=raw_key, cipher=cipher, kdf_iter=kdf_iter,
- cipher_page_size=cipher_page_size)
+ cipher_page_size=cipher_page_size, defer_encryption=defer_encryption)
#
@@ -147,19 +153,40 @@ class NotAnHexString(Exception):
#
class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
- """A U1DB implementation that uses SQLCipher as its persistence layer."""
+ """
+ A U1DB implementation that uses SQLCipher as its persistence layer.
+ """
+ defer_encryption = False
_index_storage_value = 'expand referenced encrypted'
k_lock = threading.Lock()
create_doc_lock = threading.Lock()
update_indexes_lock = threading.Lock()
+ _sync_watcher = None
+ _sync_enc_pool = None
+
+ """
+ The name of the local symmetrically encrypted documents to
+ sync database file.
+ """
+ LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db'
- syncing_lock = defaultdict(threading.Lock)
"""
A dictionary that hold locks which avoid multiple sync attempts from the
same database replica.
"""
+ encrypting_lock = threading.Lock()
+ """
+ Period or recurrence of the periodic encrypting task, in seconds.
+ """
+ ENCRYPT_TASK_PERIOD = 1
+
+ syncing_lock = defaultdict(threading.Lock)
+ """
+ A dictionary that hold locks which avoid multiple sync attempts from the
+ same database replica.
+ """
def __init__(self, sqlcipher_file, password, document_factory=None,
crypto=None, raw_key=False, cipher='aes-256-cbc',
@@ -194,7 +221,8 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
self.assert_db_is_encrypted(
sqlcipher_file, password, raw_key, cipher, kdf_iter,
cipher_page_size)
- # connect to the database
+
+ # connect to the sqlcipher database
with self.k_lock:
self._db_handle = dbapi2.connect(
sqlcipher_file,
@@ -215,18 +243,45 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
self._ensure_schema()
self._crypto = crypto
+ self._sync_db = None
+ self._sync_db_write_lock = None
+ self._sync_enc_pool = None
+
+ if self.defer_encryption:
+ if sqlcipher_file != ":memory:":
+ self._sync_db_path = "%s-sync" % sqlcipher_file
+ else:
+ self._sync_db_path = ":memory:"
+
+ # initialize sync db
+ self._init_sync_db()
+
+ # initialize syncing queue encryption pool
+ self._sync_enc_pool = SyncEncrypterPool(
+ self._crypto, self._sync_db, self._sync_db_write_lock)
+ self._sync_watcher = TimerTask(self._encrypt_syncing_docs,
+ self.ENCRYPT_TASK_PERIOD)
+ self._sync_watcher.start()
+
def factory(doc_id=None, rev=None, json='{}', has_conflicts=False,
syncable=True):
return SoledadDocument(doc_id=doc_id, rev=rev, json=json,
has_conflicts=has_conflicts,
syncable=syncable)
self.set_document_factory(factory)
+ # we store syncers in a dictionary indexed by the target URL. We also
+ # store a hash of the auth info in case auth info expires and we need
+ # to rebuild the syncer for that target. The final self._syncers
+ # format is the following:
+ #
+ # self._syncers = {'<url>': ('<auth_hash>', syncer), ...}
self._syncers = {}
@classmethod
def _open_database(cls, sqlcipher_file, password, document_factory=None,
crypto=None, raw_key=False, cipher='aes-256-cbc',
- kdf_iter=4000, cipher_page_size=1024):
+ kdf_iter=4000, cipher_page_size=1024,
+ defer_encryption=False):
"""
Open a SQLCipher database.
@@ -249,10 +304,14 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
:type kdf_iter: int
:param cipher_page_size: The page size.
:type cipher_page_size: int
+ :param defer_encryption: Whether to defer encryption/decryption of
+ documents, or do it inline while syncing.
+ :type defer_encryption: bool
:return: The database object.
:rtype: SQLCipherDatabase
"""
+ cls.defer_encryption = defer_encryption
if not os.path.isfile(sqlcipher_file):
raise u1db_errors.DatabaseDoesNotExist()
@@ -298,43 +357,59 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
def open_database(cls, sqlcipher_file, password, create, backend_cls=None,
document_factory=None, crypto=None, raw_key=False,
cipher='aes-256-cbc', kdf_iter=4000,
- cipher_page_size=1024):
+ cipher_page_size=1024, defer_encryption=False):
"""
Open a SQLCipher database.
:param sqlcipher_file: The path for the SQLCipher file.
:type sqlcipher_file: str
+
:param password: The password that protects the SQLCipher db.
:type password: str
+
:param create: Should the datbase be created if it does not already
- exist?
- :type: bool
+ exist?
+ :type create: bool
+
:param backend_cls: A class to use as backend.
:type backend_cls: type
+
:param document_factory: A function that will be called with the same
- parameters as Document.__init__.
+ parameters as Document.__init__.
:type document_factory: callable
+
:param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
- document contents when syncing.
+ document contents when syncing.
:type crypto: soledad.crypto.SoledadCrypto
+
:param raw_key: Whether C{password} is a raw 64-char hex string or a
- passphrase that should be hashed to obtain the encyrption key.
+ passphrase that should be hashed to obtain the
+ encyrption key.
:type raw_key: bool
+
:param cipher: The cipher and mode to use.
:type cipher: str
+
:param kdf_iter: The number of iterations to use.
:type kdf_iter: int
+
:param cipher_page_size: The page size.
:type cipher_page_size: int
+ :param defer_encryption: Whether to defer encryption/decryption of
+ documents, or do it inline while syncing.
+ :type defer_encryption: bool
+
:return: The database object.
:rtype: SQLCipherDatabase
"""
+ cls.defer_encryption = defer_encryption
try:
return cls._open_database(
sqlcipher_file, password, document_factory=document_factory,
crypto=crypto, raw_key=raw_key, cipher=cipher,
- kdf_iter=kdf_iter, cipher_page_size=cipher_page_size)
+ kdf_iter=kdf_iter, cipher_page_size=cipher_page_size,
+ defer_encryption=defer_encryption)
except u1db_errors.DatabaseDoesNotExist:
if not create:
raise
@@ -347,7 +422,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
crypto=crypto, raw_key=raw_key, cipher=cipher,
kdf_iter=kdf_iter, cipher_page_size=cipher_page_size)
- def sync(self, url, creds=None, autocreate=True):
+ def sync(self, url, creds=None, autocreate=True, defer_decryption=True):
"""
Synchronize documents with remote replica exposed at url.
@@ -362,6 +437,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
:type creds: dict
:param autocreate: Ask the target to create the db if non-existent.
:type autocreate: bool
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
:return: The local generation before the synchronisation was performed.
:rtype: int
@@ -370,7 +449,16 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
# the following context manager blocks until the syncing lock can be
# acquired.
with self.syncer(url, creds=creds) as syncer:
- res = syncer.sync(autocreate=autocreate)
+
+ # XXX could mark the critical section here...
+ try:
+ res = syncer.sync(autocreate=autocreate,
+ defer_decryption=defer_decryption)
+
+ except PendingReceivedDocsSyncError:
+ logger.warning("Local sync db is not clear, skipping sync...")
+ return
+
return res
def stop_sync(self):
@@ -394,7 +482,15 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
with SQLCipherDatabase.syncing_lock[self._get_replica_uid()]:
syncer = self._get_syncer(url, creds=creds)
yield syncer
- syncer.sync_target.close()
+
+ @property
+ def syncing(self):
+ lock = SQLCipherDatabase.syncing_lock[self._get_replica_uid()]
+ acquired_lock = lock.acquire(False)
+ if acquired_lock is False:
+ return True
+ lock.release()
+ return False
def _get_syncer(self, url, creds=None):
"""
@@ -415,11 +511,15 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
h = sha256(json.dumps([url, creds])).hexdigest()
cur_h, syncer = self._syncers.get(url, (None, None))
if syncer is None or h != cur_h:
- syncer = Synchronizer(
+ wlock = self._sync_db_write_lock
+ syncer = SoledadSynchronizer(
self,
SoledadSyncTarget(url,
+ self._replica_uid,
creds=creds,
- crypto=self._crypto))
+ crypto=self._crypto,
+ sync_db=self._sync_db,
+ sync_db_write_lock=wlock))
self._syncers[url] = (h, syncer)
# in order to reuse the same synchronizer multiple times we have to
# reset its state (i.e. the number of documents received from target
@@ -442,21 +542,85 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
'ALTER TABLE document '
'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE')
- def create_doc(self, content, doc_id=None):
+ def _init_sync_db(self):
+ """
+ Initialize the Symmetrically-Encrypted document to be synced database,
+ and the queue to communicate with subprocess workers.
"""
- Create a new document in the local encrypted database.
+ self._sync_db = sqlite3.connect(self._sync_db_path,
+ check_same_thread=False)
- :param content: the contents of the new document
- :type content: dict
- :param doc_id: an optional identifier specifying the document id
- :type doc_id: str
+ self._sync_db_write_lock = threading.Lock()
+ self._create_sync_db_tables()
+ self.sync_queue = multiprocessing.Queue()
+
+ def _create_sync_db_tables(self):
+ """
+ Create tables for the local sync documents db if needed.
+ """
+ encr = SyncEncrypterPool
+ decr = SyncDecrypterPool
+ sql_encr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % (
+ encr.TABLE_NAME, encr.FIELD_NAMES))
+ sql_decr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % (
+ decr.TABLE_NAME, decr.FIELD_NAMES))
+
+ with self._sync_db_write_lock:
+ with self._sync_db:
+ self._sync_db.execute(sql_encr)
+ self._sync_db.execute(sql_decr)
+
+ #
+ # Symmetric encryption of syncing docs
+ #
+
+ def _encrypt_syncing_docs(self):
+ """
+ Process the syncing queue and send the documents there
+ to be encrypted in the sync db. They will be read by the
+ SoledadSyncTarget during the sync_exchange.
+
+ Called periodical from the TimerTask self._sync_watcher.
+ """
+ lock = self.encrypting_lock
+ # optional wait flag used to avoid blocking
+ if not lock.acquire(False):
+ return
+ else:
+ queue = self.sync_queue
+ try:
+ while not queue.empty():
+ doc = queue.get_nowait()
+ self._sync_enc_pool.encrypt_doc(doc)
+
+ except Exception as exc:
+ logger.error("Error while encrypting docs to sync")
+ logger.exception(exc)
+ finally:
+ lock.release()
+
+ #
+ # Document operations
+ #
- :return: the new document
- :rtype: SoledadDocument
+ def put_doc(self, doc):
"""
- with self.create_doc_lock:
- return sqlite_backend.SQLitePartialExpandDatabase.create_doc(
- self, content, doc_id=doc_id)
+ Overwrite the put_doc method, to enqueue the modified document for
+ encryption before sync.
+
+ :param doc: The document to be put.
+ :type doc: u1db.Document
+
+ :return: The new document revision.
+ :rtype: str
+ """
+ doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(
+ self, doc)
+ if self.defer_encryption:
+ self.sync_queue.put_nowait(doc)
+ return doc_rev
+
+ # indexes
def _put_and_update_indexes(self, old_doc, doc):
"""
@@ -906,12 +1070,25 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
res = c.fetchall()
return res[0][0]
- def __del__(self):
+ def close(self):
"""
- Closes db_handle upon object destruction.
+ Close db_handle and close syncer.
"""
+ logger.debug("Sqlcipher backend: closing")
+ if self._sync_watcher is not None:
+ self._sync_watcher.stop()
+ self._sync_watcher.shutdown()
+ for url in self._syncers:
+ _, syncer = self._syncers[url]
+ syncer.close()
+ if self._sync_enc_pool is not None:
+ self._sync_enc_pool.close()
if self._db_handle is not None:
self._db_handle.close()
+ @property
+ def replica_uid(self):
+ return self._get_replica_uid()
+
sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase)
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index 56e63416..5d545a77 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -17,35 +17,85 @@
"""
-Sync infrastructure that can be interrupted and recovered.
+Soledad synchronization utilities.
+
+
+Extend u1db Synchronizer with the ability to:
+
+ * Defer the update of the known replica uid until all the decryption of
+ the incoming messages has been processed.
+
+ * Be interrupted and recovered.
"""
+
import json
+import logging
+import traceback
+from threading import Lock
from u1db import errors
-from u1db.sync import Synchronizer as U1DBSynchronizer
+from u1db.sync import Synchronizer
-class Synchronizer(U1DBSynchronizer):
+logger = logging.getLogger(__name__)
+
+
+class SoledadSynchronizer(Synchronizer):
"""
Collect the state around synchronizing 2 U1DB replicas.
- Modified to allow for interrupting the synchronization process.
+ Synchronization is bi-directional, in that new items in the source are sent
+ to the target, and new items in the target are returned to the source.
+ However, it still recognizes that one side is initiating the request. Also,
+ at the moment, conflicts are only created in the source.
+
+ Also modified to allow for interrupting the synchronization process.
"""
+ syncing_lock = Lock()
+
def stop(self):
"""
Stop the current sync in progress.
"""
self.sync_target.stop()
- def sync(self, autocreate=False):
+ def sync(self, autocreate=False, defer_decryption=True):
"""
Synchronize documents between source and target.
+ Differently from u1db `Synchronizer.sync` method, this one allows to
+ pass a `defer_decryption` flag that will postpone the last
+ step in the synchronization dance, namely, the setting of the last
+ known generation and transaction id for a given remote replica.
+
+ This is done to allow the ongoing parallel decryption of the incoming
+ docs to proceed without `InvalidGeneration` conflicts.
+
:param autocreate: Whether the target replica should be created or not.
:type autocreate: bool
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
+ """
+ self.syncing_lock.acquire()
+ try:
+ return self._sync(autocreate=autocreate,
+ defer_decryption=defer_decryption)
+ except Exception:
+ # re-raising the exceptions to let syqlcipher.sync catch them
+ # (and re-create the syncer instance if needed)
+ raise
+ finally:
+ self.release_syncing_lock()
+
+ def _sync(self, autocreate=False, defer_decryption=True):
+ """
+ Helper function, called from the main `sync` method.
+ See `sync` docstring.
"""
sync_target = self.sync_target
@@ -64,6 +114,16 @@ class Synchronizer(U1DBSynchronizer):
target_gen, target_trans_id = (0, '')
target_my_gen, target_my_trans_id = (0, '')
+ logger.debug(
+ "Soledad target sync info:\n"
+ " target replica uid: %s\n"
+ " target generation: %d\n"
+ " target trans id: %s\n"
+ " target my gen: %d\n"
+ " target my trans_id: %s"
+ % (self.target_replica_uid, target_gen, target_trans_id,
+ target_my_gen, target_my_trans_id))
+
# make sure we'll have access to target replica uid once it exists
if self.target_replica_uid is None:
@@ -80,6 +140,8 @@ class Synchronizer(U1DBSynchronizer):
# what's changed since that generation and this current gen
my_gen, _, changes = self.source.whats_changed(target_my_gen)
+ logger.debug("Soledad sync: there are %d documents to send." \
+ % len(changes))
# get source last-seen database generation for the target
if self.target_replica_uid is None:
@@ -88,6 +150,11 @@ class Synchronizer(U1DBSynchronizer):
target_last_known_gen, target_last_known_trans_id = \
self.source._get_replica_gen_and_trans_id(
self.target_replica_uid)
+ logger.debug(
+ "Soledad source sync info:\n"
+ " source target gen: %d\n"
+ " source target trans_id: %s"
+ % (target_last_known_gen, target_last_known_trans_id))
# validate transaction ids
if not changes and target_last_known_gen == target_gen:
@@ -111,15 +178,85 @@ class Synchronizer(U1DBSynchronizer):
#
# The sync_exchange method may be interrupted, in which case it will
# return a tuple of Nones.
- new_gen, new_trans_id = sync_target.sync_exchange(
- docs_by_generation, self.source._replica_uid,
- target_last_known_gen, target_last_known_trans_id,
- self._insert_doc_from_target, ensure_callback=ensure_callback)
+ try:
+ new_gen, new_trans_id = sync_target.sync_exchange(
+ docs_by_generation, self.source._replica_uid,
+ target_last_known_gen, target_last_known_trans_id,
+ self._insert_doc_from_target, ensure_callback=ensure_callback,
+ defer_decryption=defer_decryption)
+ logger.debug(
+ "Soledad source sync info after sync exchange:\n"
+ " source target gen: %d\n"
+ " source target trans_id: %s"
+ % (new_gen, new_trans_id))
+ info = {
+ "target_replica_uid": self.target_replica_uid,
+ "new_gen": new_gen,
+ "new_trans_id": new_trans_id,
+ "my_gen": my_gen
+ }
+ self._syncing_info = info
+ if defer_decryption and not sync_target.has_syncdb():
+ logger.debug("Sync target has no valid sync db, "
+ "aborting defer_decryption")
+ defer_decryption = False
+ self.complete_sync()
+ except Exception as e:
+ logger.error("Soledad sync error: %s" % str(e))
+ logger.error(traceback.format_exc())
+ sync_target.stop()
+ finally:
+ sync_target.close()
- # record target synced-up-to generation including applying what we sent
+ return my_gen
+
+ def complete_sync(self):
+ """
+ Last stage of the synchronization:
+ (a) record last known generation and transaction uid for the remote
+ replica, and
+ (b) make target aware of our current reached generation.
+ """
+ logger.debug("Completing deferred last step in SYNC...")
+
+ # record target synced-up-to generation including applying what we
+ # sent
+ info = self._syncing_info
self.source._set_replica_gen_and_trans_id(
- self.target_replica_uid, new_gen, new_trans_id)
+ info["target_replica_uid"], info["new_gen"], info["new_trans_id"])
+
# if gapless record current reached generation with target
- self._record_sync_info_with_the_target(my_gen)
+ self._record_sync_info_with_the_target(info["my_gen"])
- return my_gen
+ @property
+ def syncing(self):
+ """
+ Return True if a sync is ongoing, False otherwise.
+ :rtype: bool
+ """
+ # XXX FIXME we need some mechanism for timeout: should cleanup and
+ # release if something in the syncdb-decrypt goes wrong. we could keep
+ # track of the release date and cleanup unrealistic sync entries after
+ # some time.
+ locked = self.syncing_lock.locked()
+ return locked
+
+ def release_syncing_lock(self):
+ """
+ Release syncing lock if it's locked.
+ """
+ if self.syncing_lock.locked():
+ self.syncing_lock.release()
+
+ def close(self):
+ """
+ Close sync target pool of workers.
+ """
+ self.release_syncing_lock()
+ self.sync_target.close()
+
+ def __del__(self):
+ """
+ Cleanup: release lock.
+ """
+ self.release_syncing_lock()
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 968545b6..70e4d3a2 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -14,458 +14,466 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
"""
A U1DB backend for encrypting data before sending to server and decrypting
after receiving.
"""
-import binascii
+
+
import cStringIO
import gzip
-import hashlib
-import hmac
import logging
+import re
import urllib
import threading
+import urlparse
-import simplejson as json
+from collections import defaultdict
from time import sleep
from uuid import uuid4
+from contextlib import contextmanager
-from u1db.remote import utils, http_errors
-from u1db.errors import BrokenSyncStream
+import simplejson as json
+from taskthread import TimerTask
from u1db import errors
+from u1db.remote import utils, http_errors
from u1db.remote.http_target import HTTPSyncTarget
-from u1db.remote.http_client import _encode_query_parameter
-
+from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase
+from zope.proxy import ProxyBase
+from zope.proxy import sameProxiedObjects, setProxiedObject
from leap.soledad.common import soledad_assert
-from leap.soledad.common.crypto import (
- EncryptionSchemes,
- UnknownEncryptionScheme,
- MacMethods,
- UnknownMacMethod,
- WrongMac,
- ENC_JSON_KEY,
- ENC_SCHEME_KEY,
- ENC_METHOD_KEY,
- ENC_IV_KEY,
- MAC_KEY,
- MAC_METHOD_KEY,
-)
from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.auth import TokenBasedAuth
-from leap.soledad.client.crypto import (
- EncryptionMethods,
- UnknownEncryptionMethod,
-)
-from leap.soledad.client.events import (
- SOLEDAD_SYNC_SEND_STATUS,
- SOLEDAD_SYNC_RECEIVE_STATUS,
- signal,
-)
+from leap.soledad.client.crypto import is_symmetrically_encrypted
+from leap.soledad.client.crypto import encrypt_doc, decrypt_doc
+from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool
+from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
+from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
+from leap.soledad.client.events import signal
-logger = logging.getLogger(__name__)
-#
-# Exceptions
-#
+logger = logging.getLogger(__name__)
-class DocumentNotEncrypted(Exception):
+def _gunzip(data):
"""
- Raised for failures in document encryption.
+ Uncompress data that is gzipped.
+
+ :param data: gzipped data
+ :type data: basestring
"""
- pass
+ buffer = cStringIO.StringIO()
+ buffer.write(data)
+ buffer.seek(0)
+ try:
+ data = gzip.GzipFile(mode='r', fileobj=buffer).read()
+ except Exception:
+ logger.warning("Error while decrypting gzipped data")
+ buffer.close()
+ return data
-#
-# Crypto utilities for a SoledadDocument.
-#
+class PendingReceivedDocsSyncError(Exception):
+ pass
-def mac_doc(crypto, doc_id, doc_rev, ciphertext, mac_method):
+class DocumentSyncerThread(threading.Thread):
"""
- Calculate a MAC for C{doc} using C{ciphertext}.
-
- Current MAC method used is HMAC, with the following parameters:
-
- * key: sha256(storage_secret, doc_id)
- * msg: doc_id + doc_rev + ciphertext
- * digestmod: sha256
-
- :param crypto: A SoledadCryto instance used to perform the encryption.
- :type crypto: leap.soledad.crypto.SoledadCrypto
- :param doc_id: The id of the document.
- :type doc_id: str
- :param doc_rev: The revision of the document.
- :type doc_rev: str
- :param ciphertext: The content of the document.
- :type ciphertext: str
- :param mac_method: The MAC method to use.
- :type mac_method: str
-
- :return: The calculated MAC.
- :rtype: str
+ A thread that knowns how to either send or receive a document during the
+ sync process.
"""
- if mac_method == MacMethods.HMAC:
- return hmac.new(
- crypto.doc_mac_key(doc_id),
- str(doc_id) + str(doc_rev) + ciphertext,
- hashlib.sha256).digest()
- # raise if we do not know how to handle this MAC method
- raise UnknownMacMethod('Unknown MAC method: %s.' % mac_method)
+ def __init__(self, doc_syncer, release_method, failed_method,
+ idx, total, last_request_lock=None, last_callback_lock=None):
+ """
+ Initialize a new syncer thread.
+
+ :param doc_syncer: A document syncer.
+ :type doc_syncer: HTTPDocumentSyncer
+ :param release_method: A method to be called when finished running.
+ :type release_method: callable(DocumentSyncerThread)
+ :param failed_method: A method to be called when we failed.
+ :type failed_method: callable(DocumentSyncerThread)
+ :param idx: The index count of the current operation.
+ :type idx: int
+ :param total: The total number of operations.
+ :type total: int
+ :param last_request_lock: A lock to wait for before actually performing
+ the request.
+ :type last_request_lock: threading.Lock
+ :param last_callback_lock: A lock to wait for before actually running
+ the success callback.
+ :type last_callback_lock: threading.Lock
+ """
+ threading.Thread.__init__(self)
+ self._doc_syncer = doc_syncer
+ self._release_method = release_method
+ self._failed_method = failed_method
+ self._idx = idx
+ self._total = total
+ self._last_request_lock = last_request_lock
+ self._last_callback_lock = last_callback_lock
+ self._response = None
+ self._exception = None
+ self._result = None
+ self._success = False
+ # a lock so we can signal when we're finished
+ self._request_lock = threading.Lock()
+ self._request_lock.acquire()
+ self._callback_lock = threading.Lock()
+ self._callback_lock.acquire()
+ # make thread interruptable
+ self._stopped = None
+ self._stop_lock = threading.Lock()
-def encrypt_doc(crypto, doc):
- """
- Encrypt C{doc}'s content.
-
- Encrypt doc's contents using AES-256 CTR mode and return a valid JSON
- string representing the following:
-
- {
- ENC_JSON_KEY: '<encrypted doc JSON string>',
- ENC_SCHEME_KEY: 'symkey',
- ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR,
- ENC_IV_KEY: '<the initial value used to encrypt>',
- MAC_KEY: '<mac>'
- MAC_METHOD_KEY: 'hmac'
- }
-
- :param crypto: A SoledadCryto instance used to perform the encryption.
- :type crypto: leap.soledad.crypto.SoledadCrypto
- :param doc: The document with contents to be encrypted.
- :type doc: SoledadDocument
-
- :return: The JSON serialization of the dict representing the encrypted
- content.
- :rtype: str
- """
- soledad_assert(doc.is_tombstone() is False)
- # encrypt content using AES-256 CTR mode
- iv, ciphertext = crypto.encrypt_sym(
- str(doc.get_json()), # encryption/decryption routines expect str
- crypto.doc_passphrase(doc.doc_id),
- method=EncryptionMethods.AES_256_CTR)
- # Return a representation for the encrypted content. In the following, we
- # convert binary data to hexadecimal representation so the JSON
- # serialization does not complain about what it tries to serialize.
- hex_ciphertext = binascii.b2a_hex(ciphertext)
- return json.dumps({
- ENC_JSON_KEY: hex_ciphertext,
- ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY,
- ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR,
- ENC_IV_KEY: iv,
- # store the mac as hex.
- MAC_KEY: binascii.b2a_hex(
- mac_doc(
- crypto, doc.doc_id, doc.rev,
- ciphertext,
- MacMethods.HMAC)),
- MAC_METHOD_KEY: MacMethods.HMAC,
- })
-
-
-def decrypt_doc(crypto, doc):
- """
- Decrypt C{doc}'s content.
+ def run(self):
+ """
+ Run the HTTP request and store results.
- Return the JSON string representation of the document's decrypted content.
+ This method will block and wait for an eventual previous operation to
+ finish before actually performing the request. It also traps any
+ exception and register any failure with the request.
+ """
+ with self._stop_lock:
+ if self._stopped is None:
+ self._stopped = False
+ else:
+ return
+
+ # eventually wait for the previous thread to finish
+ if self._last_request_lock is not None:
+ self._last_request_lock.acquire()
+
+ # bail out in case we've been interrupted
+ if self.stopped is True:
+ return
+
+ try:
+ self._response = self._doc_syncer.do_request()
+ self._request_lock.release()
+
+ # run success callback
+ if self._doc_syncer.success_callback is not None:
+
+ # eventually wait for callback lock release
+ if self._last_callback_lock is not None:
+ self._last_callback_lock.acquire()
+
+ # bail out in case we've been interrupted
+ if self._stopped is True:
+ return
+
+ self._result = self._doc_syncer.success_callback(
+ self._idx, self._total, self._response)
+ self._success = True
+ doc_syncer = self._doc_syncer
+ self._release_method(self, doc_syncer)
+ self._doc_syncer = None
+ # let next thread executed its callback
+ self._callback_lock.release()
+
+ # trap any exception and signal failure
+ except Exception as e:
+ self._exception = e
+ self._success = False
+ # run failure callback
+ if self._doc_syncer.failure_callback is not None:
+
+ # eventually wait for callback lock release
+ if self._last_callback_lock is not None:
+ self._last_callback_lock.acquire()
+
+ # bail out in case we've been interrupted
+ if self.stopped is True:
+ return
+
+ self._doc_syncer.failure_callback(
+ self._idx, self._total, self._exception)
+
+ self._failed_method(self)
+ # we do not release the callback lock here because we
+ # failed and so we don't want other threads to succeed.
- The content of the document should have the following structure:
+ @property
+ def doc_syncer(self):
+ return self._doc_syncer
- {
- ENC_JSON_KEY: '<enc_blob>',
- ENC_SCHEME_KEY: '<enc_scheme>',
- ENC_METHOD_KEY: '<enc_method>',
- ENC_IV_KEY: '<initial value used to encrypt>', # (optional)
- MAC_KEY: '<mac>'
- MAC_METHOD_KEY: 'hmac'
- }
+ @property
+ def response(self):
+ return self._response
- C{enc_blob} is the encryption of the JSON serialization of the document's
- content. For now Soledad just deals with documents whose C{enc_scheme} is
- EncryptionSchemes.SYMKEY and C{enc_method} is
- EncryptionMethods.AES_256_CTR.
+ @property
+ def exception(self):
+ return self._exception
- :param crypto: A SoledadCryto instance to perform the encryption.
- :type crypto: leap.soledad.crypto.SoledadCrypto
- :param doc: The document to be decrypted.
- :type doc: SoledadDocument
+ @property
+ def callback_lock(self):
+ return self._callback_lock
- :return: The JSON serialization of the decrypted content.
- :rtype: str
- """
- soledad_assert(doc.is_tombstone() is False)
- soledad_assert(ENC_JSON_KEY in doc.content)
- soledad_assert(ENC_SCHEME_KEY in doc.content)
- soledad_assert(ENC_METHOD_KEY in doc.content)
- soledad_assert(MAC_KEY in doc.content)
- soledad_assert(MAC_METHOD_KEY in doc.content)
- # verify MAC
- ciphertext = binascii.a2b_hex( # content is stored as hex.
- doc.content[ENC_JSON_KEY])
- mac = mac_doc(
- crypto, doc.doc_id, doc.rev,
- ciphertext,
- doc.content[MAC_METHOD_KEY])
- # we compare mac's hashes to avoid possible timing attacks that might
- # exploit python's builtin comparison operator behaviour, which fails
- # immediatelly when non-matching bytes are found.
- doc_mac_hash = hashlib.sha256(
- binascii.a2b_hex( # the mac is stored as hex
- doc.content[MAC_KEY])).digest()
- calculated_mac_hash = hashlib.sha256(mac).digest()
- if doc_mac_hash != calculated_mac_hash:
- raise WrongMac('Could not authenticate document\'s contents.')
- # decrypt doc's content
- enc_scheme = doc.content[ENC_SCHEME_KEY]
- plainjson = None
- if enc_scheme == EncryptionSchemes.SYMKEY:
- enc_method = doc.content[ENC_METHOD_KEY]
- if enc_method == EncryptionMethods.AES_256_CTR:
- soledad_assert(ENC_IV_KEY in doc.content)
- plainjson = crypto.decrypt_sym(
- ciphertext,
- crypto.doc_passphrase(doc.doc_id),
- method=enc_method,
- iv=doc.content[ENC_IV_KEY])
- else:
- raise UnknownEncryptionMethod(enc_method)
- else:
- raise UnknownEncryptionScheme(enc_scheme)
- return plainjson
+ @property
+ def request_lock(self):
+ return self._request_lock
+ @property
+ def success(self):
+ return self._success
-def _gunzip(data):
- """
- Uncompress data that is gzipped.
+ def stop(self):
+ with self._stop_lock:
+ self._stopped = True
- :param data: gzipped data
- :type data: basestring
- """
- buffer = cStringIO.StringIO()
- buffer.write(data)
- buffer.seek(0)
- try:
- data = gzip.GzipFile(mode='r', fileobj=buffer).read()
- except Exception:
- logger.warning("Error while decrypting gzipped data")
- buffer.close()
- return data
+ @property
+ def stopped(self):
+ with self._stop_lock:
+ return self._stopped
+ @property
+ def result(self):
+ return self._result
-#
-# SoledadSyncTarget
-#
-class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
+class DocumentSyncerPool(object):
"""
- A SyncTarget that encrypts data before sending and decrypts data after
- receiving.
+ A pool of reusable document syncers.
"""
- #
- # Token auth methods.
- #
+ POOL_SIZE = 10
+ """
+ The maximum amount of syncer threads running at the same time.
+ """
- def set_token_credentials(self, uuid, token):
+ def __init__(self, raw_url, raw_creds, query_string, headers,
+ ensure_callback, stop_method):
"""
- Store given credentials so we can sign the request later.
+ Initialize the document syncer pool.
+
+ :param raw_url: The complete raw URL for the HTTP request.
+ :type raw_url: str
+ :param raw_creds: The credentials for the HTTP request.
+ :type raw_creds: dict
+ :param query_string: The query string for the HTTP request.
+ :type query_string: str
+ :param headers: The headers for the HTTP request.
+ :type headers: dict
+ :param ensure_callback: A callback to ensure we have the correct
+ target_replica_uid, if it was just created.
+ :type ensure_callback: callable
- :param uuid: The user's uuid.
- :type uuid: str
- :param token: The authentication token.
- :type token: str
"""
- TokenBasedAuth.set_token_credentials(self, uuid, token)
-
- def _sign_request(self, method, url_query, params):
+ # save syncer params
+ self._raw_url = raw_url
+ self._raw_creds = raw_creds
+ self._query_string = query_string
+ self._headers = headers
+ self._ensure_callback = ensure_callback
+ self._stop_method = stop_method
+ # pool attributes
+ self._failures = False
+ self._semaphore_pool = threading.BoundedSemaphore(
+ DocumentSyncerPool.POOL_SIZE)
+ self._pool_access_lock = threading.Lock()
+ self._doc_syncers = []
+ self._threads = []
+
+ def new_syncer_thread(self, idx, total, last_request_lock=None,
+ last_callback_lock=None):
"""
- Return an authorization header to be included in the HTTP request.
+ Yield a new document syncer thread.
+
+ :param idx: The index count of the current operation.
+ :type idx: int
+ :param total: The total number of operations.
+ :type total: int
+ :param last_request_lock: A lock to wait for before actually performing
+ the request.
+ :type last_request_lock: threading.Lock
+ :param last_callback_lock: A lock to wait for before actually running
+ the success callback.
+ :type last_callback_lock: threading.Lock
+ """
+ t = None
+ # wait for available threads
+ self._semaphore_pool.acquire()
+ with self._pool_access_lock:
+ if self._failures is True:
+ return None
+ # get a syncer
+ doc_syncer = self._get_syncer()
+ # we rely on DocumentSyncerThread.run() to release the lock using
+ # self.release_syncer so we can launch a new thread.
+ t = DocumentSyncerThread(
+ doc_syncer, self.release_syncer, self.cancel_threads,
+ idx, total,
+ last_request_lock=last_request_lock,
+ last_callback_lock=last_callback_lock)
+ self._threads.append(t)
+ return t
+
+ def _failed(self):
+ with self._pool_access_lock:
+ self._failures = True
- :param method: The HTTP method.
- :type method: str
- :param url_query: The URL query string.
- :type url_query: str
- :param params: A list with encoded query parameters.
- :type param: list
+ @property
+ def failures(self):
+ return self._failures
- :return: The Authorization header.
- :rtype: list of tuple
+ def _get_syncer(self):
"""
- return TokenBasedAuth._sign_request(self, method, url_query, params)
-
- #
- # Modified HTTPSyncTarget methods.
- #
+ Get a document syncer from the pool.
- @staticmethod
- def connect(url, crypto=None):
- return SoledadSyncTarget(url, crypto=crypto)
+ This method will create a new syncer whenever there is no syncer
+ available in the pool.
- def __init__(self, url, creds=None, crypto=None):
+ :return: A syncer.
+ :rtype: HTTPDocumentSyncer
"""
- Initialize the SoledadSyncTarget.
-
- :param url: The url of the target replica to sync with.
- :type url: str
- :param creds: optional dictionary giving credentials.
- to authorize the operation with the server.
- :type creds: dict
- :param soledad: An instance of Soledad so we can encrypt/decrypt
- document contents when syncing.
- :type soledad: soledad.Soledad
+ syncer = None
+ # get an available syncer or create a new one
+ try:
+ syncer = self._doc_syncers.pop()
+ except IndexError:
+ syncer = HTTPDocumentSyncer(
+ self._raw_url, self._raw_creds, self._query_string,
+ self._headers, self._ensure_callback)
+ return syncer
+
+ def release_syncer(self, syncer_thread, doc_syncer):
"""
- HTTPSyncTarget.__init__(self, url, creds)
- self._crypto = crypto
- self._stopped = True
- self._stop_lock = threading.Lock()
+ Return a syncer to the pool after use and check for any failures.
- def _init_post_request(self, url, action, headers, content_length):
+ :param syncer: The syncer to be returned to the pool.
+ :type syncer: HTTPDocumentSyncer
"""
- Initiate a syncing POST request.
+ with self._pool_access_lock:
+ self._doc_syncers.append(doc_syncer)
+ if syncer_thread.success is True:
+ self._threads.remove(syncer_thread)
+ self._semaphore_pool.release()
- :param url: The syncing URL.
- :type url: str
- :param action: The syncing action, either 'get' or 'receive'.
- :type action: str
- :param headers: The initial headers to be sent on this request.
- :type headers: dict
- :param content_length: The content-length of the request.
- :type content_length: int
+ def cancel_threads(self, calling_thread):
"""
- self._conn.putrequest('POST', url)
- self._conn.putheader(
- 'content-type', 'application/x-soledad-sync-%s' % action)
- for header_name, header_value in headers:
- self._conn.putheader(header_name, header_value)
- self._conn.putheader('accept-encoding', 'gzip')
- self._conn.putheader('content-length', str(content_length))
- self._conn.endheaders()
-
- def _get_remote_docs(self, url, last_known_generation, last_known_trans_id,
- headers, return_doc_cb, ensure_callback, sync_id):
+ Stop all threads in the pool.
"""
- Fetch sync documents from the remote database and insert them in the
- local database.
+ # stop sync
+ self._stop_method()
+ stopped = []
+ # stop all threads
+ logger.warning("Soledad sync: cancelling sync threads...")
+ with self._pool_access_lock:
+ self._failures = True
+ while self._threads:
+ t = self._threads.pop(0)
+ t.stop()
+ self._doc_syncers.append(t.doc_syncer)
+ stopped.append(t)
+ # release locks and join
+ while stopped:
+ t = stopped.pop(0)
+ t.request_lock.acquire(False) # just in case
+ t.request_lock.release()
+ t.callback_lock.acquire(False) # just in case
+ t.callback_lock.release()
+ logger.warning("Soledad sync: cancelled sync threads.")
+
+ def cleanup(self):
+ """
+ Close and remove any syncers from the pool.
+ """
+ with self._pool_access_lock:
+ while self._doc_syncers:
+ syncer = self._doc_syncers.pop()
+ syncer.close()
+ del syncer
- If an incoming document's encryption scheme is equal to
- EncryptionSchemes.SYMKEY, then this method will decrypt it with
- Soledad's symmetric key.
- :param url: The syncing URL.
- :type url: str
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
- :param headers: The headers of the HTTP request.
+class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth):
+
+ def __init__(self, raw_url, creds, query_string, headers, ensure_callback):
+ """
+ Initialize the client.
+
+ :param raw_url: The raw URL of the target HTTP server.
+ :type raw_url: str
+ :param creds: Authentication credentials.
+ :type creds: dict
+ :param query_string: The query string for the HTTP request.
+ :type query_string: str
+ :param headers: The headers for the HTTP request.
:type headers: dict
- :param return_doc_cb: A callback to insert docs from target.
- :type return_doc_cb: callable
:param ensure_callback: A callback to ensure we have the correct
target_replica_uid, if it was just created.
:type ensure_callback: callable
+ """
+ HTTPClientBase.__init__(self, raw_url, creds=creds)
+ # info needed to perform the request
+ self._query_string = query_string
+ self._headers = headers
+ self._ensure_callback = ensure_callback
+ # the actual request method
+ self._request_method = None
+ self._success_callback = None
+ self._failure_callback = None
+
+ def _reset(self):
+ """
+ Reset this document syncer so we can reuse it.
+ """
+ self._request_method = None
+ self._success_callback = None
+ self._failure_callback = None
+ self._request_method = None
- :raise BrokenSyncStream: If C{data} is malformed.
+ def set_request_method(self, method, *args, **kwargs):
+ """
+ Set the actual method to perform the request.
- :return: A dictionary representing the first line of the response got
- from remote replica.
- :rtype: list of str
- """
-
- def _post_get_doc(received):
- """
- Get a sync document from server by means of a POST request.
-
- :param received: The number of documents already received in the
- current sync session.
- :type received: int
- """
- entries = ['[']
- size = 1
- # add remote replica metadata to the request
- size += self._prepare(
- '', entries,
- last_known_generation=last_known_generation,
- last_known_trans_id=last_known_trans_id,
- sync_id=sync_id,
- ensure=ensure_callback is not None)
- # inform server of how many documents have already been received
- size += self._prepare(
- ',', entries, received=received)
- entries.append('\r\n]')
- size += len(entries[-1])
- # send headers
- self._init_post_request(url, 'get', headers, size)
- # get document
- for entry in entries:
- self._conn.send(entry)
- return self._response()
-
- number_of_changes = None
- received = 0
+ :param method: Either 'get' or 'put'.
+ :type method: str
+ :param args: Arguments for the request method.
+ :type args: list
+ :param kwargs: Keyworded arguments for the request method.
+ :type kwargs: dict
+ """
+ self._reset()
+ # resolve request method
+ if method is 'get':
+ self._request_method = self._get_doc
+ elif method is 'put':
+ self._request_method = self._put_doc
+ else:
+ raise Exception
+ # store request method args
+ self._args = args
+ self._kwargs = kwargs
- new_generation = last_known_generation
- new_transaction_id = last_known_trans_id
- while number_of_changes is None or received < number_of_changes:
- # bail out if sync process was interrupted
- if self.stopped is True:
- return last_known_generation, last_known_trans_id
- # try to fetch one document from target
- data, _ = _post_get_doc(received)
- # decode incoming stream
- parts = data.splitlines()
- if not parts or parts[0] != '[' or parts[-1] != ']':
- raise BrokenSyncStream
- data = parts[1:-1]
- # decode metadata
- line, comma = utils.check_and_strip_comma(data[0])
- metadata = None
- try:
- metadata = json.loads(line)
- soledad_assert('number_of_changes' in metadata)
- soledad_assert('new_generation' in metadata)
- soledad_assert('new_transaction_id' in metadata)
- number_of_changes = metadata['number_of_changes']
- new_generation = metadata['new_generation']
- new_transaction_id = metadata['new_transaction_id']
- except json.JSONDecodeError, AssertionError:
- raise BrokenSyncStream
- # make sure we have replica_uid from fresh new dbs
- if ensure_callback and 'replica_uid' in metadata:
- ensure_callback(metadata['replica_uid'])
- # bail out if there are no documents to be received
- if number_of_changes == 0:
- break
- # decrypt incoming document and insert into local database
- entry = None
- try:
- entry = json.loads(data[1])
- except IndexError:
- raise BrokenSyncStream
- # -------------------------------------------------------------
- # symmetric decryption of document's contents
- # -------------------------------------------------------------
- # if arriving content was symmetrically encrypted, we decrypt
- # it.
- doc = SoledadDocument(
- entry['id'], entry['rev'], entry['content'])
- if doc.content and ENC_SCHEME_KEY in doc.content:
- if doc.content[ENC_SCHEME_KEY] == \
- EncryptionSchemes.SYMKEY:
- doc.set_json(decrypt_doc(self._crypto, doc))
- # -------------------------------------------------------------
- # end of symmetric decryption
- # -------------------------------------------------------------
- return_doc_cb(doc, entry['gen'], entry['trans_id'])
- received += 1
- signal(
- SOLEDAD_SYNC_RECEIVE_STATUS,
- "%d/%d" %
- (received, number_of_changes))
- return new_generation, new_transaction_id
+ def set_success_callback(self, callback):
+ self._success_callback = callback
+
+ def set_failure_callback(self, callback):
+ self._failure_callback = callback
+
+ @property
+ def success_callback(self):
+ return self._success_callback
+
+ @property
+ def failure_callback(self):
+ return self._failure_callback
+
+ def do_request(self):
+ """
+ Actually perform the request.
+
+ :return: The body and headers of the response.
+ :rtype: tuple
+ """
+ self._ensure_connection()
+ args = self._args
+ kwargs = self._kwargs
+ return self._request_method(*args, **kwargs)
def _request(self, method, url_parts, params=None, body=None,
content_type=None):
@@ -482,6 +490,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:type body: str
:param content-type: The content-type of the request.
:type content-type: str
+
+ :return: The body and headers of the response.
+ :rtype: tuple
+
+ :raise errors.Unavailable: Raised after a number of unsuccesful
+ request attempts.
+ :raise Exception: Raised for any other exception ocurring during the
+ request.
"""
self._ensure_connection()
@@ -566,14 +582,506 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:type entries: list
:param dic: The data to be included in this entry.
:type dic: dict
+
+ :return: The size of the prepared entry.
+ :rtype: int
"""
entry = comma + '\r\n' + json.dumps(dic)
entries.append(entry)
return len(entry)
- def sync_exchange(self, docs_by_generations, source_replica_uid,
- last_known_generation, last_known_trans_id,
- return_doc_cb, ensure_callback=None):
+ def _init_post_request(self, action, content_length):
+ """
+ Initiate a syncing POST request.
+
+ :param url: The syncing URL.
+ :type url: str
+ :param action: The syncing action, either 'get' or 'receive'.
+ :type action: str
+ :param headers: The initial headers to be sent on this request.
+ :type headers: dict
+ :param content_length: The content-length of the request.
+ :type content_length: int
+ """
+ self._conn.putrequest('POST', self._query_string)
+ self._conn.putheader(
+ 'content-type', 'application/x-soledad-sync-%s' % action)
+ for header_name, header_value in self._headers:
+ self._conn.putheader(header_name, header_value)
+ self._conn.putheader('accept-encoding', 'gzip')
+ self._conn.putheader('content-length', str(content_length))
+ self._conn.endheaders()
+
+ def _get_doc(self, received, sync_id, last_known_generation,
+ last_known_trans_id):
+ """
+ Get a sync document from server by means of a POST request.
+
+ :param received: The number of documents already received in the
+ current sync session.
+ :type received: int
+ :param sync_id: The id for the current sync session.
+ :type sync_id: str
+ :param last_known_generation: Target's last known generation.
+ :type last_known_generation: int
+ :param last_known_trans_id: Target's last known transaction id.
+ :type last_known_trans_id: str
+
+ :return: The body and headers of the response.
+ :rtype: tuple
+ """
+ entries = ['[']
+ size = 1
+ # add remote replica metadata to the request
+ size += self._prepare(
+ '', entries,
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
+ ensure=self._ensure_callback is not None)
+ # inform server of how many documents have already been received
+ size += self._prepare(
+ ',', entries, received=received)
+ entries.append('\r\n]')
+ size += len(entries[-1])
+ # send headers
+ self._init_post_request('get', size)
+ # get document
+ for entry in entries:
+ self._conn.send(entry)
+ return self._response()
+
+ def _put_doc(self, sync_id, last_known_generation, last_known_trans_id,
+ id, rev, content, gen, trans_id, number_of_docs, doc_idx):
+ """
+ Put a sync document on server by means of a POST request.
+
+ :param sync_id: The id for the current sync session.
+ :type sync_id: str
+ :param last_known_generation: Target's last known generation.
+ :type last_known_generation: int
+ :param last_known_trans_id: Target's last known transaction id.
+ :type last_known_trans_id: str
+ :param id: The document id.
+ :type id: str
+ :param rev: The document revision.
+ :type rev: str
+ :param content: The serialized document content.
+ :type content: str
+ :param gen: The generation of the modification of the document.
+ :type gen: int
+ :param trans_id: The transaction id of the modification of the
+ document.
+ :type trans_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+ :param doc_idx: The index of the current document being sent.
+ :type doc_idx: int
+
+ :return: The body and headers of the response.
+ :rtype: tuple
+ """
+ # prepare to send the document
+ entries = ['[']
+ size = 1
+ # add remote replica metadata to the request
+ size += self._prepare(
+ '', entries,
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
+ ensure=self._ensure_callback is not None)
+ # add the document to the request
+ size += self._prepare(
+ ',', entries,
+ id=id, rev=rev, content=content, gen=gen, trans_id=trans_id,
+ number_of_docs=number_of_docs, doc_idx=doc_idx)
+ entries.append('\r\n]')
+ size += len(entries[-1])
+ # send headers
+ self._init_post_request('put', size)
+ # send document
+ for entry in entries:
+ self._conn.send(entry)
+ return self._response()
+
+ def _sign_request(self, method, url_query, params):
+ """
+ Return an authorization header to be included in the HTTP request.
+
+ :param method: The HTTP method.
+ :type method: str
+ :param url_query: The URL query string.
+ :type url_query: str
+ :param params: A list with encoded query parameters.
+ :type param: list
+
+ :return: The Authorization header.
+ :rtype: list of tuple
+ """
+ return TokenBasedAuth._sign_request(self, method, url_query, params)
+
+ def set_token_credentials(self, uuid, token):
+ """
+ Store given credentials so we can sign the request later.
+
+ :param uuid: The user's uuid.
+ :type uuid: str
+ :param token: The authentication token.
+ :type token: str
+ """
+ TokenBasedAuth.set_token_credentials(self, uuid, token)
+
+
+class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
+ """
+ A SyncTarget that encrypts data before sending and decrypts data after
+ receiving.
+
+ Normally encryption will have been written to the sync database upon
+ document modification. The sync database is also used to write temporarily
+ the parsed documents that the remote send us, before being decrypted and
+ written to the main database.
+ """
+
+ # will later keep a reference to the insert-doc callback
+ # passed to sync_exchange
+ _insert_doc_cb = defaultdict(lambda: ProxyBase(None))
+
+ """
+ Period of recurrence of the periodic decrypting task, in seconds.
+ """
+ DECRYPT_TASK_PERIOD = 0.5
+
+ #
+ # Modified HTTPSyncTarget methods.
+ #
+
+ def __init__(self, url, source_replica_uid=None, creds=None, crypto=None,
+ sync_db=None, sync_db_write_lock=None):
+ """
+ Initialize the SoledadSyncTarget.
+
+ :param source_replica_uid: The source replica uid which we use when
+ deferring decryption.
+ :type source_replica_uid: str
+ :param url: The url of the target replica to sync with.
+ :type url: str
+ :param creds: Optional dictionary giving credentials.
+ to authorize the operation with the server.
+ :type creds: dict
+ :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
+ document contents when syncing.
+ :type crypto: soledad.crypto.SoledadCrypto
+ :param sync_db: Optional. handler for the db with the symmetric
+ encryption of the syncing documents. If
+ None, encryption will be done in-place,
+ instead of retreiving it from the dedicated
+ database.
+ :type sync_db: Sqlite handler
+ :param sync_db_write_lock: a write lock for controlling concurrent
+ access to the sync_db
+ :type sync_db_write_lock: threading.Lock
+ """
+ HTTPSyncTarget.__init__(self, url, creds)
+ self._raw_url = url
+ self._raw_creds = creds
+ self._crypto = crypto
+ self._stopped = True
+ self._stop_lock = threading.Lock()
+ self._sync_exchange_lock = threading.Lock()
+ self.source_replica_uid = source_replica_uid
+ self._defer_decryption = False
+
+ # deferred decryption attributes
+ self._sync_db = None
+ self._sync_db_write_lock = None
+ self._decryption_callback = None
+ self._sync_decr_pool = None
+ self._sync_watcher = None
+ if sync_db and sync_db_write_lock is not None:
+ self._sync_db = sync_db
+ self._sync_db_write_lock = sync_db_write_lock
+
+ def _setup_sync_decr_pool(self):
+ """
+ Set up the SyncDecrypterPool for deferred decryption.
+ """
+ if self._sync_decr_pool is None:
+ # initialize syncing queue decryption pool
+ self._sync_decr_pool = SyncDecrypterPool(
+ self._crypto, self._sync_db,
+ self._sync_db_write_lock,
+ insert_doc_cb=self._insert_doc_cb)
+ self._sync_decr_pool.set_source_replica_uid(
+ self.source_replica_uid)
+
+ def _teardown_sync_decr_pool(self):
+ """
+ Tear down the SyncDecrypterPool.
+ """
+ if self._sync_decr_pool is not None:
+ self._sync_decr_pool.close()
+ self._sync_decr_pool = None
+
+ def _setup_sync_watcher(self):
+ """
+ Set up the sync watcher for deferred decryption.
+ """
+ if self._sync_watcher is None:
+ self._sync_watcher = TimerTask(
+ self._decrypt_syncing_received_docs,
+ delay=self.DECRYPT_TASK_PERIOD)
+
+ def _teardown_sync_watcher(self):
+ """
+ Tear down the sync watcher.
+ """
+ if self._sync_watcher is not None:
+ self._sync_watcher.stop()
+ self._sync_watcher.shutdown()
+ self._sync_watcher = None
+
+ def _get_replica_uid(self, url):
+ """
+ Return replica uid from the url, or None.
+
+ :param url: the replica url
+ :type url: str
+ """
+ replica_uid_match = re.findall("user-([0-9a-fA-F]+)", url)
+ return replica_uid_match[0] if len(replica_uid_match) > 0 else None
+
+ @staticmethod
+ def connect(url, source_replica_uid=None, crypto=None):
+ return SoledadSyncTarget(
+ url, source_replica_uid=source_replica_uid, crypto=crypto)
+
+ def _parse_received_doc_response(self, response):
+ """
+ Parse the response from the server containing the received document.
+
+ :param response: The body and headers of the response.
+ :type response: tuple(str, dict)
+ """
+ data, _ = response
+ # decode incoming stream
+ parts = data.splitlines()
+ if not parts or parts[0] != '[' or parts[-1] != ']':
+ raise errors.BrokenSyncStream
+ data = parts[1:-1]
+ # decode metadata
+ line, comma = utils.check_and_strip_comma(data[0])
+ metadata = None
+ try:
+ metadata = json.loads(line)
+ new_generation = metadata['new_generation']
+ new_transaction_id = metadata['new_transaction_id']
+ number_of_changes = metadata['number_of_changes']
+ except (json.JSONDecodeError, KeyError):
+ raise errors.BrokenSyncStream
+ # make sure we have replica_uid from fresh new dbs
+ if self._ensure_callback and 'replica_uid' in metadata:
+ self._ensure_callback(metadata['replica_uid'])
+ # parse incoming document info
+ doc_id = None
+ rev = None
+ content = None
+ gen = None
+ trans_id = None
+ if number_of_changes > 0:
+ try:
+ entry = json.loads(data[1])
+ doc_id = entry['id']
+ rev = entry['rev']
+ content = entry['content']
+ gen = entry['gen']
+ trans_id = entry['trans_id']
+ except (IndexError, KeyError):
+ raise errors.BrokenSyncStream
+ return new_generation, new_transaction_id, number_of_changes, \
+ doc_id, rev, content, gen, trans_id
+
+ def _insert_received_doc(self, idx, total, response):
+ """
+ Insert a received document into the local replica.
+
+ :param idx: The index count of the current operation.
+ :type idx: int
+ :param total: The total number of operations.
+ :type total: int
+ :param response: The body and headers of the response.
+ :type response: tuple(str, dict)
+ """
+ new_generation, new_transaction_id, number_of_changes, doc_id, \
+ rev, content, gen, trans_id = \
+ self._parse_received_doc_response(response)
+ if doc_id is not None:
+ # decrypt incoming document and insert into local database
+ # -------------------------------------------------------------
+ # symmetric decryption of document's contents
+ # -------------------------------------------------------------
+ # If arriving content was symmetrically encrypted, we decrypt it.
+ # We do it inline if defer_decryption flag is False or no sync_db
+ # was defined, otherwise we defer it writing it to the received
+ # docs table.
+ doc = SoledadDocument(doc_id, rev, content)
+ if is_symmetrically_encrypted(doc):
+ if self._queue_for_decrypt:
+ self._save_encrypted_received_doc(
+ doc, gen, trans_id, idx, total)
+ else:
+ # defer_decryption is False or no-sync-db fallback
+ doc.set_json(decrypt_doc(self._crypto, doc))
+ self._return_doc_cb(doc, gen, trans_id)
+ else:
+ # not symmetrically encrypted doc, insert it directly
+ # or save it in the decrypted stage.
+ if self._queue_for_decrypt:
+ self._save_received_doc(doc, gen, trans_id, idx, total)
+ else:
+ self._return_doc_cb(doc, gen, trans_id)
+ # -------------------------------------------------------------
+ # end of symmetric decryption
+ # -------------------------------------------------------------
+ msg = "%d/%d" % (idx + 1, total)
+ signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg)
+ logger.debug("Soledad sync receive status: %s" % msg)
+ return number_of_changes, new_generation, new_transaction_id
+
+ def _get_remote_docs(self, url, last_known_generation, last_known_trans_id,
+ headers, return_doc_cb, ensure_callback, sync_id,
+ syncer_pool, defer_decryption=False):
+ """
+ Fetch sync documents from the remote database and insert them in the
+ local database.
+
+ If an incoming document's encryption scheme is equal to
+ EncryptionSchemes.SYMKEY, then this method will decrypt it with
+ Soledad's symmetric key.
+
+ :param url: The syncing URL.
+ :type url: str
+ :param last_known_generation: Target's last known generation.
+ :type last_known_generation: int
+ :param last_known_trans_id: Target's last known transaction id.
+ :type last_known_trans_id: str
+ :param headers: The headers of the HTTP request.
+ :type headers: dict
+ :param return_doc_cb: A callback to insert docs from target.
+ :type return_doc_cb: callable
+ :param ensure_callback: A callback to ensure we have the correct
+ target_replica_uid, if it was just created.
+ :type ensure_callback: callable
+ :param sync_id: The id for the current sync session.
+ :type sync_id: str
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
+
+ :raise BrokenSyncStream: If `data` is malformed.
+
+ :return: A dictionary representing the first line of the response got
+ from remote replica.
+ :rtype: dict
+ """
+ # we keep a reference to the callback in case we defer the decryption
+ self._return_doc_cb = return_doc_cb
+ self._queue_for_decrypt = defer_decryption \
+ and self._sync_db is not None
+
+ new_generation = last_known_generation
+ new_transaction_id = last_known_trans_id
+
+ if self._queue_for_decrypt:
+ logger.debug(
+ "Soledad sync: will queue received docs for decrypting.")
+
+ idx = 0
+ number_of_changes = 1
+
+ first_request = True
+ last_callback_lock = None
+ threads = []
+
+ # get incoming documents
+ while idx < number_of_changes:
+ # bail out if sync process was interrupted
+ if self.stopped is True:
+ break
+
+ # launch a thread to fetch one document from target
+ t = syncer_pool.new_syncer_thread(
+ idx, number_of_changes,
+ last_callback_lock=last_callback_lock)
+
+ # bail out if any thread failed
+ if t is None:
+ self.stop()
+ break
+
+ t.doc_syncer.set_request_method(
+ 'get', idx, sync_id, last_known_generation,
+ last_known_trans_id)
+ t.doc_syncer.set_success_callback(self._insert_received_doc)
+
+ def _failure_callback(idx, total, exception):
+ _failure_msg = "Soledad sync: error while getting document " \
+ "%d/%d: %s" \
+ % (idx + 1, total, exception)
+ logger.warning("%s" % _failure_msg)
+ logger.warning("Soledad sync: failing gracefully, will "
+ "recover on next sync.")
+
+ t.doc_syncer.set_failure_callback(_failure_callback)
+ threads.append(t)
+ t.start()
+ last_callback_lock = t.callback_lock
+ idx += 1
+
+ # if this is the first request, wait to update the number of
+ # changes
+ if first_request is True:
+ t.join()
+ if t.success:
+ number_of_changes, _, _ = t.result
+ first_request = False
+
+ # make sure all threads finished and we have up-to-date info
+ last_successful_thread = None
+ while threads:
+ # check if there are failures
+ t = threads.pop(0)
+ t.join()
+ if t.success:
+ last_successful_thread = t
+
+ # get information about last successful thread
+ if last_successful_thread is not None:
+ body, _ = last_successful_thread.response
+ parsed_body = json.loads(body)
+ # get current target gen and trans id in case no documents were
+ # transferred
+ if len(parsed_body) == 1:
+ metadata = parsed_body[0]
+ new_generation = metadata['new_generation']
+ new_transaction_id = metadata['new_transaction_id']
+ # get current target gen and trans id from last transferred
+ # document
+ else:
+ doc_data = parsed_body[1]
+ new_generation = doc_data['gen']
+ new_transaction_id = doc_data['trans_id']
+
+ return new_generation, new_transaction_id
+
+ def sync_exchange(self, docs_by_generations,
+ source_replica_uid, last_known_generation,
+ last_known_trans_id, return_doc_cb,
+ ensure_callback=None, defer_decryption=True,
+ sync_id=None):
"""
Find out which documents the remote database does not know about,
encrypt and send them.
@@ -586,24 +1094,54 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
the last local generation the remote
replica knows about.
:type docs_by_generations: list of tuples
+
:param source_replica_uid: The uid of the source replica.
:type source_replica_uid: str
+
:param last_known_generation: Target's last known generation.
:type last_known_generation: int
+
:param last_known_trans_id: Target's last known transaction id.
:type last_known_trans_id: str
+
:param return_doc_cb: A callback for inserting received documents from
- target.
+ target. If not overriden, this will call u1db
+ insert_doc_from_target in synchronizer, which
+ implements the TAKE OTHER semantics.
:type return_doc_cb: function
+
:param ensure_callback: A callback that ensures we know the target
- replica uid if the target replica was just created.
+ replica uid if the target replica was just
+ created.
:type ensure_callback: function
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
+
:return: The new generation and transaction id of the target replica.
:rtype: tuple
"""
+ self._ensure_callback = ensure_callback
+
+ if defer_decryption:
+ self._sync_exchange_lock.acquire()
+ self._setup_sync_decr_pool()
+ self._setup_sync_watcher()
+ self._defer_decryption = True
+
self.start()
- sync_id = str(uuid4())
+
+ if sync_id is None:
+ sync_id = str(uuid4())
+ self.source_replica_uid = source_replica_uid
+ # let the decrypter pool access the passed callback to insert docs
+ setProxiedObject(self._insert_doc_cb[source_replica_uid],
+ return_doc_cb)
+
+ if not self.clear_to_sync():
+ raise PendingReceivedDocsSyncError
self._ensure_connection()
if self._trace_hook: # for tests
@@ -611,78 +1149,135 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
url = '%s/sync-from/%s' % (self._url.path, source_replica_uid)
headers = self._sign_request('POST', url, {})
- def _post_put_doc(headers, last_known_generation, last_known_trans_id,
- id, rev, content, gen, trans_id, sync_id):
- """
- Put a sync document on server by means of a POST request.
-
- :param received: How many documents have already been received in
- this sync session.
- :type received: int
- """
- # prepare to send the document
- entries = ['[']
- size = 1
- # add remote replica metadata to the request
- size += self._prepare(
- '', entries,
- last_known_generation=last_known_generation,
- last_known_trans_id=last_known_trans_id,
- sync_id=sync_id,
- ensure=ensure_callback is not None)
- # add the document to the request
- size += self._prepare(
- ',', entries,
- id=id, rev=rev, content=content, gen=gen, trans_id=trans_id)
- entries.append('\r\n]')
- size += len(entries[-1])
- # send headers
- self._init_post_request(url, 'put', headers, size)
- # send document
- for entry in entries:
- self._conn.send(entry)
- data, _ = self._response()
- data = json.loads(data)
- return data[0]['new_generation'], data[0]['new_transaction_id']
-
cur_target_gen = last_known_generation
cur_target_trans_id = last_known_trans_id
# send docs
+ msg = "%d/%d" % (0, len(docs_by_generations))
+ signal(SOLEDAD_SYNC_SEND_STATUS, msg)
+ logger.debug("Soledad sync send status: %s" % msg)
+
+ defer_encryption = self._sync_db is not None
+ syncer_pool = DocumentSyncerPool(
+ self._raw_url, self._raw_creds, url, headers, ensure_callback,
+ self.stop)
+ threads = []
+ last_request_lock = None
+ last_callback_lock = None
sent = 0
- signal(
- SOLEDAD_SYNC_SEND_STATUS,
- "%d/%d" % (0, len(docs_by_generations)))
+ total = len(docs_by_generations)
+
+ synced = []
+ number_of_docs = len(docs_by_generations)
+
for doc, gen, trans_id in docs_by_generations:
# allow for interrupting the sync process
if self.stopped is True:
break
+
# skip non-syncable docs
if isinstance(doc, SoledadDocument) and not doc.syncable:
continue
+
# -------------------------------------------------------------
# symmetric encryption of document's contents
# -------------------------------------------------------------
doc_json = doc.get_json()
if not doc.is_tombstone():
- doc_json = encrypt_doc(self._crypto, doc)
+ if not defer_encryption:
+ # fallback case, for tests
+ doc_json = encrypt_doc(self._crypto, doc)
+ else:
+ try:
+ doc_json = self.get_encrypted_doc_from_db(
+ doc.doc_id, doc.rev)
+ except Exception as exc:
+ logger.error("Error while getting "
+ "encrypted doc from db")
+ logger.exception(exc)
+ continue
+ if doc_json is None:
+ # Not marked as tombstone, but we got nothing
+ # from the sync db. As it is not encrypted yet, we
+ # force inline encryption.
+ # TODO: implement a queue to deal with these cases.
+ doc_json = encrypt_doc(self._crypto, doc)
# -------------------------------------------------------------
# end of symmetric encryption
# -------------------------------------------------------------
- cur_target_gen, cur_target_trans_id = _post_put_doc(
- headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id,
- rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id,
- sync_id=sync_id)
+ t = syncer_pool.new_syncer_thread(
+ sent + 1, total, last_request_lock=None,
+ last_callback_lock=last_callback_lock)
+
+ # bail out if any thread failed
+ if t is None:
+ self.stop()
+ break
+
+ # set the request method
+ t.doc_syncer.set_request_method(
+ 'put', sync_id, cur_target_gen, cur_target_trans_id,
+ id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen,
+ trans_id=trans_id, number_of_docs=number_of_docs, doc_idx=sent + 1)
+ # set the success calback
+
+ def _success_callback(idx, total, response):
+ _success_msg = "Soledad sync send status: %d/%d" \
+ % (idx, total)
+ signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg)
+ logger.debug(_success_msg)
+
+ t.doc_syncer.set_success_callback(_success_callback)
+
+ # set the failure callback
+ def _failure_callback(idx, total, exception):
+ _failure_msg = "Soledad sync: error while sending document " \
+ "%d/%d: %s" % (idx, total, exception)
+ logger.warning("%s" % _failure_msg)
+ logger.warning("Soledad sync: failing gracefully, will "
+ "recover on next sync.")
+
+ t.doc_syncer.set_failure_callback(_failure_callback)
+
+ # save thread and append
+ t.start()
+ threads.append((t, doc))
+ last_request_lock = t.request_lock
+ last_callback_lock = t.callback_lock
sent += 1
- signal(
- SOLEDAD_SYNC_SEND_STATUS,
- "%d/%d" % (sent, len(docs_by_generations)))
+
+ # make sure all threads finished and we have up-to-date info
+ while threads:
+ # check if there are failures
+ t, doc = threads.pop(0)
+ t.join()
+ if t.success:
+ synced.append((doc.doc_id, doc.rev))
+
+ if defer_decryption:
+ self._sync_watcher.start()
# get docs from target
- cur_target_gen, cur_target_trans_id = self._get_remote_docs(
- url,
- last_known_generation, last_known_trans_id, headers,
- return_doc_cb, ensure_callback, sync_id)
+ if self.stopped is False:
+ cur_target_gen, cur_target_trans_id = self._get_remote_docs(
+ url,
+ last_known_generation, last_known_trans_id, headers,
+ return_doc_cb, ensure_callback, sync_id, syncer_pool,
+ defer_decryption=defer_decryption)
+ syncer_pool.cleanup()
+
+ # delete documents from the sync database
+ if defer_encryption:
+ self.delete_encrypted_docs_from_db(synced)
+
+ # wait for deferred decryption to finish
+ if defer_decryption:
+ while self.clear_to_sync() is False:
+ sleep(self.DECRYPT_TASK_PERIOD)
+ self._teardown_sync_watcher()
+ self._teardown_sync_decr_pool()
+ self._sync_exchange_lock.release()
+
self.stop()
return cur_target_gen, cur_target_trans_id
@@ -714,3 +1309,164 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
with self._stop_lock:
return self._stopped is True
+
+ def get_encrypted_doc_from_db(self, doc_id, doc_rev):
+ """
+ Retrieve encrypted document from the database of encrypted docs for
+ sync.
+
+ :param doc_id: The Document id.
+ :type doc_id: str
+
+ :param doc_rev: The document revision
+ :type doc_rev: str
+ """
+ encr = SyncEncrypterPool
+ c = self._sync_db.cursor()
+ sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % (
+ encr.TABLE_NAME,))
+ c.execute(sql, (doc_id, doc_rev))
+ res = c.fetchall()
+ if len(res) != 0:
+ return res[0][0]
+
+ def delete_encrypted_docs_from_db(self, docs_ids):
+ """
+ Delete several encrypted documents from the database of symmetrically
+ encrypted docs to sync.
+
+ :param docs_ids: an iterable with (doc_id, doc_rev) for all documents
+ to be deleted.
+ :type docs_ids: any iterable of tuples of str
+ """
+ if docs_ids:
+ encr = SyncEncrypterPool
+ c = self._sync_db.cursor()
+ for doc_id, doc_rev in docs_ids:
+ sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % (
+ encr.TABLE_NAME,))
+ c.execute(sql, (doc_id, doc_rev))
+ self._sync_db.commit()
+
+ def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total):
+ """
+ Save a symmetrically encrypted incoming document into the received
+ docs table in the sync db. A decryption task will pick it up
+ from here in turn.
+
+ :param doc: The document to save.
+ :type doc: SoledadDocument
+ :param gen: The generation.
+ :type gen: str
+ :param trans_id: Transacion id.
+ :type gen: str
+ :param idx: The index count of the current operation.
+ :type idx: int
+ :param total: The total number of operations.
+ :type total: int
+ """
+ logger.debug(
+ "Enqueueing doc for decryption: %d/%d."
+ % (idx + 1, total))
+ self._sync_decr_pool.insert_encrypted_received_doc(
+ doc.doc_id, doc.rev, doc.content, gen, trans_id)
+
+ def _save_received_doc(self, doc, gen, trans_id, idx, total):
+ """
+ Save any incoming document into the received docs table in the sync db.
+
+ :param doc: The document to save.
+ :type doc: SoledadDocument
+ :param gen: The generation.
+ :type gen: str
+ :param trans_id: Transacion id.
+ :type gen: str
+ :param idx: The index count of the current operation.
+ :type idx: int
+ :param total: The total number of operations.
+ :type total: int
+ """
+ logger.debug(
+ "Enqueueing doc, no decryption needed: %d/%d."
+ % (idx + 1, total))
+ self._sync_decr_pool.insert_received_doc(
+ doc.doc_id, doc.rev, doc.content, gen, trans_id)
+
+ #
+ # Symmetric decryption of syncing docs
+ #
+
+ def clear_to_sync(self):
+ """
+ Return True if sync can proceed (ie, the received db table is empty).
+ :rtype: bool
+ """
+ if self._sync_decr_pool is not None:
+ return self._sync_decr_pool.count_received_encrypted_docs() == 0
+ else:
+ return True
+
+ def set_decryption_callback(self, cb):
+ """
+ Set callback to be called when the decryption finishes.
+
+ :param cb: The callback to be set.
+ :type cb: callable
+ """
+ self._decryption_callback = cb
+
+ def has_decryption_callback(self):
+ """
+ Return True if there is a decryption callback set.
+ :rtype: bool
+ """
+ return self._decryption_callback is not None
+
+ def has_syncdb(self):
+ """
+ Return True if we have an initialized syncdb.
+ """
+ return self._sync_db is not None
+
+ def _decrypt_syncing_received_docs(self):
+ """
+ Decrypt the documents received from remote replica and insert them
+ into the local one.
+
+ Called periodically from TimerTask self._sync_watcher.
+ """
+ if sameProxiedObjects(
+ self._insert_doc_cb.get(self.source_replica_uid),
+ None):
+ return
+
+ decrypter = self._sync_decr_pool
+ decrypter.decrypt_received_docs()
+ done = decrypter.process_decrypted()
+
+ def _sign_request(self, method, url_query, params):
+ """
+ Return an authorization header to be included in the HTTP request.
+
+ :param method: The HTTP method.
+ :type method: str
+ :param url_query: The URL query string.
+ :type url_query: str
+ :param params: A list with encoded query parameters.
+ :type param: list
+
+ :return: The Authorization header.
+ :rtype: list of tuple
+ """
+ return TokenBasedAuth._sign_request(self, method, url_query, params)
+
+ def set_token_credentials(self, uuid, token):
+ """
+ Store given credentials so we can sign the request later.
+
+ :param uuid: The user's uuid.
+ :type uuid: str
+ :param token: The authentication token.
+ :type token: str
+ """
+ TokenBasedAuth.set_token_credentials(self, uuid, token)