summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2014-07-02 12:21:31 -0300
committerdrebs <drebs@leap.se>2014-07-02 12:40:59 -0300
commitc3870a4315acf30893679e4cd11c990a4338e47b (patch)
tree651d5f0391b88822beddbf793fd0a91c35bff78d
parent2628bbf4c5faff50491cdd227c787ca7f148f368 (diff)
Decouple sync from actual encryption/decryption (#5326).
-rw-r--r--client/pkg/requirements.pip5
-rw-r--r--client/src/leap/soledad/client/__init__.py71
-rw-r--r--client/src/leap/soledad/client/crypto.py1006
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py246
-rw-r--r--client/src/leap/soledad/client/sync.py156
5 files changed, 1312 insertions, 172 deletions
diff --git a/client/pkg/requirements.pip b/client/pkg/requirements.pip
index aefb8653..ae8d2dac 100644
--- a/client/pkg/requirements.pip
+++ b/client/pkg/requirements.pip
@@ -3,6 +3,9 @@ simplejson
u1db
scrypt
pycryptopp
+cchardet
+taskthread
+zope.proxy
#
# leap deps
@@ -20,5 +23,3 @@ oauth
# pysqlite should not be a dep, see #2945
pysqlite
-
-cchardet
diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py
index 0d3a21fd..e06335f0 100644
--- a/client/src/leap/soledad/client/__init__.py
+++ b/client/src/leap/soledad/client/__init__.py
@@ -223,33 +223,48 @@ class Soledad(object):
"""
def __init__(self, uuid, passphrase, secrets_path, local_db_path,
- server_url, cert_file, auth_token=None, secret_id=None):
+ server_url, cert_file,
+ auth_token=None, secret_id=None, defer_encryption=False):
"""
Initialize configuration, cryptographic keys and dbs.
:param uuid: User's uuid.
:type uuid: str
+
:param passphrase: The passphrase for locking and unlocking encryption
secrets for local and remote storage.
:type passphrase: unicode
+
:param secrets_path: Path for storing encrypted key used for
symmetric encryption.
:type secrets_path: str
+
:param local_db_path: Path for local encrypted storage db.
:type local_db_path: str
+
:param server_url: URL for Soledad server. This is used either to sync
- with the user's remote db and to interact with the shared recovery
- database.
+ with the user's remote db and to interact with the
+ shared recovery database.
:type server_url: str
+
:param cert_file: Path to the certificate of the ca used
to validate the SSL certificate used by the remote
soledad server.
:type cert_file: str
+
:param auth_token: Authorization token for accessing remote databases.
:type auth_token: str
+ :param secret_id: The id of the storage secret to be used.
+ :type secret_id: str
+
+ :param defer_encryption: Whether to defer encryption/decryption of
+ documents, or do it inline while syncing.
+ :type defer_encryption: bool
+
:raise BootstrapSequenceError: Raised when the secret generation and
- storage on server sequence has failed for some reason.
+ storage on server sequence has failed
+ for some reason.
"""
# get config params
self._uuid = uuid
@@ -258,8 +273,10 @@ class Soledad(object):
# init crypto variables
self._secrets = {}
self._secret_id = secret_id
- # init config (possibly with default values)
+ self._defer_encryption = defer_encryption
+
self._init_config(secrets_path, local_db_path, server_url)
+
self._set_token(auth_token)
self._shared_db_instance = None
# configure SSL certificate
@@ -469,24 +486,19 @@ class Soledad(object):
create=True,
document_factory=SoledadDocument,
crypto=self._crypto,
- raw_key=True)
+ raw_key=True,
+ defer_encryption=self._defer_encryption)
def close(self):
"""
Close underlying U1DB database.
"""
+ logger.debug("Closing soledad")
if hasattr(self, '_db') and isinstance(
self._db,
SQLCipherDatabase):
self._db.close()
- def __del__(self):
- """
- Make sure local database is closed when object is destroyed.
- """
- # Watch out! We have no guarantees that this is properly called.
- self.close()
-
#
# Management of secret for symmetric encryption.
#
@@ -520,6 +532,9 @@ class Soledad(object):
Define the id of the storage secret to be used.
This method will also replace the secret in the crypto object.
+
+ :param secret_id: The id of the storage secret to be used.
+ :type secret_id: str
"""
self._secret_id = secret_id
@@ -881,7 +896,7 @@ class Soledad(object):
:type json: str
:param doc_id: An optional identifier specifying the document id.
:type doc_id:
- :return: The new cocument
+ :return: The new document
:rtype: SoledadDocument
"""
return self._db.create_doc_from_json(json, doc_id=doc_id)
@@ -1041,7 +1056,7 @@ class Soledad(object):
if self._db:
return self._db.resolve_doc(doc, conflicted_doc_revs)
- def sync(self):
+ def sync(self, defer_decryption=True):
"""
Synchronize the local encrypted replica with a remote replica.
@@ -1051,16 +1066,25 @@ class Soledad(object):
:param url: the url of the target replica to sync with
:type url: str
- :return: the local generation before the synchronisation was
- performed.
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
+
+ :return: The local generation before the synchronisation was
+ performed.
:rtype: str
"""
if self._db:
+ try:
local_gen = self._db.sync(
urlparse.urljoin(self.server_url, 'user-%s' % self._uuid),
- creds=self._creds, autocreate=False)
+ creds=self._creds, autocreate=False,
+ defer_decryption=defer_decryption)
signal(SOLEDAD_DONE_DATA_SYNC, self._uuid)
return local_gen
+ except Exception as e:
+ logger.error("Soledad exception when syncing: %s" % str(e))
def stop_sync(self):
"""
@@ -1079,7 +1103,9 @@ class Soledad(object):
:return: Whether remote replica and local replica differ.
:rtype: bool
"""
- target = SoledadSyncTarget(url, creds=self._creds, crypto=self._crypto)
+ target = SoledadSyncTarget(
+ url, self._db._get_replica_uid(), creds=self._creds,
+ crypto=self._crypto)
info = target.get_sync_info(self._db._get_replica_uid())
# compare source generation with target's last known source generation
if self._db._get_generation() != info[4]:
@@ -1087,6 +1113,13 @@ class Soledad(object):
return True
return False
+ @property
+ def syncing(self):
+ """
+ Property, True if the syncer is syncing.
+ """
+ return self._db.syncing
+
def _set_token(self, token):
"""
Set the authentication token for remote database access.
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index a6372107..7133f804 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# crypto.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013, 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -14,27 +14,45 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
Cryptographic utilities for Soledad.
"""
-
-
import os
import binascii
import hmac
import hashlib
-
+import json
+import logging
+import multiprocessing
+import threading
from pycryptopp.cipher.aes import AES
from pycryptopp.cipher.xsalsa20 import XSalsa20
+from zope.proxy import sameProxiedObjects
+
+from leap.soledad.common import soledad_assert
+from leap.soledad.common import soledad_assert_type
+from leap.soledad.common.document import SoledadDocument
+
+
+from leap.soledad.common.crypto import (
+ EncryptionSchemes,
+ UnknownEncryptionScheme,
+ MacMethods,
+ UnknownMacMethod,
+ WrongMac,
+ ENC_JSON_KEY,
+ ENC_SCHEME_KEY,
+ ENC_METHOD_KEY,
+ ENC_IV_KEY,
+ MAC_KEY,
+ MAC_METHOD_KEY,
+)
+logger = logging.getLogger(__name__)
-from leap.soledad.common import (
- soledad_assert,
- soledad_assert_type,
-)
+
+MAC_KEY_LENGTH = 64
class EncryptionMethods(object):
@@ -45,6 +63,17 @@ class EncryptionMethods(object):
AES_256_CTR = 'aes-256-ctr'
XSALSA20 = 'xsalsa20'
+#
+# Exceptions
+#
+
+
+class DocumentNotEncrypted(Exception):
+ """
+ Raised for failures in document encryption.
+ """
+ pass
+
class UnknownEncryptionMethod(Exception):
"""
@@ -59,13 +88,116 @@ class NoSymmetricSecret(Exception):
"""
-class SoledadCrypto(object):
+def encrypt_sym(data, key, method):
"""
- General cryptographic functionality.
+ Encrypt C{data} using a {password}.
+
+ Currently, the only encryption methods supported are AES-256 in CTR
+ mode and XSalsa20.
+
+ :param data: The data to be encrypted.
+ :type data: str
+ :param key: The key used to encrypt C{data} (must be 256 bits long).
+ :type key: str
+ :param method: The encryption method to use.
+ :type method: str
+
+ :return: A tuple with the initial value and the encrypted data.
+ :rtype: (long, str)
+ """
+ soledad_assert_type(key, str)
+
+ soledad_assert(
+ len(key) == 32, # 32 x 8 = 256 bits.
+ 'Wrong key size: %s bits (must be 256 bits long).' %
+ (len(key) * 8))
+ iv = None
+ # AES-256 in CTR mode
+ if method == EncryptionMethods.AES_256_CTR:
+ iv = os.urandom(16)
+ ciphertext = AES(key=key, iv=iv).process(data)
+ # XSalsa20
+ elif method == EncryptionMethods.XSALSA20:
+ iv = os.urandom(24)
+ ciphertext = XSalsa20(key=key, iv=iv).process(data)
+ else:
+ # raise if method is unknown
+ raise UnknownEncryptionMethod('Unkwnown method: %s' % method)
+ return binascii.b2a_base64(iv), ciphertext
+
+
+def decrypt_sym(data, key, method, **kwargs):
"""
+ Decrypt data using symmetric secret.
+
+ Currently, the only encryption method supported is AES-256 CTR mode.
- MAC_KEY_LENGTH = 64
+ :param data: The data to be decrypted.
+ :type data: str
+ :param key: The key used to decrypt C{data} (must be 256 bits long).
+ :type key: str
+ :param method: The encryption method to use.
+ :type method: str
+ :param kwargs: Other parameters specific to each encryption method.
+ :type kwargs: dict
+
+ :return: The decrypted data.
+ :rtype: str
+ """
+ soledad_assert_type(key, str)
+ # assert params
+ soledad_assert(
+ len(key) == 32, # 32 x 8 = 256 bits.
+ 'Wrong key size: %s (must be 256 bits long).' % len(key))
+ soledad_assert(
+ 'iv' in kwargs,
+ '%s needs an initial value.' % method)
+ # AES-256 in CTR mode
+ if method == EncryptionMethods.AES_256_CTR:
+ return AES(
+ key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data)
+ elif method == EncryptionMethods.XSALSA20:
+ return XSalsa20(
+ key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data)
+
+ # raise if method is unknown
+ raise UnknownEncryptionMethod('Unkwnown method: %s' % method)
+
+
+def doc_mac_key(doc_id, secret):
+ """
+ Generate a key for calculating a MAC for a document whose id is
+ C{doc_id}.
+ The key is derived using HMAC having sha256 as underlying hash
+ function. The key used for HMAC is the first MAC_KEY_LENGTH characters
+ of Soledad's storage secret. The HMAC message is C{doc_id}.
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+
+ :param secret: soledad secret storage
+ :type secret: Soledad.storage_secret
+
+ :return: The key.
+ :rtype: str
+
+ :raise NoSymmetricSecret: if no symmetric secret was supplied.
+ """
+ if secret is None:
+ raise NoSymmetricSecret()
+
+ return hmac.new(
+ secret[:MAC_KEY_LENGTH],
+ doc_id,
+ hashlib.sha256).digest()
+
+
+class SoledadCrypto(object):
+ """
+ General cryptographic functionality encapsulated in a
+ object that can be passed along.
+ """
def __init__(self, soledad):
"""
Initialize the crypto object.
@@ -77,78 +209,14 @@ class SoledadCrypto(object):
def encrypt_sym(self, data, key,
method=EncryptionMethods.AES_256_CTR):
- """
- Encrypt C{data} using a {password}.
-
- Currently, the only encryption method supported is AES-256 CTR mode.
-
- :param data: The data to be encrypted.
- :type data: str
- :param key: The key used to encrypt C{data} (must be 256 bits long).
- :type key: str
- :param method: The encryption method to use.
- :type method: str
-
- :return: A tuple with the initial value and the encrypted data.
- :rtype: (long, str)
- """
- soledad_assert_type(key, str)
-
- soledad_assert(
- len(key) == 32, # 32 x 8 = 256 bits.
- 'Wrong key size: %s bits (must be 256 bits long).' %
- (len(key) * 8))
- iv = None
- # AES-256 in CTR mode
- if method == EncryptionMethods.AES_256_CTR:
- iv = os.urandom(16)
- ciphertext = AES(key=key, iv=iv).process(data)
- # XSalsa20
- elif method == EncryptionMethods.XSALSA20:
- iv = os.urandom(24)
- ciphertext = XSalsa20(key=key, iv=iv).process(data)
- else:
- # raise if method is unknown
- raise UnknownEncryptionMethod('Unkwnown method: %s' % method)
- return binascii.b2a_base64(iv), ciphertext
+ return encrypt_sym(data, key, method)
def decrypt_sym(self, data, key,
method=EncryptionMethods.AES_256_CTR, **kwargs):
- """
- Decrypt data using symmetric secret.
+ return decrypt_sym(data, key, method, **kwargs)
- Currently, the only encryption method supported is AES-256 CTR mode.
-
- :param data: The data to be decrypted.
- :type data: str
- :param key: The key used to decrypt C{data} (must be 256 bits long).
- :type key: str
- :param method: The encryption method to use.
- :type method: str
- :param kwargs: Other parameters specific to each encryption method.
- :type kwargs: dict
-
- :return: The decrypted data.
- :rtype: str
- """
- soledad_assert_type(key, str)
- # assert params
- soledad_assert(
- len(key) == 32, # 32 x 8 = 256 bits.
- 'Wrong key size: %s (must be 256 bits long).' % len(key))
- soledad_assert(
- 'iv' in kwargs,
- '%s needs an initial value.' % method)
- # AES-256 in CTR mode
- if method == EncryptionMethods.AES_256_CTR:
- return AES(
- key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data)
- elif method == EncryptionMethods.XSALSA20:
- return XSalsa20(
- key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data)
-
- # raise if method is unknown
- raise UnknownEncryptionMethod('Unkwnown method: %s' % method)
+ def doc_mac_key(self, doc_id, secret):
+ return doc_mac_key(doc_id, self.secret)
def doc_passphrase(self, doc_id):
"""
@@ -173,41 +241,769 @@ class SoledadCrypto(object):
raise NoSymmetricSecret()
return hmac.new(
self.secret[
- self.MAC_KEY_LENGTH:
+ MAC_KEY_LENGTH:
self._soledad.REMOTE_STORAGE_SECRET_LENGTH],
doc_id,
hashlib.sha256).digest()
- def doc_mac_key(self, doc_id):
+ #
+ # secret setters/getters
+ #
+
+ def _get_secret(self):
+ return self._soledad.storage_secret
+
+ secret = property(
+ _get_secret, doc='The secret used for symmetric encryption')
+
+#
+# Crypto utilities for a SoledadDocument.
+#
+
+
+def mac_doc(doc_id, doc_rev, ciphertext, mac_method, secret):
+ """
+ Calculate a MAC for C{doc} using C{ciphertext}.
+
+ Current MAC method used is HMAC, with the following parameters:
+
+ * key: sha256(storage_secret, doc_id)
+ * msg: doc_id + doc_rev + ciphertext
+ * digestmod: sha256
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+ :param doc_rev: The revision of the document.
+ :type doc_rev: str
+ :param ciphertext: The content of the document.
+ :type ciphertext: str
+ :param mac_method: The MAC method to use.
+ :type mac_method: str
+ :param secret: soledad secret
+ :type secret: Soledad.secret_storage
+
+ :return: The calculated MAC.
+ :rtype: str
+ """
+ if mac_method == MacMethods.HMAC:
+ return hmac.new(
+ doc_mac_key(doc_id, secret),
+ str(doc_id) + str(doc_rev) + ciphertext,
+ hashlib.sha256).digest()
+ # raise if we do not know how to handle this MAC method
+ raise UnknownMacMethod('Unknown MAC method: %s.' % mac_method)
+
+
+def encrypt_doc(crypto, doc):
+ """
+ Wrapper around encrypt_docstr that accepts a crypto object and the document
+ as arguments.
+
+ :param crypto: a soledad crypto object.
+ :type crypto: SoledadCrypto
+ :param doc: the document.
+ :type doc: SoledadDocument
+ """
+ key = crypto.doc_passphrase(doc.doc_id)
+ secret = crypto.secret
+
+ return encrypt_docstr(
+ doc.get_json(), doc.doc_id, doc.rev, key, secret)
+
+
+def encrypt_docstr(docstr, doc_id, doc_rev, key, secret):
+ """
+ Encrypt C{doc}'s content.
+
+ Encrypt doc's contents using AES-256 CTR mode and return a valid JSON
+ string representing the following:
+
+ {
+ ENC_JSON_KEY: '<encrypted doc JSON string>',
+ ENC_SCHEME_KEY: 'symkey',
+ ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR,
+ ENC_IV_KEY: '<the initial value used to encrypt>',
+ MAC_KEY: '<mac>'
+ MAC_METHOD_KEY: 'hmac'
+ }
+
+ :param docstr: A representation of the document to be encrypted.
+ :type docstr: str or unicode.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+
+ :param key: The key used to encrypt ``data`` (must be 256 bits long).
+ :type key: str
+
+ :param secret: The Soledad secret (used for MAC auth).
+ :type secret: str
+
+ :return: The JSON serialization of the dict representing the encrypted
+ content.
+ :rtype: str
+ """
+ # encrypt content using AES-256 CTR mode
+ iv, ciphertext = encrypt_sym(
+ str(docstr), # encryption/decryption routines expect str
+ key, method=EncryptionMethods.AES_256_CTR)
+ # Return a representation for the encrypted content. In the following, we
+ # convert binary data to hexadecimal representation so the JSON
+ # serialization does not complain about what it tries to serialize.
+ hex_ciphertext = binascii.b2a_hex(ciphertext)
+ return json.dumps({
+ ENC_JSON_KEY: hex_ciphertext,
+ ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY,
+ ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR,
+ ENC_IV_KEY: iv,
+ MAC_KEY: binascii.b2a_hex(mac_doc( # store the mac as hex.
+ doc_id, doc_rev, ciphertext,
+ MacMethods.HMAC, secret)),
+ MAC_METHOD_KEY: MacMethods.HMAC,
+ })
+
+
+def decrypt_doc(crypto, doc):
+ """
+ Wrapper around decrypt_doc_dict that accepts a crypto object and the
+ document as arguments.
+
+ :param crypto: a soledad crypto object.
+ :type crypto: SoledadCrypto
+ :param doc: the document.
+ :type doc: SoledadDocument
+
+ :return: json string with the decrypted document
+ :rtype: str
+ """
+ key = crypto.doc_passphrase(doc.doc_id)
+ secret = crypto.secret
+ return decrypt_doc_dict(doc.content, doc.doc_id, doc.rev, key, secret)
+
+
+def decrypt_doc_dict(doc_dict, doc_id, doc_rev, key, secret):
+ """
+ Decrypt C{doc}'s content.
+
+ Return the JSON string representation of the document's decrypted content.
+
+ The passed doc_dict argument should have the following structure:
+
+ {
+ ENC_JSON_KEY: '<enc_blob>',
+ ENC_SCHEME_KEY: '<enc_scheme>',
+ ENC_METHOD_KEY: '<enc_method>',
+ ENC_IV_KEY: '<initial value used to encrypt>', # (optional)
+ MAC_KEY: '<mac>'
+ MAC_METHOD_KEY: 'hmac'
+ }
+
+ C{enc_blob} is the encryption of the JSON serialization of the document's
+ content. For now Soledad just deals with documents whose C{enc_scheme} is
+ EncryptionSchemes.SYMKEY and C{enc_method} is
+ EncryptionMethods.AES_256_CTR.
+
+ :param doc_dict: The content of the document to be decrypted.
+ :type doc_dict: dict
+
+ :param doc_id: The document id.
+ :type doc_id: str
+
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+
+ :param key: The key used to encrypt ``data`` (must be 256 bits long).
+ :type key: str
+
+ :param secret:
+ :type secret:
+
+ :return: The JSON serialization of the decrypted content.
+ :rtype: str
+ """
+ soledad_assert(ENC_JSON_KEY in doc_dict)
+ soledad_assert(ENC_SCHEME_KEY in doc_dict)
+ soledad_assert(ENC_METHOD_KEY in doc_dict)
+ soledad_assert(MAC_KEY in doc_dict)
+ soledad_assert(MAC_METHOD_KEY in doc_dict)
+
+ # verify MAC
+ ciphertext = binascii.a2b_hex( # content is stored as hex.
+ doc_dict[ENC_JSON_KEY])
+ mac = mac_doc(
+ doc_id, doc_rev,
+ ciphertext,
+ doc_dict[MAC_METHOD_KEY], secret)
+ # we compare mac's hashes to avoid possible timing attacks that might
+ # exploit python's builtin comparison operator behaviour, which fails
+ # immediatelly when non-matching bytes are found.
+ doc_mac_hash = hashlib.sha256(
+ binascii.a2b_hex( # the mac is stored as hex
+ doc_dict[MAC_KEY])).digest()
+ calculated_mac_hash = hashlib.sha256(mac).digest()
+
+ if doc_mac_hash != calculated_mac_hash:
+ logger.warning("Wrong MAC while decrypting doc...")
+ raise WrongMac('Could not authenticate document\'s contents.')
+ # decrypt doc's content
+ enc_scheme = doc_dict[ENC_SCHEME_KEY]
+ plainjson = None
+ if enc_scheme == EncryptionSchemes.SYMKEY:
+ enc_method = doc_dict[ENC_METHOD_KEY]
+ if enc_method == EncryptionMethods.AES_256_CTR:
+ soledad_assert(ENC_IV_KEY in doc_dict)
+ plainjson = decrypt_sym(
+ ciphertext, key,
+ method=enc_method,
+ iv=doc_dict[ENC_IV_KEY])
+ else:
+ raise UnknownEncryptionMethod(enc_method)
+ else:
+ raise UnknownEncryptionScheme(enc_scheme)
+
+ return plainjson
+
+
+def is_symmetrically_encrypted(doc):
+ """
+ Return True if the document was symmetrically encrypted.
+
+ :param doc: The document to check.
+ :type doc: SoledadDocument
+
+ :rtype: bool
+ """
+ if doc.content and ENC_SCHEME_KEY in doc.content:
+ if doc.content[ENC_SCHEME_KEY] == EncryptionSchemes.SYMKEY:
+ return True
+ return False
+
+
+#
+# Encrypt/decrypt pools of workers
+#
+
+class SyncEncryptDecryptPool(object):
+ """
+ Base class for encrypter/decrypter pools.
+ """
+ WORKERS = 5
+
+ def __init__(self, crypto, sync_db, write_lock):
+ """
+ Initialize the pool of encryption-workers.
+
+ :param crypto: A SoledadCryto instance to perform the encryption.
+ :type crypto: leap.soledad.crypto.SoledadCrypto
+
+ :param sync_db: a database connection handle
+ :type sync_db: handle
+
+ :param write_lock: a write lock for controlling concurrent access
+ to the sync_db
+ :type write_lock: threading.Lock
+ """
+ self._pool = multiprocessing.Pool(self.WORKERS)
+ self._crypto = crypto
+ self._sync_db = sync_db
+ self._sync_db_write_lock = write_lock
+
+ def close(self):
+ """
+ Cleanly close the pool of workers.
+ """
+ logger.debug("Closing %s" % (self.__class__.__name__,))
+ self._pool.close()
+ try:
+ self._pool.join()
+ except Exception:
+ pass
+
+ def terminate(self):
+ """
+ Terminate the pool of workers.
+ """
+ logger.debug("Terminating %s" % (self.__class__.__name__,))
+ self._pool.terminate()
+
+
+def encrypt_doc_task(doc_id, doc_rev, content, key, secret):
+ """
+ Encrypt the content of the given document.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param key: The encryption key.
+ :type key: str
+ :param secret: The Soledad secret (used for MAC auth).
+ :type secret: str
+
+ :return: A tuple containing the doc id, revision and encrypted content.
+ :rtype: tuple(str, str, str)
+ """
+ encrypted_content = encrypt_docstr(
+ content, doc_id, doc_rev, key, secret)
+ return doc_id, doc_rev, encrypted_content
+
+
+class SyncEncrypterPool(SyncEncryptDecryptPool):
+ """
+ Pool of workers that spawn subprocesses to execute the symmetric encryption
+ of documents to be synced.
+ """
+ # TODO implement throttling to reduce cpu usage??
+ WORKERS = 5
+ TABLE_NAME = "docs_tosync"
+ FIELD_NAMES = "doc_id, rev, content"
+
+ def encrypt_doc(self, doc, workers=True):
+ """
+ Symmetrically encrypt a document.
+
+ :param doc: The document with contents to be encrypted.
+ :type doc: SoledadDocument
+
+ :param workers: Whether to defer the decryption to the multiprocess
+ pool of workers. Useful for debugging purposes.
+ :type workers: bool
+ """
+ soledad_assert(self._crypto is not None, "need a crypto object")
+ docstr = doc.get_json()
+ key = self._crypto.doc_passphrase(doc.doc_id)
+ secret = self._crypto.secret
+ args = doc.doc_id, doc.rev, docstr, key, secret
+
+ try:
+ if workers:
+ res = self._pool.apply_async(
+ encrypt_doc_task, args,
+ callback=self.encrypt_doc_cb)
+ else:
+ # encrypt inline
+ res = encrypt_doc_task(*args)
+ self.encrypt_doc_cb(res)
+
+ except Exception as exc:
+ logger.exception(exc)
+
+ def encrypt_doc_cb(self, result):
"""
- Generate a key for calculating a MAC for a document whose id is
- C{doc_id}.
+ Insert results of encryption routine into the local sync database.
- The key is derived using HMAC having sha256 as underlying hash
- function. The key used for HMAC is the first MAC_KEY_LENGTH characters
- of Soledad's storage secret. The HMAC message is C{doc_id}.
+ :param result: A tuple containing the doc id, revision and encrypted
+ content.
+ :type result: tuple(str, str, str)
+ """
+ doc_id, doc_rev, content = result
+ self.insert_encrypted_local_doc(doc_id, doc_rev, content)
- :param doc_id: The id of the document.
+ def insert_encrypted_local_doc(self, doc_id, doc_rev, content):
+ """
+ Insert the contents of the encrypted doc into the local sync
+ database.
+
+ :param doc_id: The document id.
:type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param content: The encrypted document.
+ :type content: str
+ """
+ sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,)
+ sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,)
- :return: The key.
- :rtype: str
+ con = self._sync_db
+ with self._sync_db_write_lock:
+ with con:
+ con.execute(sql_del, (doc_id, ))
+ con.execute(sql_ins, (doc_id, doc_rev, content))
- :raise NoSymmetricSecret: if no symmetric secret was supplied.
+
+def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret):
+ """
+ Decrypt the content of the given document.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The encrypted content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification of
+ that document.
+ :type trans_id: str
+ :param key: The encryption key.
+ :type key: str
+ :param secret: The Soledad secret (used for MAC auth).
+ :type secret: str
+
+ :return: A tuple containing the doc id, revision and encrypted content.
+ :rtype: tuple(str, str, str)
+ """
+ decrypted_content = decrypt_doc_dict(
+ content, doc_id, doc_rev, key, secret)
+ return doc_id, doc_rev, decrypted_content, gen, trans_id
+
+
+def get_insertable_docs_by_gen(expected, got):
+ """
+ Return a list of documents ready to be inserted. This list is computed
+ by aligning the expected list with the already gotten docs, and returning
+ the maximum number of docs that can be processed in the expected order
+ before finding a gap.
+
+ :param expected: A list of generations to be inserted.
+ :type expected: list
+
+ :param got: A dictionary whose values are the docs to be inserted.
+ :type got: dict
+ """
+ ordered = [got.get(i) for i in expected]
+ if None in ordered:
+ return ordered[:ordered.index(None)]
+ else:
+ return ordered
+
+
+class SyncDecrypterPool(SyncEncryptDecryptPool):
+ """
+ Pool of workers that spawn subprocesses to execute the symmetric decryption
+ of documents that were received.
+
+ The decryption of the received documents is done in two steps:
+
+ 1. All the encrypted docs are collected, together with their generation
+ and transaction-id
+ 2. The docs are enqueued for decryption. When completed, they are
+ inserted following the generation order.
+ """
+ # TODO implement throttling to reduce cpu usage??
+ TABLE_NAME = "docs_received"
+ FIELD_NAMES = "doc_id, rev, content, gen, trans_id"
+
+ write_encrypted_lock = threading.Lock()
+
+ def __init__(self, *args, **kwargs):
"""
- if self.secret is None:
- raise NoSymmetricSecret()
- return hmac.new(
- self.secret[:self.MAC_KEY_LENGTH],
- doc_id,
- hashlib.sha256).digest()
+ Initialize the decrypter pool, and setup a dict for putting the
+ results of the decrypted docs until they are picked by the insert
+ routine that gets them in order.
+ """
+ self._insert_doc_cb = kwargs.pop("insert_doc_cb")
+ SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
+ self.decrypted_docs = {}
+ self.source_replica_uid = None
- #
- # secret setters/getters
- #
+ def set_source_replica_uid(self, source_replica_uid):
+ """
+ Set the source replica uid for this decrypter pool instance.
- def _get_secret(self):
- return self._soledad.storage_secret
+ :param source_replica_uid: The uid of the source replica.
+ :type source_replica_uid: str
+ """
+ self.source_replica_uid = source_replica_uid
- secret = property(
- _get_secret, doc='The secret used for symmetric encryption')
+ def insert_encrypted_received_doc(self, doc_id, doc_rev, content,
+ gen, trans_id):
+ """
+ Insert a received message with encrypted content, to be decrypted later
+ on.
+
+ :param doc_id: The Document ID.
+ :type doc_id: str
+ :param doc_rev: The Document Revision
+ :param doc_rev: str
+ :param content: the Content of the document
+ :type content: str
+ :param gen: the Document Generation
+ :type gen: int
+ :param trans_id: Transaction ID
+ :type trans_id: str
+ """
+ docstr = json.dumps(content)
+ sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % (
+ self.TABLE_NAME,)
+
+ con = self._sync_db
+ with self._sync_db_write_lock:
+ with con:
+ con.execute(sql_ins, (doc_id, doc_rev, docstr, gen, trans_id))
+
+ def insert_marker_for_received_doc(self, doc_id, doc_rev, gen):
+ """
+ Insert a marker with the document id, revision and generation on the
+ sync db. This document does not have an encrypted payload, so the
+ content has already been inserted into the decrypted_docs dictionary
+ from where it can be picked following generation order.
+ We need to leave here the marker to be able to calculate the expected
+ insertion order for a synchronization batch.
+
+ :param doc_id: The Document ID.
+ :type doc_id: str
+ :param doc_rev: The Document Revision
+ :param doc_rev: str
+ :param gen: the Document Generation
+ :type gen: int
+ """
+ sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % (
+ self.TABLE_NAME,)
+ con = self._sync_db
+ with self._sync_db_write_lock:
+ with con:
+ con.execute(sql_ins, (doc_id, doc_rev, '', gen, ''))
+
+ def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id):
+ """
+ Insert a document that is not symmetrically encrypted.
+ We store it in the staging area (the decrypted_docs dictionary) to be
+ picked up in order as the preceding documents are decrypted.
+
+ :param doc_id: The Document ID.
+ :type doc_id: str
+ :param doc_rev: The Document Revision
+ :param doc_rev: str
+ :param content: the Content of the document
+ :type content: str
+ :param gen: the Document Generation
+ :type gen: int
+ :param trans_id: Transaction ID
+ :type trans_id: str
+ """
+ # XXX this need a deeper review / testing.
+ # I believe that what I'm doing here is prone to problems
+ # if the sync is interrupted (ie, client crash) in the worst possible
+ # moment. We would need a recover strategy in that case
+ # (or, insert the document in the table all the same, but with a flag
+ # saying if the document is sym-encrypted or not),
+ content = json.dumps(content)
+ result = doc_id, doc_rev, content, gen, trans_id
+ self.decrypted_docs[gen] = result
+ self.insert_marker_for_received_doc(doc_id, doc_rev, gen)
+
+ def delete_encrypted_received_doc(self, doc_id, doc_rev):
+ """
+ Delete a encrypted received doc after it was inserted into the local
+ db.
+
+ :param doc_id: Document ID.
+ :type doc_id: str
+ :param doc_rev: Document revision.
+ :type doc_rev: str
+ """
+ sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % (
+ self.TABLE_NAME,)
+ con = self._sync_db
+ with self._sync_db_write_lock:
+ with con:
+ con.execute(sql_del, (doc_id, doc_rev))
+
+ def decrypt_doc(self, doc_id, rev, source_replica_uid, workers=True):
+ """
+ Symmetrically decrypt a document.
+
+ :param doc_id: The ID for the document with contents to be encrypted.
+ :type doc: str
+ :param rev: The revision of the document.
+ :type rev: str
+ :param source_replica_uid:
+ :type source_replica_uid: str
+
+ :param workers: Whether to defer the decryption to the multiprocess
+ pool of workers. Useful for debugging purposes.
+ :type workers: bool
+ """
+ self.source_replica_uid = source_replica_uid
+
+ # insert_doc_cb is a proxy object that gets updated with the right
+ # insert function only when the sync_target invokes the sync_exchange
+ # method. so, if we don't still have a non-empty callback, we refuse
+ # to proceed.
+ if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid),
+ None):
+ logger.debug("Sync decrypter pool: no insert_doc_cb() yet.")
+ return
+
+ # XXX move to get_doc function...
+ c = self._sync_db.cursor()
+ sql = "SELECT * FROM '%s' WHERE doc_id=? AND rev=?" % (
+ self.TABLE_NAME,)
+ try:
+ c.execute(sql, (doc_id, rev))
+ res = c.fetchone()
+ except Exception as exc:
+ logger.warning("Error getting docs from syncdb: %r" % (exc,))
+ return
+ if res is None:
+ logger.debug("Doc %s:%s does not exist in sync db" % (doc_id, rev))
+ return
+
+ soledad_assert(self._crypto is not None, "need a crypto object")
+ try:
+ doc_id, rev, docstr, gen, trans_id = res
+ except ValueError:
+ logger.warning("Wrong entry in sync db")
+ return
+
+ if len(docstr) == 0:
+ # not encrypted payload
+ return
+
+ try:
+ content = json.loads(docstr)
+ except TypeError:
+ logger.warning("Wrong type while decoding json: %s" % repr(docstr))
+ return
+
+ key = self._crypto.doc_passphrase(doc_id)
+ secret = self._crypto.secret
+ args = doc_id, rev, content, gen, trans_id, key, secret
+
+ try:
+ if workers:
+ # Ouch. This is sent to the workers asynchronously, so
+ # we have no way of logging errors. We'd have to inspect
+ # lingering results by querying successful / get() over them...
+ # Or move the heck out of it to twisted.
+ res = self._pool.apply_async(
+ decrypt_doc_task, args,
+ callback=self.decrypt_doc_cb)
+ else:
+ # decrypt inline
+ res = decrypt_doc_task(*args)
+ self.decrypt_doc_cb(res)
+
+ except Exception as exc:
+ logger.exception(exc)
+
+ def decrypt_doc_cb(self, result):
+ """
+ Temporarily store the decryption result in a dictionary where it will
+ be picked by process_decrypted.
+
+ :param result: A tuple containing the doc id, revision and encrypted
+ content.
+ :type result: tuple(str, str, str)
+ """
+ doc_id, rev, content, gen, trans_id = result
+ logger.debug("Sync decrypter pool: decrypted doc %s: %s %s" % (doc_id, rev, gen))
+ self.decrypted_docs[gen] = result
+
+ def get_docs_by_generation(self):
+ """
+ Get all documents in the received table from the sync db,
+ ordered by generation.
+
+ :return: list of doc_id, rev, generation
+ """
+ c = self._sync_db.cursor()
+ sql = "SELECT doc_id, rev, gen FROM %s ORDER BY gen" % (
+ self.TABLE_NAME,)
+ c.execute(sql)
+ return c.fetchall()
+
+ def count_received_encrypted_docs(self):
+ """
+ Count how many documents we have in the table for received and
+ encrypted docs.
+
+ :return: The count of documents.
+ :rtype: int
+ """
+ if self._sync_db is None:
+ logger.warning("cannot return count with null sync_db")
+ return
+ c = self._sync_db.cursor()
+ sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
+ c.execute(sql)
+ res = c.fetchone()
+ if res is not None:
+ return res[0]
+ else:
+ return 0
+
+ def decrypt_received_docs(self):
+ """
+ Get all the encrypted documents from the sync database and dispatch a
+ decrypt worker to decrypt each one of them.
+ """
+ docs_by_generation = self.get_docs_by_generation()
+ logger.debug("Sync decrypter pool: There are %d documents to " \
+ "decrypt." % len(docs_by_generation))
+ for doc_id, rev, gen in filter(None, docs_by_generation):
+ self.decrypt_doc(doc_id, rev, self.source_replica_uid)
+
+ def process_decrypted(self):
+ """
+ Process the already decrypted documents, and insert as many documents
+ as can be taken from the expected order without finding a gap.
+
+ :return: Whether we have processed all the pending docs.
+ :rtype: bool
+ """
+ # Acquire the lock to avoid processing while we're still
+ # getting data from the syncing stream, to avoid InvalidGeneration
+ # problems.
+ with self.write_encrypted_lock:
+ already_decrypted = self.decrypted_docs
+ docs = self.get_docs_by_generation()
+ docs = filter(lambda entry: len(entry) > 0, docs)
+ expected = [gen for doc_id, rev, gen in docs]
+ docs_to_insert = get_insertable_docs_by_gen(
+ expected, already_decrypted)
+ for doc_fields in docs_to_insert:
+ self.insert_decrypted_local_doc(*doc_fields)
+ remaining = self.count_received_encrypted_docs()
+ return remaining == 0
+
+ def insert_decrypted_local_doc(self, doc_id, doc_rev, content,
+ gen, trans_id):
+ """
+ Insert the decrypted document into the local sqlcipher database.
+ Makes use of the passed callback `return_doc_cb` passed to the caller
+ by u1db sync.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification
+ of that document.
+ :type trans_id: str
+ """
+ # could pass source_replica in params for callback chain
+ insert_fun = self._insert_doc_cb[self.source_replica_uid]
+ logger.debug("Sync decrypter pool: inserting doc in local db: " \
+ "%s:%s %s" % (doc_id, doc_rev, gen))
+ try:
+ # convert deleted documents to avoid error on document creation
+ if content == 'null':
+ content = None
+ doc = SoledadDocument(doc_id, doc_rev, content)
+ insert_fun(doc, int(gen), trans_id)
+ except Exception as exc:
+ logger.error("Sync decrypter pool: error while inserting "
+ "decrypted doc into local db.")
+ logger.exception(exc)
+
+ else:
+ # If no errors found, remove it from the local temporary dict
+ # and from the received database.
+ self.decrypted_docs.pop(gen)
+ self.delete_encrypted_received_doc(doc_id, doc_rev)
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 5ffa9c7e..46ceca42 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# sqlcipher.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013, 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -14,8 +14,6 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
A U1DB backend that uses SQLCipher as its persistence layer.
@@ -40,11 +38,13 @@ cipher_default_use_hmac can be used to globally alter the default use of HMAC
when opening a database.
So, as the statements above were introduced for backwards compatibility with
-SLCipher 1.1 databases, we do not implement them as all SQLCipher databases
+SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases
handled by Soledad should be created by SQLCipher >= 2.0.
"""
import logging
+import multiprocessing
import os
+import sqlite3
import string
import threading
import time
@@ -57,9 +57,12 @@ from collections import defaultdict
from pysqlcipher import dbapi2
from u1db.backends import sqlite_backend
from u1db import errors as u1db_errors
+from taskthread import TimerTask
-from leap.soledad.client.sync import Synchronizer
+from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool
from leap.soledad.client.target import SoledadSyncTarget
+from leap.soledad.client.target import PendingReceivedDocsSyncError
+from leap.soledad.client.sync import SoledadSynchronizer
from leap.soledad.common.document import SoledadDocument
@@ -88,7 +91,7 @@ SQLITE_ISOLATION_LEVEL = None
def open(path, password, create=True, document_factory=None, crypto=None,
raw_key=False, cipher='aes-256-cbc', kdf_iter=4000,
- cipher_page_size=1024):
+ cipher_page_size=1024, defer_encryption=False):
"""Open a database at the given location.
Will raise u1db.errors.DatabaseDoesNotExist if create=False and the
@@ -114,6 +117,9 @@ def open(path, password, create=True, document_factory=None, crypto=None,
:type kdf_iter: int
:param cipher_page_size: The page size.
:type cipher_page_size: int
+ :param defer_encryption: Whether to defer encryption/decryption of
+ documents, or do it inline while syncing.
+ :type defer_encryption: bool
:return: An instance of Database.
:rtype SQLCipherDatabase
@@ -121,7 +127,7 @@ def open(path, password, create=True, document_factory=None, crypto=None,
return SQLCipherDatabase.open_database(
path, password, create=create, document_factory=document_factory,
crypto=crypto, raw_key=raw_key, cipher=cipher, kdf_iter=kdf_iter,
- cipher_page_size=cipher_page_size)
+ cipher_page_size=cipher_page_size, defer_encryption=defer_encryption)
#
@@ -147,19 +153,40 @@ class NotAnHexString(Exception):
#
class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
- """A U1DB implementation that uses SQLCipher as its persistence layer."""
+ """
+ A U1DB implementation that uses SQLCipher as its persistence layer.
+ """
+ defer_encryption = False
_index_storage_value = 'expand referenced encrypted'
k_lock = threading.Lock()
create_doc_lock = threading.Lock()
update_indexes_lock = threading.Lock()
+ _sync_watcher = None
+ _sync_enc_pool = None
+
+ """
+ The name of the local symmetrically encrypted documents to
+ sync database file.
+ """
+ LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db'
- syncing_lock = defaultdict(threading.Lock)
"""
A dictionary that hold locks which avoid multiple sync attempts from the
same database replica.
"""
+ encrypting_lock = threading.Lock()
+
+ """
+ Period or recurrence of the periodic encrypting task, in seconds.
+ """
+ ENCRYPT_TASK_PERIOD = 1
+ syncing_lock = defaultdict(threading.Lock)
+ """
+ A dictionary that hold locks which avoid multiple sync attempts from the
+ same database replica.
+ """
def __init__(self, sqlcipher_file, password, document_factory=None,
crypto=None, raw_key=False, cipher='aes-256-cbc',
@@ -194,7 +221,8 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
self.assert_db_is_encrypted(
sqlcipher_file, password, raw_key, cipher, kdf_iter,
cipher_page_size)
- # connect to the database
+
+ # connect to the sqlcipher database
with self.k_lock:
self._db_handle = dbapi2.connect(
sqlcipher_file,
@@ -215,6 +243,26 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
self._ensure_schema()
self._crypto = crypto
+ self._sync_db = None
+ self._sync_db_write_lock = None
+ self._sync_enc_pool = None
+
+ if self.defer_encryption:
+ if sqlcipher_file != ":memory:":
+ self._sync_db_path = "%s-sync" % sqlcipher_file
+ else:
+ self._sync_db_path = ":memory:"
+
+ # initialize sync db
+ self._init_sync_db()
+
+ # initialize syncing queue encryption pool
+ self._sync_enc_pool = SyncEncrypterPool(
+ self._crypto, self._sync_db, self._sync_db_write_lock)
+ self._sync_watcher = TimerTask(self._encrypt_syncing_docs,
+ self.ENCRYPT_TASK_PERIOD)
+ self._sync_watcher.start()
+
def factory(doc_id=None, rev=None, json='{}', has_conflicts=False,
syncable=True):
return SoledadDocument(doc_id=doc_id, rev=rev, json=json,
@@ -226,7 +274,8 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
@classmethod
def _open_database(cls, sqlcipher_file, password, document_factory=None,
crypto=None, raw_key=False, cipher='aes-256-cbc',
- kdf_iter=4000, cipher_page_size=1024):
+ kdf_iter=4000, cipher_page_size=1024,
+ defer_encryption=False):
"""
Open a SQLCipher database.
@@ -249,10 +298,14 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
:type kdf_iter: int
:param cipher_page_size: The page size.
:type cipher_page_size: int
+ :param defer_encryption: Whether to defer encryption/decryption of
+ documents, or do it inline while syncing.
+ :type defer_encryption: bool
:return: The database object.
:rtype: SQLCipherDatabase
"""
+ cls.defer_encryption = defer_encryption
if not os.path.isfile(sqlcipher_file):
raise u1db_errors.DatabaseDoesNotExist()
@@ -298,43 +351,59 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
def open_database(cls, sqlcipher_file, password, create, backend_cls=None,
document_factory=None, crypto=None, raw_key=False,
cipher='aes-256-cbc', kdf_iter=4000,
- cipher_page_size=1024):
+ cipher_page_size=1024, defer_encryption=False):
"""
Open a SQLCipher database.
:param sqlcipher_file: The path for the SQLCipher file.
:type sqlcipher_file: str
+
:param password: The password that protects the SQLCipher db.
:type password: str
+
:param create: Should the datbase be created if it does not already
- exist?
- :type: bool
+ exist?
+ :type create: bool
+
:param backend_cls: A class to use as backend.
:type backend_cls: type
+
:param document_factory: A function that will be called with the same
- parameters as Document.__init__.
+ parameters as Document.__init__.
:type document_factory: callable
+
:param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
- document contents when syncing.
+ document contents when syncing.
:type crypto: soledad.crypto.SoledadCrypto
+
:param raw_key: Whether C{password} is a raw 64-char hex string or a
- passphrase that should be hashed to obtain the encyrption key.
+ passphrase that should be hashed to obtain the
+ encyrption key.
:type raw_key: bool
+
:param cipher: The cipher and mode to use.
:type cipher: str
+
:param kdf_iter: The number of iterations to use.
:type kdf_iter: int
+
:param cipher_page_size: The page size.
:type cipher_page_size: int
+ :param defer_encryption: Whether to defer encryption/decryption of
+ documents, or do it inline while syncing.
+ :type defer_encryption: bool
+
:return: The database object.
:rtype: SQLCipherDatabase
"""
+ cls.defer_encryption = defer_encryption
try:
return cls._open_database(
sqlcipher_file, password, document_factory=document_factory,
crypto=crypto, raw_key=raw_key, cipher=cipher,
- kdf_iter=kdf_iter, cipher_page_size=cipher_page_size)
+ kdf_iter=kdf_iter, cipher_page_size=cipher_page_size,
+ defer_encryption=defer_encryption)
except u1db_errors.DatabaseDoesNotExist:
if not create:
raise
@@ -347,7 +416,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
crypto=crypto, raw_key=raw_key, cipher=cipher,
kdf_iter=kdf_iter, cipher_page_size=cipher_page_size)
- def sync(self, url, creds=None, autocreate=True):
+ def sync(self, url, creds=None, autocreate=True, defer_decryption=True):
"""
Synchronize documents with remote replica exposed at url.
@@ -362,6 +431,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
:type creds: dict
:param autocreate: Ask the target to create the db if non-existent.
:type autocreate: bool
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
:return: The local generation before the synchronisation was performed.
:rtype: int
@@ -370,7 +443,16 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
# the following context manager blocks until the syncing lock can be
# acquired.
with self.syncer(url, creds=creds) as syncer:
- res = syncer.sync(autocreate=autocreate)
+
+ # XXX could mark the critical section here...
+ try:
+ res = syncer.sync(autocreate=autocreate,
+ defer_decryption=defer_decryption)
+
+ except PendingReceivedDocsSyncError:
+ logger.warning("Local sync db is not clear, skipping sync...")
+ return
+
return res
def stop_sync(self):
@@ -394,7 +476,18 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
with SQLCipherDatabase.syncing_lock[self._get_replica_uid()]:
syncer = self._get_syncer(url, creds=creds)
yield syncer
- syncer.sync_target.close()
+ #syncer.sync_target.close()
+
+ @property
+ def syncing(self):
+ syncing = False
+ for url in self._syncers:
+ _, _, lock = self._syncers[url]
+ is_not_locked = lock.acquire(blocking=False)
+ if is_not_locked is False:
+ return True
+ lock.release()
+ return False
def _get_syncer(self, url, creds=None):
"""
@@ -415,11 +508,15 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
h = sha256(json.dumps([url, creds])).hexdigest()
cur_h, syncer = self._syncers.get(url, (None, None))
if syncer is None or h != cur_h:
- syncer = Synchronizer(
+ wlock = self._sync_db_write_lock
+ syncer = SoledadSynchronizer(
self,
SoledadSyncTarget(url,
+ self._replica_uid,
creds=creds,
- crypto=self._crypto))
+ crypto=self._crypto,
+ sync_db=self._sync_db,
+ sync_db_write_lock=wlock))
self._syncers[url] = (h, syncer)
# in order to reuse the same synchronizer multiple times we have to
# reset its state (i.e. the number of documents received from target
@@ -442,21 +539,85 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
'ALTER TABLE document '
'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE')
- def create_doc(self, content, doc_id=None):
+ def _init_sync_db(self):
+ """
+ Initialize the Symmetrically-Encrypted document to be synced database,
+ and the queue to communicate with subprocess workers.
"""
- Create a new document in the local encrypted database.
+ self._sync_db = sqlite3.connect(self._sync_db_path,
+ check_same_thread=False)
- :param content: the contents of the new document
- :type content: dict
- :param doc_id: an optional identifier specifying the document id
- :type doc_id: str
+ self._sync_db_write_lock = threading.Lock()
+ self._create_sync_db_tables()
+ self.sync_queue = multiprocessing.Queue()
+
+ def _create_sync_db_tables(self):
+ """
+ Create tables for the local sync documents db if needed.
+ """
+ encr = SyncEncrypterPool
+ decr = SyncDecrypterPool
+ sql_encr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % (
+ encr.TABLE_NAME, encr.FIELD_NAMES))
+ sql_decr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % (
+ decr.TABLE_NAME, decr.FIELD_NAMES))
+
+ with self._sync_db_write_lock:
+ with self._sync_db:
+ self._sync_db.execute(sql_encr)
+ self._sync_db.execute(sql_decr)
+
+ #
+ # Symmetric encryption of syncing docs
+ #
+
+ def _encrypt_syncing_docs(self):
+ """
+ Process the syncing queue and send the documents there
+ to be encrypted in the sync db. They will be read by the
+ SoledadSyncTarget during the sync_exchange.
+
+ Called periodical from the TimerTask self._sync_watcher.
+ """
+ lock = self.encrypting_lock
+ # optional wait flag used to avoid blocking
+ if not lock.acquire(False):
+ return
+ else:
+ queue = self.sync_queue
+ try:
+ while not queue.empty():
+ doc = queue.get_nowait()
+ self._sync_enc_pool.encrypt_doc(doc)
+
+ except Exception as exc:
+ logger.error("Error while encrypting docs to sync")
+ logger.exception(exc)
+ finally:
+ lock.release()
+
+ #
+ # Document operations
+ #
- :return: the new document
- :rtype: SoledadDocument
+ def put_doc(self, doc):
"""
- with self.create_doc_lock:
- return sqlite_backend.SQLitePartialExpandDatabase.create_doc(
- self, content, doc_id=doc_id)
+ Overwrite the put_doc method, to enqueue the modified document for
+ encryption before sync.
+
+ :param doc: The document to be put.
+ :type doc: u1db.Document
+
+ :return: The new document revision.
+ :rtype: str
+ """
+ doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(
+ self, doc)
+ if self.defer_encryption:
+ self.sync_queue.put_nowait(doc)
+ return doc_rev
+
+ # indexes
def _put_and_update_indexes(self, old_doc, doc):
"""
@@ -906,12 +1067,25 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
res = c.fetchall()
return res[0][0]
- def __del__(self):
+ def close(self):
"""
- Closes db_handle upon object destruction.
+ Close db_handle and close syncer.
"""
+ logger.debug("Sqlcipher backend: closing")
+ if self._sync_watcher is not None:
+ self._sync_watcher.stop()
+ self._sync_watcher.shutdown()
+ for url in self._syncers:
+ _, syncer = self._syncers[url]
+ syncer.close()
+ if self._sync_enc_pool is not None:
+ self._sync_enc_pool.close()
if self._db_handle is not None:
self._db_handle.close()
+ @property
+ def replica_uid(self):
+ return self._get_replica_uid()
+
sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase)
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index 56e63416..640e22ff 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -17,35 +17,86 @@
"""
-Sync infrastructure that can be interrupted and recovered.
+Soledad synchronization utilities.
+
+
+Extend u1db Synchronizer with the ability to:
+
+ * Defer the update of the known replica uid until all the decryption of
+ the incoming messages has been processed.
+
+ * Be interrupted and recovered.
"""
+
import json
+import logging
+from threading import Lock
from u1db import errors
-from u1db.sync import Synchronizer as U1DBSynchronizer
+from u1db.sync import Synchronizer
-class Synchronizer(U1DBSynchronizer):
+logger = logging.getLogger(__name__)
+
+
+class SoledadSynchronizer(Synchronizer):
"""
Collect the state around synchronizing 2 U1DB replicas.
- Modified to allow for interrupting the synchronization process.
+ Synchronization is bi-directional, in that new items in the source are sent
+ to the target, and new items in the target are returned to the source.
+ However, it still recognizes that one side is initiating the request. Also,
+ at the moment, conflicts are only created in the source.
+
+ Also modified to allow for interrupting the synchronization process.
"""
+ syncing_lock = Lock()
+
def stop(self):
"""
Stop the current sync in progress.
"""
self.sync_target.stop()
- def sync(self, autocreate=False):
+ def sync(self, autocreate=False, defer_decryption=True):
"""
Synchronize documents between source and target.
+ Differently from u1db `Synchronizer.sync` method, this one allows to
+ pass a `defer_decryption` flag that will postpone the last
+ step in the synchronization dance, namely, the setting of the last
+ known generation and transaction id for a given remote replica.
+
+ This is done to allow the ongoing parallel decryption of the incoming
+ docs to proceed without `InvalidGeneration` conflicts.
+
:param autocreate: Whether the target replica should be created or not.
:type autocreate: bool
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
+ """
+ self.syncing_lock.acquire()
+ try:
+ return self._sync(autocreate=autocreate,
+ defer_decryption=defer_decryption)
+ except Exception:
+ # We release the lock if there was an error.
+ # Otherwise, the lock should be released from the function
+ # `complete_sync`.
+ self.release_syncing_lock()
+ # re-raising the exceptions to let syqlcipher.sync catch them
+ # (and re-create the syncer instance if needed)
+ raise
+
+ def _sync(self, autocreate=False, defer_decryption=True):
+ """
+ Helper function, called from the main `sync` method.
+ See `sync` docstring.
"""
sync_target = self.sync_target
@@ -64,6 +115,16 @@ class Synchronizer(U1DBSynchronizer):
target_gen, target_trans_id = (0, '')
target_my_gen, target_my_trans_id = (0, '')
+ logger.debug(
+ "Soledad target sync info:\n"
+ " target replica uid: %s\n"
+ " target generation: %d\n"
+ " target trans id: %s\n"
+ " target my gen: %d\n"
+ " target my trans_id: %s"
+ % (self.target_replica_uid, target_gen, target_trans_id,
+ target_my_gen, target_my_trans_id))
+
# make sure we'll have access to target replica uid once it exists
if self.target_replica_uid is None:
@@ -80,6 +141,8 @@ class Synchronizer(U1DBSynchronizer):
# what's changed since that generation and this current gen
my_gen, _, changes = self.source.whats_changed(target_my_gen)
+ logger.debug("Soledad sync: there are %d documents to send." \
+ % len(changes))
# get source last-seen database generation for the target
if self.target_replica_uid is None:
@@ -88,11 +151,17 @@ class Synchronizer(U1DBSynchronizer):
target_last_known_gen, target_last_known_trans_id = \
self.source._get_replica_gen_and_trans_id(
self.target_replica_uid)
+ logger.debug(
+ "Soledad source sync info:\n"
+ " source target gen: %d\n"
+ " source target trans_id: %s"
+ % (target_last_known_gen, target_last_known_trans_id))
# validate transaction ids
if not changes and target_last_known_gen == target_gen:
if target_trans_id != target_last_known_trans_id:
raise errors.InvalidTransactionId
+ self.release_syncing_lock()
return my_gen
# prepare to send all the changed docs
@@ -114,12 +183,79 @@ class Synchronizer(U1DBSynchronizer):
new_gen, new_trans_id = sync_target.sync_exchange(
docs_by_generation, self.source._replica_uid,
target_last_known_gen, target_last_known_trans_id,
- self._insert_doc_from_target, ensure_callback=ensure_callback)
+ self._insert_doc_from_target, ensure_callback=ensure_callback,
+ defer_decryption=defer_decryption)
+
+ logger.debug(
+ "Soledad source sync info after sync exchange:\n"
+ " source target gen: %d\n"
+ " source target trans_id: %s"
+ % (new_gen, new_trans_id))
+
+ info = {
+ "target_replica_uid": self.target_replica_uid,
+ "new_gen": new_gen,
+ "new_trans_id": new_trans_id,
+ "my_gen": my_gen
+ }
+ self._syncing_info = info
+
+ if defer_decryption and not sync_target.has_syncdb():
+ logger.debug("Sync target has no valid sync db, "
+ "aborting defer_decryption")
+ defer_decryption = False
- # record target synced-up-to generation including applying what we sent
+ self.complete_sync()
+ return my_gen
+
+ def complete_sync(self):
+ """
+ Last stage of the synchronization:
+ (a) record last known generation and transaction uid for the remote
+ replica, and
+ (b) make target aware of our current reached generation.
+ """
+ logger.debug("Completing deferred last step in SYNC...")
+
+ # record target synced-up-to generation including applying what we
+ # sent
+ info = self._syncing_info
self.source._set_replica_gen_and_trans_id(
- self.target_replica_uid, new_gen, new_trans_id)
+ info["target_replica_uid"], info["new_gen"], info["new_trans_id"])
+
# if gapless record current reached generation with target
- self._record_sync_info_with_the_target(my_gen)
+ self._record_sync_info_with_the_target(info["my_gen"])
+ self.syncing_lock.release()
- return my_gen
+ @property
+ def syncing(self):
+ """
+ Return True if a sync is ongoing, False otherwise.
+ :rtype: bool
+ """
+ # XXX FIXME we need some mechanism for timeout: should cleanup and
+ # release if something in the syncdb-decrypt goes wrong. we could keep
+ # track of the release date and cleanup unrealistic sync entries after
+ # some time.
+ locked = self.syncing_lock.locked()
+ return locked
+
+ def release_syncing_lock(self):
+ """
+ Release syncing lock if it's locked.
+ """
+ if self.syncing_lock.locked():
+ self.syncing_lock.release()
+
+ def close(self):
+ """
+ Close sync target pool of workers.
+ """
+ self.release_syncing_lock()
+ self.sync_target.close()
+
+ def __del__(self):
+ """
+ Cleanup: release lock.
+ """
+ self.release_syncing_lock()