summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/changes/bug_5518_close-all-connections-after-sync1
-rw-r--r--client/changes/bug_reset-synchronizer-state2
-rw-r--r--client/changes/feature_5326_use-transitional-db-for-sync1
-rw-r--r--client/changes/feature_5571_add-sync-status-signals1
-rw-r--r--client/changes/feature_5571_allow-for-interrupting-and-recovering-sync1
-rw-r--r--client/changes/feature_5571_parallelize-requests1
-rw-r--r--client/changes/feature_5571_split-sync-post1
-rw-r--r--client/pkg/requirements.pip5
-rw-r--r--client/src/leap/soledad/client/__init__.py73
-rw-r--r--client/src/leap/soledad/client/crypto.py1006
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py249
-rw-r--r--client/src/leap/soledad/client/sync.py163
-rw-r--r--client/src/leap/soledad/client/target.py1622
-rw-r--r--common/changes/bug_5739_fix-multipart-problem2
-rw-r--r--common/src/leap/soledad/common/couch.py55
-rw-r--r--common/src/leap/soledad/common/ddocs/syncs/updates/put.js141
-rw-r--r--common/src/leap/soledad/common/tests/__init__.py51
-rw-r--r--common/src/leap/soledad/common/tests/test_couch.py9
-rw-r--r--common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py18
-rw-r--r--common/src/leap/soledad/common/tests/test_crypto.py53
-rw-r--r--common/src/leap/soledad/common/tests/test_http.py64
-rw-r--r--common/src/leap/soledad/common/tests/test_http_client.py116
-rw-r--r--common/src/leap/soledad/common/tests/test_https.py108
-rw-r--r--common/src/leap/soledad/common/tests/test_server.py40
-rw-r--r--common/src/leap/soledad/common/tests/test_soledad.py5
-rw-r--r--common/src/leap/soledad/common/tests/test_soledad_doc.py44
-rw-r--r--common/src/leap/soledad/common/tests/test_sqlcipher.py27
-rw-r--r--common/src/leap/soledad/common/tests/test_sync.py138
-rw-r--r--common/src/leap/soledad/common/tests/test_sync_deferred.py227
-rw-r--r--common/src/leap/soledad/common/tests/test_sync_target.py589
-rw-r--r--common/src/leap/soledad/common/tests/test_target.py21
-rw-r--r--common/src/leap/soledad/common/tests/test_target_soledad.py102
-rw-r--r--common/src/leap/soledad/common/tests/u1db_tests/__init__.py5
-rw-r--r--common/src/leap/soledad/common/tests/u1db_tests/test_backends.py2
-rw-r--r--common/src/leap/soledad/common/tests/u1db_tests/test_sync.py3
-rwxr-xr-xscripts/profiling/spam.sh24
-rw-r--r--server/changes/bug_5368_avoid-yet-another-crypto-dep2
-rw-r--r--server/changes/feature_3399-check-auth-in-constant-way1
-rw-r--r--server/changes/feature_5571_allow-for-interrupting-and-recovering-sync1
-rw-r--r--server/changes/feature_5571_split-sync-post1
-rw-r--r--server/pkg/requirements.pip2
-rw-r--r--server/src/leap/soledad/server/sync.py30
42 files changed, 4243 insertions, 764 deletions
diff --git a/client/changes/bug_5518_close-all-connections-after-sync b/client/changes/bug_5518_close-all-connections-after-sync
new file mode 100644
index 00000000..46d679a6
--- /dev/null
+++ b/client/changes/bug_5518_close-all-connections-after-sync
@@ -0,0 +1 @@
+ o Close all connections after syncing (#5518).
diff --git a/client/changes/bug_reset-synchronizer-state b/client/changes/bug_reset-synchronizer-state
new file mode 100644
index 00000000..9678b36b
--- /dev/null
+++ b/client/changes/bug_reset-synchronizer-state
@@ -0,0 +1,2 @@
+ o Reset synchronizer state in order to reuse the same synchronizer object
+ multiple times.
diff --git a/client/changes/feature_5326_use-transitional-db-for-sync b/client/changes/feature_5326_use-transitional-db-for-sync
new file mode 100644
index 00000000..7d66141d
--- /dev/null
+++ b/client/changes/feature_5326_use-transitional-db-for-sync
@@ -0,0 +1 @@
+ o Use temporal database for encryption/decryption during sync. Closes: #5326
diff --git a/client/changes/feature_5571_add-sync-status-signals b/client/changes/feature_5571_add-sync-status-signals
new file mode 100644
index 00000000..67bc7d9f
--- /dev/null
+++ b/client/changes/feature_5571_add-sync-status-signals
@@ -0,0 +1 @@
+ o Add sync status signals (#5517).
diff --git a/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync b/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync
new file mode 100644
index 00000000..0087c535
--- /dev/null
+++ b/client/changes/feature_5571_allow-for-interrupting-and-recovering-sync
@@ -0,0 +1 @@
+ o Allow for interrupting and recovering sync (#5517).
diff --git a/client/changes/feature_5571_parallelize-requests b/client/changes/feature_5571_parallelize-requests
new file mode 100644
index 00000000..ec394cdc
--- /dev/null
+++ b/client/changes/feature_5571_parallelize-requests
@@ -0,0 +1 @@
+ o Parallelize sync requests and reuse HTTP connections.
diff --git a/client/changes/feature_5571_split-sync-post b/client/changes/feature_5571_split-sync-post
new file mode 100644
index 00000000..0d7b14dd
--- /dev/null
+++ b/client/changes/feature_5571_split-sync-post
@@ -0,0 +1 @@
+ o Split sync in multiple POST requests in client (#5571).
diff --git a/client/pkg/requirements.pip b/client/pkg/requirements.pip
index aefb8653..ae8d2dac 100644
--- a/client/pkg/requirements.pip
+++ b/client/pkg/requirements.pip
@@ -3,6 +3,9 @@ simplejson
u1db
scrypt
pycryptopp
+cchardet
+taskthread
+zope.proxy
#
# leap deps
@@ -20,5 +23,3 @@ oauth
# pysqlite should not be a dep, see #2945
pysqlite
-
-cchardet
diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py
index 0d3a21fd..586e3389 100644
--- a/client/src/leap/soledad/client/__init__.py
+++ b/client/src/leap/soledad/client/__init__.py
@@ -223,33 +223,48 @@ class Soledad(object):
"""
def __init__(self, uuid, passphrase, secrets_path, local_db_path,
- server_url, cert_file, auth_token=None, secret_id=None):
+ server_url, cert_file,
+ auth_token=None, secret_id=None, defer_encryption=False):
"""
Initialize configuration, cryptographic keys and dbs.
:param uuid: User's uuid.
:type uuid: str
+
:param passphrase: The passphrase for locking and unlocking encryption
secrets for local and remote storage.
:type passphrase: unicode
+
:param secrets_path: Path for storing encrypted key used for
symmetric encryption.
:type secrets_path: str
+
:param local_db_path: Path for local encrypted storage db.
:type local_db_path: str
+
:param server_url: URL for Soledad server. This is used either to sync
- with the user's remote db and to interact with the shared recovery
- database.
+ with the user's remote db and to interact with the
+ shared recovery database.
:type server_url: str
+
:param cert_file: Path to the certificate of the ca used
to validate the SSL certificate used by the remote
soledad server.
:type cert_file: str
+
:param auth_token: Authorization token for accessing remote databases.
:type auth_token: str
+ :param secret_id: The id of the storage secret to be used.
+ :type secret_id: str
+
+ :param defer_encryption: Whether to defer encryption/decryption of
+ documents, or do it inline while syncing.
+ :type defer_encryption: bool
+
:raise BootstrapSequenceError: Raised when the secret generation and
- storage on server sequence has failed for some reason.
+ storage on server sequence has failed
+ for some reason.
"""
# get config params
self._uuid = uuid
@@ -258,8 +273,10 @@ class Soledad(object):
# init crypto variables
self._secrets = {}
self._secret_id = secret_id
- # init config (possibly with default values)
+ self._defer_encryption = defer_encryption
+
self._init_config(secrets_path, local_db_path, server_url)
+
self._set_token(auth_token)
self._shared_db_instance = None
# configure SSL certificate
@@ -390,6 +407,7 @@ class Soledad(object):
# release the lock on shared db
try:
self._shared_db.unlock(token)
+ self._shared_db.close()
except NotLockedError:
# for some reason the lock expired. Despite that, secret
# loading or generation/storage must have been executed
@@ -469,24 +487,20 @@ class Soledad(object):
create=True,
document_factory=SoledadDocument,
crypto=self._crypto,
- raw_key=True)
+ raw_key=True,
+ defer_encryption=self._defer_encryption)
def close(self):
"""
Close underlying U1DB database.
"""
+ logger.debug("Closing soledad")
if hasattr(self, '_db') and isinstance(
self._db,
SQLCipherDatabase):
+ self._db.stop_sync()
self._db.close()
- def __del__(self):
- """
- Make sure local database is closed when object is destroyed.
- """
- # Watch out! We have no guarantees that this is properly called.
- self.close()
-
#
# Management of secret for symmetric encryption.
#
@@ -520,6 +534,9 @@ class Soledad(object):
Define the id of the storage secret to be used.
This method will also replace the secret in the crypto object.
+
+ :param secret_id: The id of the storage secret to be used.
+ :type secret_id: str
"""
self._secret_id = secret_id
@@ -881,7 +898,7 @@ class Soledad(object):
:type json: str
:param doc_id: An optional identifier specifying the document id.
:type doc_id:
- :return: The new cocument
+ :return: The new document
:rtype: SoledadDocument
"""
return self._db.create_doc_from_json(json, doc_id=doc_id)
@@ -1041,7 +1058,7 @@ class Soledad(object):
if self._db:
return self._db.resolve_doc(doc, conflicted_doc_revs)
- def sync(self):
+ def sync(self, defer_decryption=True):
"""
Synchronize the local encrypted replica with a remote replica.
@@ -1051,16 +1068,25 @@ class Soledad(object):
:param url: the url of the target replica to sync with
:type url: str
- :return: the local generation before the synchronisation was
- performed.
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
+
+ :return: The local generation before the synchronisation was
+ performed.
:rtype: str
"""
if self._db:
+ try:
local_gen = self._db.sync(
urlparse.urljoin(self.server_url, 'user-%s' % self._uuid),
- creds=self._creds, autocreate=False)
+ creds=self._creds, autocreate=False,
+ defer_decryption=defer_decryption)
signal(SOLEDAD_DONE_DATA_SYNC, self._uuid)
return local_gen
+ except Exception as e:
+ logger.error("Soledad exception when syncing: %s" % str(e))
def stop_sync(self):
"""
@@ -1079,7 +1105,9 @@ class Soledad(object):
:return: Whether remote replica and local replica differ.
:rtype: bool
"""
- target = SoledadSyncTarget(url, creds=self._creds, crypto=self._crypto)
+ target = SoledadSyncTarget(
+ url, self._db._get_replica_uid(), creds=self._creds,
+ crypto=self._crypto)
info = target.get_sync_info(self._db._get_replica_uid())
# compare source generation with target's last known source generation
if self._db._get_generation() != info[4]:
@@ -1087,6 +1115,13 @@ class Soledad(object):
return True
return False
+ @property
+ def syncing(self):
+ """
+ Property, True if the syncer is syncing.
+ """
+ return self._db.syncing
+
def _set_token(self, token):
"""
Set the authentication token for remote database access.
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index a6372107..7133f804 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# crypto.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013, 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -14,27 +14,45 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
Cryptographic utilities for Soledad.
"""
-
-
import os
import binascii
import hmac
import hashlib
-
+import json
+import logging
+import multiprocessing
+import threading
from pycryptopp.cipher.aes import AES
from pycryptopp.cipher.xsalsa20 import XSalsa20
+from zope.proxy import sameProxiedObjects
+
+from leap.soledad.common import soledad_assert
+from leap.soledad.common import soledad_assert_type
+from leap.soledad.common.document import SoledadDocument
+
+
+from leap.soledad.common.crypto import (
+ EncryptionSchemes,
+ UnknownEncryptionScheme,
+ MacMethods,
+ UnknownMacMethod,
+ WrongMac,
+ ENC_JSON_KEY,
+ ENC_SCHEME_KEY,
+ ENC_METHOD_KEY,
+ ENC_IV_KEY,
+ MAC_KEY,
+ MAC_METHOD_KEY,
+)
+logger = logging.getLogger(__name__)
-from leap.soledad.common import (
- soledad_assert,
- soledad_assert_type,
-)
+
+MAC_KEY_LENGTH = 64
class EncryptionMethods(object):
@@ -45,6 +63,17 @@ class EncryptionMethods(object):
AES_256_CTR = 'aes-256-ctr'
XSALSA20 = 'xsalsa20'
+#
+# Exceptions
+#
+
+
+class DocumentNotEncrypted(Exception):
+ """
+ Raised for failures in document encryption.
+ """
+ pass
+
class UnknownEncryptionMethod(Exception):
"""
@@ -59,13 +88,116 @@ class NoSymmetricSecret(Exception):
"""
-class SoledadCrypto(object):
+def encrypt_sym(data, key, method):
"""
- General cryptographic functionality.
+ Encrypt C{data} using a {password}.
+
+ Currently, the only encryption methods supported are AES-256 in CTR
+ mode and XSalsa20.
+
+ :param data: The data to be encrypted.
+ :type data: str
+ :param key: The key used to encrypt C{data} (must be 256 bits long).
+ :type key: str
+ :param method: The encryption method to use.
+ :type method: str
+
+ :return: A tuple with the initial value and the encrypted data.
+ :rtype: (long, str)
+ """
+ soledad_assert_type(key, str)
+
+ soledad_assert(
+ len(key) == 32, # 32 x 8 = 256 bits.
+ 'Wrong key size: %s bits (must be 256 bits long).' %
+ (len(key) * 8))
+ iv = None
+ # AES-256 in CTR mode
+ if method == EncryptionMethods.AES_256_CTR:
+ iv = os.urandom(16)
+ ciphertext = AES(key=key, iv=iv).process(data)
+ # XSalsa20
+ elif method == EncryptionMethods.XSALSA20:
+ iv = os.urandom(24)
+ ciphertext = XSalsa20(key=key, iv=iv).process(data)
+ else:
+ # raise if method is unknown
+ raise UnknownEncryptionMethod('Unkwnown method: %s' % method)
+ return binascii.b2a_base64(iv), ciphertext
+
+
+def decrypt_sym(data, key, method, **kwargs):
"""
+ Decrypt data using symmetric secret.
+
+ Currently, the only encryption method supported is AES-256 CTR mode.
- MAC_KEY_LENGTH = 64
+ :param data: The data to be decrypted.
+ :type data: str
+ :param key: The key used to decrypt C{data} (must be 256 bits long).
+ :type key: str
+ :param method: The encryption method to use.
+ :type method: str
+ :param kwargs: Other parameters specific to each encryption method.
+ :type kwargs: dict
+
+ :return: The decrypted data.
+ :rtype: str
+ """
+ soledad_assert_type(key, str)
+ # assert params
+ soledad_assert(
+ len(key) == 32, # 32 x 8 = 256 bits.
+ 'Wrong key size: %s (must be 256 bits long).' % len(key))
+ soledad_assert(
+ 'iv' in kwargs,
+ '%s needs an initial value.' % method)
+ # AES-256 in CTR mode
+ if method == EncryptionMethods.AES_256_CTR:
+ return AES(
+ key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data)
+ elif method == EncryptionMethods.XSALSA20:
+ return XSalsa20(
+ key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data)
+
+ # raise if method is unknown
+ raise UnknownEncryptionMethod('Unkwnown method: %s' % method)
+
+
+def doc_mac_key(doc_id, secret):
+ """
+ Generate a key for calculating a MAC for a document whose id is
+ C{doc_id}.
+ The key is derived using HMAC having sha256 as underlying hash
+ function. The key used for HMAC is the first MAC_KEY_LENGTH characters
+ of Soledad's storage secret. The HMAC message is C{doc_id}.
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+
+ :param secret: soledad secret storage
+ :type secret: Soledad.storage_secret
+
+ :return: The key.
+ :rtype: str
+
+ :raise NoSymmetricSecret: if no symmetric secret was supplied.
+ """
+ if secret is None:
+ raise NoSymmetricSecret()
+
+ return hmac.new(
+ secret[:MAC_KEY_LENGTH],
+ doc_id,
+ hashlib.sha256).digest()
+
+
+class SoledadCrypto(object):
+ """
+ General cryptographic functionality encapsulated in a
+ object that can be passed along.
+ """
def __init__(self, soledad):
"""
Initialize the crypto object.
@@ -77,78 +209,14 @@ class SoledadCrypto(object):
def encrypt_sym(self, data, key,
method=EncryptionMethods.AES_256_CTR):
- """
- Encrypt C{data} using a {password}.
-
- Currently, the only encryption method supported is AES-256 CTR mode.
-
- :param data: The data to be encrypted.
- :type data: str
- :param key: The key used to encrypt C{data} (must be 256 bits long).
- :type key: str
- :param method: The encryption method to use.
- :type method: str
-
- :return: A tuple with the initial value and the encrypted data.
- :rtype: (long, str)
- """
- soledad_assert_type(key, str)
-
- soledad_assert(
- len(key) == 32, # 32 x 8 = 256 bits.
- 'Wrong key size: %s bits (must be 256 bits long).' %
- (len(key) * 8))
- iv = None
- # AES-256 in CTR mode
- if method == EncryptionMethods.AES_256_CTR:
- iv = os.urandom(16)
- ciphertext = AES(key=key, iv=iv).process(data)
- # XSalsa20
- elif method == EncryptionMethods.XSALSA20:
- iv = os.urandom(24)
- ciphertext = XSalsa20(key=key, iv=iv).process(data)
- else:
- # raise if method is unknown
- raise UnknownEncryptionMethod('Unkwnown method: %s' % method)
- return binascii.b2a_base64(iv), ciphertext
+ return encrypt_sym(data, key, method)
def decrypt_sym(self, data, key,
method=EncryptionMethods.AES_256_CTR, **kwargs):
- """
- Decrypt data using symmetric secret.
+ return decrypt_sym(data, key, method, **kwargs)
- Currently, the only encryption method supported is AES-256 CTR mode.
-
- :param data: The data to be decrypted.
- :type data: str
- :param key: The key used to decrypt C{data} (must be 256 bits long).
- :type key: str
- :param method: The encryption method to use.
- :type method: str
- :param kwargs: Other parameters specific to each encryption method.
- :type kwargs: dict
-
- :return: The decrypted data.
- :rtype: str
- """
- soledad_assert_type(key, str)
- # assert params
- soledad_assert(
- len(key) == 32, # 32 x 8 = 256 bits.
- 'Wrong key size: %s (must be 256 bits long).' % len(key))
- soledad_assert(
- 'iv' in kwargs,
- '%s needs an initial value.' % method)
- # AES-256 in CTR mode
- if method == EncryptionMethods.AES_256_CTR:
- return AES(
- key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data)
- elif method == EncryptionMethods.XSALSA20:
- return XSalsa20(
- key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data)
-
- # raise if method is unknown
- raise UnknownEncryptionMethod('Unkwnown method: %s' % method)
+ def doc_mac_key(self, doc_id, secret):
+ return doc_mac_key(doc_id, self.secret)
def doc_passphrase(self, doc_id):
"""
@@ -173,41 +241,769 @@ class SoledadCrypto(object):
raise NoSymmetricSecret()
return hmac.new(
self.secret[
- self.MAC_KEY_LENGTH:
+ MAC_KEY_LENGTH:
self._soledad.REMOTE_STORAGE_SECRET_LENGTH],
doc_id,
hashlib.sha256).digest()
- def doc_mac_key(self, doc_id):
+ #
+ # secret setters/getters
+ #
+
+ def _get_secret(self):
+ return self._soledad.storage_secret
+
+ secret = property(
+ _get_secret, doc='The secret used for symmetric encryption')
+
+#
+# Crypto utilities for a SoledadDocument.
+#
+
+
+def mac_doc(doc_id, doc_rev, ciphertext, mac_method, secret):
+ """
+ Calculate a MAC for C{doc} using C{ciphertext}.
+
+ Current MAC method used is HMAC, with the following parameters:
+
+ * key: sha256(storage_secret, doc_id)
+ * msg: doc_id + doc_rev + ciphertext
+ * digestmod: sha256
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+ :param doc_rev: The revision of the document.
+ :type doc_rev: str
+ :param ciphertext: The content of the document.
+ :type ciphertext: str
+ :param mac_method: The MAC method to use.
+ :type mac_method: str
+ :param secret: soledad secret
+ :type secret: Soledad.secret_storage
+
+ :return: The calculated MAC.
+ :rtype: str
+ """
+ if mac_method == MacMethods.HMAC:
+ return hmac.new(
+ doc_mac_key(doc_id, secret),
+ str(doc_id) + str(doc_rev) + ciphertext,
+ hashlib.sha256).digest()
+ # raise if we do not know how to handle this MAC method
+ raise UnknownMacMethod('Unknown MAC method: %s.' % mac_method)
+
+
+def encrypt_doc(crypto, doc):
+ """
+ Wrapper around encrypt_docstr that accepts a crypto object and the document
+ as arguments.
+
+ :param crypto: a soledad crypto object.
+ :type crypto: SoledadCrypto
+ :param doc: the document.
+ :type doc: SoledadDocument
+ """
+ key = crypto.doc_passphrase(doc.doc_id)
+ secret = crypto.secret
+
+ return encrypt_docstr(
+ doc.get_json(), doc.doc_id, doc.rev, key, secret)
+
+
+def encrypt_docstr(docstr, doc_id, doc_rev, key, secret):
+ """
+ Encrypt C{doc}'s content.
+
+ Encrypt doc's contents using AES-256 CTR mode and return a valid JSON
+ string representing the following:
+
+ {
+ ENC_JSON_KEY: '<encrypted doc JSON string>',
+ ENC_SCHEME_KEY: 'symkey',
+ ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR,
+ ENC_IV_KEY: '<the initial value used to encrypt>',
+ MAC_KEY: '<mac>'
+ MAC_METHOD_KEY: 'hmac'
+ }
+
+ :param docstr: A representation of the document to be encrypted.
+ :type docstr: str or unicode.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+
+ :param key: The key used to encrypt ``data`` (must be 256 bits long).
+ :type key: str
+
+ :param secret: The Soledad secret (used for MAC auth).
+ :type secret: str
+
+ :return: The JSON serialization of the dict representing the encrypted
+ content.
+ :rtype: str
+ """
+ # encrypt content using AES-256 CTR mode
+ iv, ciphertext = encrypt_sym(
+ str(docstr), # encryption/decryption routines expect str
+ key, method=EncryptionMethods.AES_256_CTR)
+ # Return a representation for the encrypted content. In the following, we
+ # convert binary data to hexadecimal representation so the JSON
+ # serialization does not complain about what it tries to serialize.
+ hex_ciphertext = binascii.b2a_hex(ciphertext)
+ return json.dumps({
+ ENC_JSON_KEY: hex_ciphertext,
+ ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY,
+ ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR,
+ ENC_IV_KEY: iv,
+ MAC_KEY: binascii.b2a_hex(mac_doc( # store the mac as hex.
+ doc_id, doc_rev, ciphertext,
+ MacMethods.HMAC, secret)),
+ MAC_METHOD_KEY: MacMethods.HMAC,
+ })
+
+
+def decrypt_doc(crypto, doc):
+ """
+ Wrapper around decrypt_doc_dict that accepts a crypto object and the
+ document as arguments.
+
+ :param crypto: a soledad crypto object.
+ :type crypto: SoledadCrypto
+ :param doc: the document.
+ :type doc: SoledadDocument
+
+ :return: json string with the decrypted document
+ :rtype: str
+ """
+ key = crypto.doc_passphrase(doc.doc_id)
+ secret = crypto.secret
+ return decrypt_doc_dict(doc.content, doc.doc_id, doc.rev, key, secret)
+
+
+def decrypt_doc_dict(doc_dict, doc_id, doc_rev, key, secret):
+ """
+ Decrypt C{doc}'s content.
+
+ Return the JSON string representation of the document's decrypted content.
+
+ The passed doc_dict argument should have the following structure:
+
+ {
+ ENC_JSON_KEY: '<enc_blob>',
+ ENC_SCHEME_KEY: '<enc_scheme>',
+ ENC_METHOD_KEY: '<enc_method>',
+ ENC_IV_KEY: '<initial value used to encrypt>', # (optional)
+ MAC_KEY: '<mac>'
+ MAC_METHOD_KEY: 'hmac'
+ }
+
+ C{enc_blob} is the encryption of the JSON serialization of the document's
+ content. For now Soledad just deals with documents whose C{enc_scheme} is
+ EncryptionSchemes.SYMKEY and C{enc_method} is
+ EncryptionMethods.AES_256_CTR.
+
+ :param doc_dict: The content of the document to be decrypted.
+ :type doc_dict: dict
+
+ :param doc_id: The document id.
+ :type doc_id: str
+
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+
+ :param key: The key used to encrypt ``data`` (must be 256 bits long).
+ :type key: str
+
+ :param secret:
+ :type secret:
+
+ :return: The JSON serialization of the decrypted content.
+ :rtype: str
+ """
+ soledad_assert(ENC_JSON_KEY in doc_dict)
+ soledad_assert(ENC_SCHEME_KEY in doc_dict)
+ soledad_assert(ENC_METHOD_KEY in doc_dict)
+ soledad_assert(MAC_KEY in doc_dict)
+ soledad_assert(MAC_METHOD_KEY in doc_dict)
+
+ # verify MAC
+ ciphertext = binascii.a2b_hex( # content is stored as hex.
+ doc_dict[ENC_JSON_KEY])
+ mac = mac_doc(
+ doc_id, doc_rev,
+ ciphertext,
+ doc_dict[MAC_METHOD_KEY], secret)
+ # we compare mac's hashes to avoid possible timing attacks that might
+ # exploit python's builtin comparison operator behaviour, which fails
+ # immediatelly when non-matching bytes are found.
+ doc_mac_hash = hashlib.sha256(
+ binascii.a2b_hex( # the mac is stored as hex
+ doc_dict[MAC_KEY])).digest()
+ calculated_mac_hash = hashlib.sha256(mac).digest()
+
+ if doc_mac_hash != calculated_mac_hash:
+ logger.warning("Wrong MAC while decrypting doc...")
+ raise WrongMac('Could not authenticate document\'s contents.')
+ # decrypt doc's content
+ enc_scheme = doc_dict[ENC_SCHEME_KEY]
+ plainjson = None
+ if enc_scheme == EncryptionSchemes.SYMKEY:
+ enc_method = doc_dict[ENC_METHOD_KEY]
+ if enc_method == EncryptionMethods.AES_256_CTR:
+ soledad_assert(ENC_IV_KEY in doc_dict)
+ plainjson = decrypt_sym(
+ ciphertext, key,
+ method=enc_method,
+ iv=doc_dict[ENC_IV_KEY])
+ else:
+ raise UnknownEncryptionMethod(enc_method)
+ else:
+ raise UnknownEncryptionScheme(enc_scheme)
+
+ return plainjson
+
+
+def is_symmetrically_encrypted(doc):
+ """
+ Return True if the document was symmetrically encrypted.
+
+ :param doc: The document to check.
+ :type doc: SoledadDocument
+
+ :rtype: bool
+ """
+ if doc.content and ENC_SCHEME_KEY in doc.content:
+ if doc.content[ENC_SCHEME_KEY] == EncryptionSchemes.SYMKEY:
+ return True
+ return False
+
+
+#
+# Encrypt/decrypt pools of workers
+#
+
+class SyncEncryptDecryptPool(object):
+ """
+ Base class for encrypter/decrypter pools.
+ """
+ WORKERS = 5
+
+ def __init__(self, crypto, sync_db, write_lock):
+ """
+ Initialize the pool of encryption-workers.
+
+ :param crypto: A SoledadCryto instance to perform the encryption.
+ :type crypto: leap.soledad.crypto.SoledadCrypto
+
+ :param sync_db: a database connection handle
+ :type sync_db: handle
+
+ :param write_lock: a write lock for controlling concurrent access
+ to the sync_db
+ :type write_lock: threading.Lock
+ """
+ self._pool = multiprocessing.Pool(self.WORKERS)
+ self._crypto = crypto
+ self._sync_db = sync_db
+ self._sync_db_write_lock = write_lock
+
+ def close(self):
+ """
+ Cleanly close the pool of workers.
+ """
+ logger.debug("Closing %s" % (self.__class__.__name__,))
+ self._pool.close()
+ try:
+ self._pool.join()
+ except Exception:
+ pass
+
+ def terminate(self):
+ """
+ Terminate the pool of workers.
+ """
+ logger.debug("Terminating %s" % (self.__class__.__name__,))
+ self._pool.terminate()
+
+
+def encrypt_doc_task(doc_id, doc_rev, content, key, secret):
+ """
+ Encrypt the content of the given document.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param key: The encryption key.
+ :type key: str
+ :param secret: The Soledad secret (used for MAC auth).
+ :type secret: str
+
+ :return: A tuple containing the doc id, revision and encrypted content.
+ :rtype: tuple(str, str, str)
+ """
+ encrypted_content = encrypt_docstr(
+ content, doc_id, doc_rev, key, secret)
+ return doc_id, doc_rev, encrypted_content
+
+
+class SyncEncrypterPool(SyncEncryptDecryptPool):
+ """
+ Pool of workers that spawn subprocesses to execute the symmetric encryption
+ of documents to be synced.
+ """
+ # TODO implement throttling to reduce cpu usage??
+ WORKERS = 5
+ TABLE_NAME = "docs_tosync"
+ FIELD_NAMES = "doc_id, rev, content"
+
+ def encrypt_doc(self, doc, workers=True):
+ """
+ Symmetrically encrypt a document.
+
+ :param doc: The document with contents to be encrypted.
+ :type doc: SoledadDocument
+
+ :param workers: Whether to defer the decryption to the multiprocess
+ pool of workers. Useful for debugging purposes.
+ :type workers: bool
+ """
+ soledad_assert(self._crypto is not None, "need a crypto object")
+ docstr = doc.get_json()
+ key = self._crypto.doc_passphrase(doc.doc_id)
+ secret = self._crypto.secret
+ args = doc.doc_id, doc.rev, docstr, key, secret
+
+ try:
+ if workers:
+ res = self._pool.apply_async(
+ encrypt_doc_task, args,
+ callback=self.encrypt_doc_cb)
+ else:
+ # encrypt inline
+ res = encrypt_doc_task(*args)
+ self.encrypt_doc_cb(res)
+
+ except Exception as exc:
+ logger.exception(exc)
+
+ def encrypt_doc_cb(self, result):
"""
- Generate a key for calculating a MAC for a document whose id is
- C{doc_id}.
+ Insert results of encryption routine into the local sync database.
- The key is derived using HMAC having sha256 as underlying hash
- function. The key used for HMAC is the first MAC_KEY_LENGTH characters
- of Soledad's storage secret. The HMAC message is C{doc_id}.
+ :param result: A tuple containing the doc id, revision and encrypted
+ content.
+ :type result: tuple(str, str, str)
+ """
+ doc_id, doc_rev, content = result
+ self.insert_encrypted_local_doc(doc_id, doc_rev, content)
- :param doc_id: The id of the document.
+ def insert_encrypted_local_doc(self, doc_id, doc_rev, content):
+ """
+ Insert the contents of the encrypted doc into the local sync
+ database.
+
+ :param doc_id: The document id.
:type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param content: The encrypted document.
+ :type content: str
+ """
+ sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,)
+ sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,)
- :return: The key.
- :rtype: str
+ con = self._sync_db
+ with self._sync_db_write_lock:
+ with con:
+ con.execute(sql_del, (doc_id, ))
+ con.execute(sql_ins, (doc_id, doc_rev, content))
- :raise NoSymmetricSecret: if no symmetric secret was supplied.
+
+def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret):
+ """
+ Decrypt the content of the given document.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The encrypted content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification of
+ that document.
+ :type trans_id: str
+ :param key: The encryption key.
+ :type key: str
+ :param secret: The Soledad secret (used for MAC auth).
+ :type secret: str
+
+ :return: A tuple containing the doc id, revision and encrypted content.
+ :rtype: tuple(str, str, str)
+ """
+ decrypted_content = decrypt_doc_dict(
+ content, doc_id, doc_rev, key, secret)
+ return doc_id, doc_rev, decrypted_content, gen, trans_id
+
+
+def get_insertable_docs_by_gen(expected, got):
+ """
+ Return a list of documents ready to be inserted. This list is computed
+ by aligning the expected list with the already gotten docs, and returning
+ the maximum number of docs that can be processed in the expected order
+ before finding a gap.
+
+ :param expected: A list of generations to be inserted.
+ :type expected: list
+
+ :param got: A dictionary whose values are the docs to be inserted.
+ :type got: dict
+ """
+ ordered = [got.get(i) for i in expected]
+ if None in ordered:
+ return ordered[:ordered.index(None)]
+ else:
+ return ordered
+
+
+class SyncDecrypterPool(SyncEncryptDecryptPool):
+ """
+ Pool of workers that spawn subprocesses to execute the symmetric decryption
+ of documents that were received.
+
+ The decryption of the received documents is done in two steps:
+
+ 1. All the encrypted docs are collected, together with their generation
+ and transaction-id
+ 2. The docs are enqueued for decryption. When completed, they are
+ inserted following the generation order.
+ """
+ # TODO implement throttling to reduce cpu usage??
+ TABLE_NAME = "docs_received"
+ FIELD_NAMES = "doc_id, rev, content, gen, trans_id"
+
+ write_encrypted_lock = threading.Lock()
+
+ def __init__(self, *args, **kwargs):
"""
- if self.secret is None:
- raise NoSymmetricSecret()
- return hmac.new(
- self.secret[:self.MAC_KEY_LENGTH],
- doc_id,
- hashlib.sha256).digest()
+ Initialize the decrypter pool, and setup a dict for putting the
+ results of the decrypted docs until they are picked by the insert
+ routine that gets them in order.
+ """
+ self._insert_doc_cb = kwargs.pop("insert_doc_cb")
+ SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
+ self.decrypted_docs = {}
+ self.source_replica_uid = None
- #
- # secret setters/getters
- #
+ def set_source_replica_uid(self, source_replica_uid):
+ """
+ Set the source replica uid for this decrypter pool instance.
- def _get_secret(self):
- return self._soledad.storage_secret
+ :param source_replica_uid: The uid of the source replica.
+ :type source_replica_uid: str
+ """
+ self.source_replica_uid = source_replica_uid
- secret = property(
- _get_secret, doc='The secret used for symmetric encryption')
+ def insert_encrypted_received_doc(self, doc_id, doc_rev, content,
+ gen, trans_id):
+ """
+ Insert a received message with encrypted content, to be decrypted later
+ on.
+
+ :param doc_id: The Document ID.
+ :type doc_id: str
+ :param doc_rev: The Document Revision
+ :param doc_rev: str
+ :param content: the Content of the document
+ :type content: str
+ :param gen: the Document Generation
+ :type gen: int
+ :param trans_id: Transaction ID
+ :type trans_id: str
+ """
+ docstr = json.dumps(content)
+ sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % (
+ self.TABLE_NAME,)
+
+ con = self._sync_db
+ with self._sync_db_write_lock:
+ with con:
+ con.execute(sql_ins, (doc_id, doc_rev, docstr, gen, trans_id))
+
+ def insert_marker_for_received_doc(self, doc_id, doc_rev, gen):
+ """
+ Insert a marker with the document id, revision and generation on the
+ sync db. This document does not have an encrypted payload, so the
+ content has already been inserted into the decrypted_docs dictionary
+ from where it can be picked following generation order.
+ We need to leave here the marker to be able to calculate the expected
+ insertion order for a synchronization batch.
+
+ :param doc_id: The Document ID.
+ :type doc_id: str
+ :param doc_rev: The Document Revision
+ :param doc_rev: str
+ :param gen: the Document Generation
+ :type gen: int
+ """
+ sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % (
+ self.TABLE_NAME,)
+ con = self._sync_db
+ with self._sync_db_write_lock:
+ with con:
+ con.execute(sql_ins, (doc_id, doc_rev, '', gen, ''))
+
+ def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id):
+ """
+ Insert a document that is not symmetrically encrypted.
+ We store it in the staging area (the decrypted_docs dictionary) to be
+ picked up in order as the preceding documents are decrypted.
+
+ :param doc_id: The Document ID.
+ :type doc_id: str
+ :param doc_rev: The Document Revision
+ :param doc_rev: str
+ :param content: the Content of the document
+ :type content: str
+ :param gen: the Document Generation
+ :type gen: int
+ :param trans_id: Transaction ID
+ :type trans_id: str
+ """
+ # XXX this need a deeper review / testing.
+ # I believe that what I'm doing here is prone to problems
+ # if the sync is interrupted (ie, client crash) in the worst possible
+ # moment. We would need a recover strategy in that case
+ # (or, insert the document in the table all the same, but with a flag
+ # saying if the document is sym-encrypted or not),
+ content = json.dumps(content)
+ result = doc_id, doc_rev, content, gen, trans_id
+ self.decrypted_docs[gen] = result
+ self.insert_marker_for_received_doc(doc_id, doc_rev, gen)
+
+ def delete_encrypted_received_doc(self, doc_id, doc_rev):
+ """
+ Delete a encrypted received doc after it was inserted into the local
+ db.
+
+ :param doc_id: Document ID.
+ :type doc_id: str
+ :param doc_rev: Document revision.
+ :type doc_rev: str
+ """
+ sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % (
+ self.TABLE_NAME,)
+ con = self._sync_db
+ with self._sync_db_write_lock:
+ with con:
+ con.execute(sql_del, (doc_id, doc_rev))
+
+ def decrypt_doc(self, doc_id, rev, source_replica_uid, workers=True):
+ """
+ Symmetrically decrypt a document.
+
+ :param doc_id: The ID for the document with contents to be encrypted.
+ :type doc: str
+ :param rev: The revision of the document.
+ :type rev: str
+ :param source_replica_uid:
+ :type source_replica_uid: str
+
+ :param workers: Whether to defer the decryption to the multiprocess
+ pool of workers. Useful for debugging purposes.
+ :type workers: bool
+ """
+ self.source_replica_uid = source_replica_uid
+
+ # insert_doc_cb is a proxy object that gets updated with the right
+ # insert function only when the sync_target invokes the sync_exchange
+ # method. so, if we don't still have a non-empty callback, we refuse
+ # to proceed.
+ if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid),
+ None):
+ logger.debug("Sync decrypter pool: no insert_doc_cb() yet.")
+ return
+
+ # XXX move to get_doc function...
+ c = self._sync_db.cursor()
+ sql = "SELECT * FROM '%s' WHERE doc_id=? AND rev=?" % (
+ self.TABLE_NAME,)
+ try:
+ c.execute(sql, (doc_id, rev))
+ res = c.fetchone()
+ except Exception as exc:
+ logger.warning("Error getting docs from syncdb: %r" % (exc,))
+ return
+ if res is None:
+ logger.debug("Doc %s:%s does not exist in sync db" % (doc_id, rev))
+ return
+
+ soledad_assert(self._crypto is not None, "need a crypto object")
+ try:
+ doc_id, rev, docstr, gen, trans_id = res
+ except ValueError:
+ logger.warning("Wrong entry in sync db")
+ return
+
+ if len(docstr) == 0:
+ # not encrypted payload
+ return
+
+ try:
+ content = json.loads(docstr)
+ except TypeError:
+ logger.warning("Wrong type while decoding json: %s" % repr(docstr))
+ return
+
+ key = self._crypto.doc_passphrase(doc_id)
+ secret = self._crypto.secret
+ args = doc_id, rev, content, gen, trans_id, key, secret
+
+ try:
+ if workers:
+ # Ouch. This is sent to the workers asynchronously, so
+ # we have no way of logging errors. We'd have to inspect
+ # lingering results by querying successful / get() over them...
+ # Or move the heck out of it to twisted.
+ res = self._pool.apply_async(
+ decrypt_doc_task, args,
+ callback=self.decrypt_doc_cb)
+ else:
+ # decrypt inline
+ res = decrypt_doc_task(*args)
+ self.decrypt_doc_cb(res)
+
+ except Exception as exc:
+ logger.exception(exc)
+
+ def decrypt_doc_cb(self, result):
+ """
+ Temporarily store the decryption result in a dictionary where it will
+ be picked by process_decrypted.
+
+ :param result: A tuple containing the doc id, revision and encrypted
+ content.
+ :type result: tuple(str, str, str)
+ """
+ doc_id, rev, content, gen, trans_id = result
+ logger.debug("Sync decrypter pool: decrypted doc %s: %s %s" % (doc_id, rev, gen))
+ self.decrypted_docs[gen] = result
+
+ def get_docs_by_generation(self):
+ """
+ Get all documents in the received table from the sync db,
+ ordered by generation.
+
+ :return: list of doc_id, rev, generation
+ """
+ c = self._sync_db.cursor()
+ sql = "SELECT doc_id, rev, gen FROM %s ORDER BY gen" % (
+ self.TABLE_NAME,)
+ c.execute(sql)
+ return c.fetchall()
+
+ def count_received_encrypted_docs(self):
+ """
+ Count how many documents we have in the table for received and
+ encrypted docs.
+
+ :return: The count of documents.
+ :rtype: int
+ """
+ if self._sync_db is None:
+ logger.warning("cannot return count with null sync_db")
+ return
+ c = self._sync_db.cursor()
+ sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
+ c.execute(sql)
+ res = c.fetchone()
+ if res is not None:
+ return res[0]
+ else:
+ return 0
+
+ def decrypt_received_docs(self):
+ """
+ Get all the encrypted documents from the sync database and dispatch a
+ decrypt worker to decrypt each one of them.
+ """
+ docs_by_generation = self.get_docs_by_generation()
+ logger.debug("Sync decrypter pool: There are %d documents to " \
+ "decrypt." % len(docs_by_generation))
+ for doc_id, rev, gen in filter(None, docs_by_generation):
+ self.decrypt_doc(doc_id, rev, self.source_replica_uid)
+
+ def process_decrypted(self):
+ """
+ Process the already decrypted documents, and insert as many documents
+ as can be taken from the expected order without finding a gap.
+
+ :return: Whether we have processed all the pending docs.
+ :rtype: bool
+ """
+ # Acquire the lock to avoid processing while we're still
+ # getting data from the syncing stream, to avoid InvalidGeneration
+ # problems.
+ with self.write_encrypted_lock:
+ already_decrypted = self.decrypted_docs
+ docs = self.get_docs_by_generation()
+ docs = filter(lambda entry: len(entry) > 0, docs)
+ expected = [gen for doc_id, rev, gen in docs]
+ docs_to_insert = get_insertable_docs_by_gen(
+ expected, already_decrypted)
+ for doc_fields in docs_to_insert:
+ self.insert_decrypted_local_doc(*doc_fields)
+ remaining = self.count_received_encrypted_docs()
+ return remaining == 0
+
+ def insert_decrypted_local_doc(self, doc_id, doc_rev, content,
+ gen, trans_id):
+ """
+ Insert the decrypted document into the local sqlcipher database.
+ Makes use of the passed callback `return_doc_cb` passed to the caller
+ by u1db sync.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification
+ of that document.
+ :type trans_id: str
+ """
+ # could pass source_replica in params for callback chain
+ insert_fun = self._insert_doc_cb[self.source_replica_uid]
+ logger.debug("Sync decrypter pool: inserting doc in local db: " \
+ "%s:%s %s" % (doc_id, doc_rev, gen))
+ try:
+ # convert deleted documents to avoid error on document creation
+ if content == 'null':
+ content = None
+ doc = SoledadDocument(doc_id, doc_rev, content)
+ insert_fun(doc, int(gen), trans_id)
+ except Exception as exc:
+ logger.error("Sync decrypter pool: error while inserting "
+ "decrypted doc into local db.")
+ logger.exception(exc)
+
+ else:
+ # If no errors found, remove it from the local temporary dict
+ # and from the received database.
+ self.decrypted_docs.pop(gen)
+ self.delete_encrypted_received_doc(doc_id, doc_rev)
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 5ffa9c7e..2df9606e 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# sqlcipher.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013, 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -14,8 +14,6 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
A U1DB backend that uses SQLCipher as its persistence layer.
@@ -40,11 +38,13 @@ cipher_default_use_hmac can be used to globally alter the default use of HMAC
when opening a database.
So, as the statements above were introduced for backwards compatibility with
-SLCipher 1.1 databases, we do not implement them as all SQLCipher databases
+SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases
handled by Soledad should be created by SQLCipher >= 2.0.
"""
import logging
+import multiprocessing
import os
+import sqlite3
import string
import threading
import time
@@ -57,9 +57,12 @@ from collections import defaultdict
from pysqlcipher import dbapi2
from u1db.backends import sqlite_backend
from u1db import errors as u1db_errors
+from taskthread import TimerTask
-from leap.soledad.client.sync import Synchronizer
+from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool
from leap.soledad.client.target import SoledadSyncTarget
+from leap.soledad.client.target import PendingReceivedDocsSyncError
+from leap.soledad.client.sync import SoledadSynchronizer
from leap.soledad.common.document import SoledadDocument
@@ -88,7 +91,7 @@ SQLITE_ISOLATION_LEVEL = None
def open(path, password, create=True, document_factory=None, crypto=None,
raw_key=False, cipher='aes-256-cbc', kdf_iter=4000,
- cipher_page_size=1024):
+ cipher_page_size=1024, defer_encryption=False):
"""Open a database at the given location.
Will raise u1db.errors.DatabaseDoesNotExist if create=False and the
@@ -114,6 +117,9 @@ def open(path, password, create=True, document_factory=None, crypto=None,
:type kdf_iter: int
:param cipher_page_size: The page size.
:type cipher_page_size: int
+ :param defer_encryption: Whether to defer encryption/decryption of
+ documents, or do it inline while syncing.
+ :type defer_encryption: bool
:return: An instance of Database.
:rtype SQLCipherDatabase
@@ -121,7 +127,7 @@ def open(path, password, create=True, document_factory=None, crypto=None,
return SQLCipherDatabase.open_database(
path, password, create=create, document_factory=document_factory,
crypto=crypto, raw_key=raw_key, cipher=cipher, kdf_iter=kdf_iter,
- cipher_page_size=cipher_page_size)
+ cipher_page_size=cipher_page_size, defer_encryption=defer_encryption)
#
@@ -147,19 +153,40 @@ class NotAnHexString(Exception):
#
class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
- """A U1DB implementation that uses SQLCipher as its persistence layer."""
+ """
+ A U1DB implementation that uses SQLCipher as its persistence layer.
+ """
+ defer_encryption = False
_index_storage_value = 'expand referenced encrypted'
k_lock = threading.Lock()
create_doc_lock = threading.Lock()
update_indexes_lock = threading.Lock()
+ _sync_watcher = None
+ _sync_enc_pool = None
+
+ """
+ The name of the local symmetrically encrypted documents to
+ sync database file.
+ """
+ LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db'
- syncing_lock = defaultdict(threading.Lock)
"""
A dictionary that hold locks which avoid multiple sync attempts from the
same database replica.
"""
+ encrypting_lock = threading.Lock()
+ """
+ Period or recurrence of the periodic encrypting task, in seconds.
+ """
+ ENCRYPT_TASK_PERIOD = 1
+
+ syncing_lock = defaultdict(threading.Lock)
+ """
+ A dictionary that hold locks which avoid multiple sync attempts from the
+ same database replica.
+ """
def __init__(self, sqlcipher_file, password, document_factory=None,
crypto=None, raw_key=False, cipher='aes-256-cbc',
@@ -194,7 +221,8 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
self.assert_db_is_encrypted(
sqlcipher_file, password, raw_key, cipher, kdf_iter,
cipher_page_size)
- # connect to the database
+
+ # connect to the sqlcipher database
with self.k_lock:
self._db_handle = dbapi2.connect(
sqlcipher_file,
@@ -215,18 +243,45 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
self._ensure_schema()
self._crypto = crypto
+ self._sync_db = None
+ self._sync_db_write_lock = None
+ self._sync_enc_pool = None
+
+ if self.defer_encryption:
+ if sqlcipher_file != ":memory:":
+ self._sync_db_path = "%s-sync" % sqlcipher_file
+ else:
+ self._sync_db_path = ":memory:"
+
+ # initialize sync db
+ self._init_sync_db()
+
+ # initialize syncing queue encryption pool
+ self._sync_enc_pool = SyncEncrypterPool(
+ self._crypto, self._sync_db, self._sync_db_write_lock)
+ self._sync_watcher = TimerTask(self._encrypt_syncing_docs,
+ self.ENCRYPT_TASK_PERIOD)
+ self._sync_watcher.start()
+
def factory(doc_id=None, rev=None, json='{}', has_conflicts=False,
syncable=True):
return SoledadDocument(doc_id=doc_id, rev=rev, json=json,
has_conflicts=has_conflicts,
syncable=syncable)
self.set_document_factory(factory)
+ # we store syncers in a dictionary indexed by the target URL. We also
+ # store a hash of the auth info in case auth info expires and we need
+ # to rebuild the syncer for that target. The final self._syncers
+ # format is the following:
+ #
+ # self._syncers = {'<url>': ('<auth_hash>', syncer), ...}
self._syncers = {}
@classmethod
def _open_database(cls, sqlcipher_file, password, document_factory=None,
crypto=None, raw_key=False, cipher='aes-256-cbc',
- kdf_iter=4000, cipher_page_size=1024):
+ kdf_iter=4000, cipher_page_size=1024,
+ defer_encryption=False):
"""
Open a SQLCipher database.
@@ -249,10 +304,14 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
:type kdf_iter: int
:param cipher_page_size: The page size.
:type cipher_page_size: int
+ :param defer_encryption: Whether to defer encryption/decryption of
+ documents, or do it inline while syncing.
+ :type defer_encryption: bool
:return: The database object.
:rtype: SQLCipherDatabase
"""
+ cls.defer_encryption = defer_encryption
if not os.path.isfile(sqlcipher_file):
raise u1db_errors.DatabaseDoesNotExist()
@@ -298,43 +357,59 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
def open_database(cls, sqlcipher_file, password, create, backend_cls=None,
document_factory=None, crypto=None, raw_key=False,
cipher='aes-256-cbc', kdf_iter=4000,
- cipher_page_size=1024):
+ cipher_page_size=1024, defer_encryption=False):
"""
Open a SQLCipher database.
:param sqlcipher_file: The path for the SQLCipher file.
:type sqlcipher_file: str
+
:param password: The password that protects the SQLCipher db.
:type password: str
+
:param create: Should the datbase be created if it does not already
- exist?
- :type: bool
+ exist?
+ :type create: bool
+
:param backend_cls: A class to use as backend.
:type backend_cls: type
+
:param document_factory: A function that will be called with the same
- parameters as Document.__init__.
+ parameters as Document.__init__.
:type document_factory: callable
+
:param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
- document contents when syncing.
+ document contents when syncing.
:type crypto: soledad.crypto.SoledadCrypto
+
:param raw_key: Whether C{password} is a raw 64-char hex string or a
- passphrase that should be hashed to obtain the encyrption key.
+ passphrase that should be hashed to obtain the
+ encyrption key.
:type raw_key: bool
+
:param cipher: The cipher and mode to use.
:type cipher: str
+
:param kdf_iter: The number of iterations to use.
:type kdf_iter: int
+
:param cipher_page_size: The page size.
:type cipher_page_size: int
+ :param defer_encryption: Whether to defer encryption/decryption of
+ documents, or do it inline while syncing.
+ :type defer_encryption: bool
+
:return: The database object.
:rtype: SQLCipherDatabase
"""
+ cls.defer_encryption = defer_encryption
try:
return cls._open_database(
sqlcipher_file, password, document_factory=document_factory,
crypto=crypto, raw_key=raw_key, cipher=cipher,
- kdf_iter=kdf_iter, cipher_page_size=cipher_page_size)
+ kdf_iter=kdf_iter, cipher_page_size=cipher_page_size,
+ defer_encryption=defer_encryption)
except u1db_errors.DatabaseDoesNotExist:
if not create:
raise
@@ -347,7 +422,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
crypto=crypto, raw_key=raw_key, cipher=cipher,
kdf_iter=kdf_iter, cipher_page_size=cipher_page_size)
- def sync(self, url, creds=None, autocreate=True):
+ def sync(self, url, creds=None, autocreate=True, defer_decryption=True):
"""
Synchronize documents with remote replica exposed at url.
@@ -362,6 +437,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
:type creds: dict
:param autocreate: Ask the target to create the db if non-existent.
:type autocreate: bool
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
:return: The local generation before the synchronisation was performed.
:rtype: int
@@ -370,7 +449,16 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
# the following context manager blocks until the syncing lock can be
# acquired.
with self.syncer(url, creds=creds) as syncer:
- res = syncer.sync(autocreate=autocreate)
+
+ # XXX could mark the critical section here...
+ try:
+ res = syncer.sync(autocreate=autocreate,
+ defer_decryption=defer_decryption)
+
+ except PendingReceivedDocsSyncError:
+ logger.warning("Local sync db is not clear, skipping sync...")
+ return
+
return res
def stop_sync(self):
@@ -394,7 +482,15 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
with SQLCipherDatabase.syncing_lock[self._get_replica_uid()]:
syncer = self._get_syncer(url, creds=creds)
yield syncer
- syncer.sync_target.close()
+
+ @property
+ def syncing(self):
+ lock = SQLCipherDatabase.syncing_lock[self._get_replica_uid()]
+ acquired_lock = lock.acquire(False)
+ if acquired_lock is False:
+ return True
+ lock.release()
+ return False
def _get_syncer(self, url, creds=None):
"""
@@ -415,11 +511,15 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
h = sha256(json.dumps([url, creds])).hexdigest()
cur_h, syncer = self._syncers.get(url, (None, None))
if syncer is None or h != cur_h:
- syncer = Synchronizer(
+ wlock = self._sync_db_write_lock
+ syncer = SoledadSynchronizer(
self,
SoledadSyncTarget(url,
+ self._replica_uid,
creds=creds,
- crypto=self._crypto))
+ crypto=self._crypto,
+ sync_db=self._sync_db,
+ sync_db_write_lock=wlock))
self._syncers[url] = (h, syncer)
# in order to reuse the same synchronizer multiple times we have to
# reset its state (i.e. the number of documents received from target
@@ -442,21 +542,85 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
'ALTER TABLE document '
'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE')
- def create_doc(self, content, doc_id=None):
+ def _init_sync_db(self):
+ """
+ Initialize the Symmetrically-Encrypted document to be synced database,
+ and the queue to communicate with subprocess workers.
"""
- Create a new document in the local encrypted database.
+ self._sync_db = sqlite3.connect(self._sync_db_path,
+ check_same_thread=False)
- :param content: the contents of the new document
- :type content: dict
- :param doc_id: an optional identifier specifying the document id
- :type doc_id: str
+ self._sync_db_write_lock = threading.Lock()
+ self._create_sync_db_tables()
+ self.sync_queue = multiprocessing.Queue()
+
+ def _create_sync_db_tables(self):
+ """
+ Create tables for the local sync documents db if needed.
+ """
+ encr = SyncEncrypterPool
+ decr = SyncDecrypterPool
+ sql_encr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % (
+ encr.TABLE_NAME, encr.FIELD_NAMES))
+ sql_decr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % (
+ decr.TABLE_NAME, decr.FIELD_NAMES))
+
+ with self._sync_db_write_lock:
+ with self._sync_db:
+ self._sync_db.execute(sql_encr)
+ self._sync_db.execute(sql_decr)
+
+ #
+ # Symmetric encryption of syncing docs
+ #
+
+ def _encrypt_syncing_docs(self):
+ """
+ Process the syncing queue and send the documents there
+ to be encrypted in the sync db. They will be read by the
+ SoledadSyncTarget during the sync_exchange.
+
+ Called periodical from the TimerTask self._sync_watcher.
+ """
+ lock = self.encrypting_lock
+ # optional wait flag used to avoid blocking
+ if not lock.acquire(False):
+ return
+ else:
+ queue = self.sync_queue
+ try:
+ while not queue.empty():
+ doc = queue.get_nowait()
+ self._sync_enc_pool.encrypt_doc(doc)
+
+ except Exception as exc:
+ logger.error("Error while encrypting docs to sync")
+ logger.exception(exc)
+ finally:
+ lock.release()
+
+ #
+ # Document operations
+ #
- :return: the new document
- :rtype: SoledadDocument
+ def put_doc(self, doc):
"""
- with self.create_doc_lock:
- return sqlite_backend.SQLitePartialExpandDatabase.create_doc(
- self, content, doc_id=doc_id)
+ Overwrite the put_doc method, to enqueue the modified document for
+ encryption before sync.
+
+ :param doc: The document to be put.
+ :type doc: u1db.Document
+
+ :return: The new document revision.
+ :rtype: str
+ """
+ doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(
+ self, doc)
+ if self.defer_encryption:
+ self.sync_queue.put_nowait(doc)
+ return doc_rev
+
+ # indexes
def _put_and_update_indexes(self, old_doc, doc):
"""
@@ -906,12 +1070,25 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
res = c.fetchall()
return res[0][0]
- def __del__(self):
+ def close(self):
"""
- Closes db_handle upon object destruction.
+ Close db_handle and close syncer.
"""
+ logger.debug("Sqlcipher backend: closing")
+ if self._sync_watcher is not None:
+ self._sync_watcher.stop()
+ self._sync_watcher.shutdown()
+ for url in self._syncers:
+ _, syncer = self._syncers[url]
+ syncer.close()
+ if self._sync_enc_pool is not None:
+ self._sync_enc_pool.close()
if self._db_handle is not None:
self._db_handle.close()
+ @property
+ def replica_uid(self):
+ return self._get_replica_uid()
+
sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase)
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index 56e63416..5d545a77 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -17,35 +17,85 @@
"""
-Sync infrastructure that can be interrupted and recovered.
+Soledad synchronization utilities.
+
+
+Extend u1db Synchronizer with the ability to:
+
+ * Defer the update of the known replica uid until all the decryption of
+ the incoming messages has been processed.
+
+ * Be interrupted and recovered.
"""
+
import json
+import logging
+import traceback
+from threading import Lock
from u1db import errors
-from u1db.sync import Synchronizer as U1DBSynchronizer
+from u1db.sync import Synchronizer
-class Synchronizer(U1DBSynchronizer):
+logger = logging.getLogger(__name__)
+
+
+class SoledadSynchronizer(Synchronizer):
"""
Collect the state around synchronizing 2 U1DB replicas.
- Modified to allow for interrupting the synchronization process.
+ Synchronization is bi-directional, in that new items in the source are sent
+ to the target, and new items in the target are returned to the source.
+ However, it still recognizes that one side is initiating the request. Also,
+ at the moment, conflicts are only created in the source.
+
+ Also modified to allow for interrupting the synchronization process.
"""
+ syncing_lock = Lock()
+
def stop(self):
"""
Stop the current sync in progress.
"""
self.sync_target.stop()
- def sync(self, autocreate=False):
+ def sync(self, autocreate=False, defer_decryption=True):
"""
Synchronize documents between source and target.
+ Differently from u1db `Synchronizer.sync` method, this one allows to
+ pass a `defer_decryption` flag that will postpone the last
+ step in the synchronization dance, namely, the setting of the last
+ known generation and transaction id for a given remote replica.
+
+ This is done to allow the ongoing parallel decryption of the incoming
+ docs to proceed without `InvalidGeneration` conflicts.
+
:param autocreate: Whether the target replica should be created or not.
:type autocreate: bool
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
+ """
+ self.syncing_lock.acquire()
+ try:
+ return self._sync(autocreate=autocreate,
+ defer_decryption=defer_decryption)
+ except Exception:
+ # re-raising the exceptions to let syqlcipher.sync catch them
+ # (and re-create the syncer instance if needed)
+ raise
+ finally:
+ self.release_syncing_lock()
+
+ def _sync(self, autocreate=False, defer_decryption=True):
+ """
+ Helper function, called from the main `sync` method.
+ See `sync` docstring.
"""
sync_target = self.sync_target
@@ -64,6 +114,16 @@ class Synchronizer(U1DBSynchronizer):
target_gen, target_trans_id = (0, '')
target_my_gen, target_my_trans_id = (0, '')
+ logger.debug(
+ "Soledad target sync info:\n"
+ " target replica uid: %s\n"
+ " target generation: %d\n"
+ " target trans id: %s\n"
+ " target my gen: %d\n"
+ " target my trans_id: %s"
+ % (self.target_replica_uid, target_gen, target_trans_id,
+ target_my_gen, target_my_trans_id))
+
# make sure we'll have access to target replica uid once it exists
if self.target_replica_uid is None:
@@ -80,6 +140,8 @@ class Synchronizer(U1DBSynchronizer):
# what's changed since that generation and this current gen
my_gen, _, changes = self.source.whats_changed(target_my_gen)
+ logger.debug("Soledad sync: there are %d documents to send." \
+ % len(changes))
# get source last-seen database generation for the target
if self.target_replica_uid is None:
@@ -88,6 +150,11 @@ class Synchronizer(U1DBSynchronizer):
target_last_known_gen, target_last_known_trans_id = \
self.source._get_replica_gen_and_trans_id(
self.target_replica_uid)
+ logger.debug(
+ "Soledad source sync info:\n"
+ " source target gen: %d\n"
+ " source target trans_id: %s"
+ % (target_last_known_gen, target_last_known_trans_id))
# validate transaction ids
if not changes and target_last_known_gen == target_gen:
@@ -111,15 +178,85 @@ class Synchronizer(U1DBSynchronizer):
#
# The sync_exchange method may be interrupted, in which case it will
# return a tuple of Nones.
- new_gen, new_trans_id = sync_target.sync_exchange(
- docs_by_generation, self.source._replica_uid,
- target_last_known_gen, target_last_known_trans_id,
- self._insert_doc_from_target, ensure_callback=ensure_callback)
+ try:
+ new_gen, new_trans_id = sync_target.sync_exchange(
+ docs_by_generation, self.source._replica_uid,
+ target_last_known_gen, target_last_known_trans_id,
+ self._insert_doc_from_target, ensure_callback=ensure_callback,
+ defer_decryption=defer_decryption)
+ logger.debug(
+ "Soledad source sync info after sync exchange:\n"
+ " source target gen: %d\n"
+ " source target trans_id: %s"
+ % (new_gen, new_trans_id))
+ info = {
+ "target_replica_uid": self.target_replica_uid,
+ "new_gen": new_gen,
+ "new_trans_id": new_trans_id,
+ "my_gen": my_gen
+ }
+ self._syncing_info = info
+ if defer_decryption and not sync_target.has_syncdb():
+ logger.debug("Sync target has no valid sync db, "
+ "aborting defer_decryption")
+ defer_decryption = False
+ self.complete_sync()
+ except Exception as e:
+ logger.error("Soledad sync error: %s" % str(e))
+ logger.error(traceback.format_exc())
+ sync_target.stop()
+ finally:
+ sync_target.close()
- # record target synced-up-to generation including applying what we sent
+ return my_gen
+
+ def complete_sync(self):
+ """
+ Last stage of the synchronization:
+ (a) record last known generation and transaction uid for the remote
+ replica, and
+ (b) make target aware of our current reached generation.
+ """
+ logger.debug("Completing deferred last step in SYNC...")
+
+ # record target synced-up-to generation including applying what we
+ # sent
+ info = self._syncing_info
self.source._set_replica_gen_and_trans_id(
- self.target_replica_uid, new_gen, new_trans_id)
+ info["target_replica_uid"], info["new_gen"], info["new_trans_id"])
+
# if gapless record current reached generation with target
- self._record_sync_info_with_the_target(my_gen)
+ self._record_sync_info_with_the_target(info["my_gen"])
- return my_gen
+ @property
+ def syncing(self):
+ """
+ Return True if a sync is ongoing, False otherwise.
+ :rtype: bool
+ """
+ # XXX FIXME we need some mechanism for timeout: should cleanup and
+ # release if something in the syncdb-decrypt goes wrong. we could keep
+ # track of the release date and cleanup unrealistic sync entries after
+ # some time.
+ locked = self.syncing_lock.locked()
+ return locked
+
+ def release_syncing_lock(self):
+ """
+ Release syncing lock if it's locked.
+ """
+ if self.syncing_lock.locked():
+ self.syncing_lock.release()
+
+ def close(self):
+ """
+ Close sync target pool of workers.
+ """
+ self.release_syncing_lock()
+ self.sync_target.close()
+
+ def __del__(self):
+ """
+ Cleanup: release lock.
+ """
+ self.release_syncing_lock()
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 968545b6..70e4d3a2 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -14,458 +14,466 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
"""
A U1DB backend for encrypting data before sending to server and decrypting
after receiving.
"""
-import binascii
+
+
import cStringIO
import gzip
-import hashlib
-import hmac
import logging
+import re
import urllib
import threading
+import urlparse
-import simplejson as json
+from collections import defaultdict
from time import sleep
from uuid import uuid4
+from contextlib import contextmanager
-from u1db.remote import utils, http_errors
-from u1db.errors import BrokenSyncStream
+import simplejson as json
+from taskthread import TimerTask
from u1db import errors
+from u1db.remote import utils, http_errors
from u1db.remote.http_target import HTTPSyncTarget
-from u1db.remote.http_client import _encode_query_parameter
-
+from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase
+from zope.proxy import ProxyBase
+from zope.proxy import sameProxiedObjects, setProxiedObject
from leap.soledad.common import soledad_assert
-from leap.soledad.common.crypto import (
- EncryptionSchemes,
- UnknownEncryptionScheme,
- MacMethods,
- UnknownMacMethod,
- WrongMac,
- ENC_JSON_KEY,
- ENC_SCHEME_KEY,
- ENC_METHOD_KEY,
- ENC_IV_KEY,
- MAC_KEY,
- MAC_METHOD_KEY,
-)
from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.auth import TokenBasedAuth
-from leap.soledad.client.crypto import (
- EncryptionMethods,
- UnknownEncryptionMethod,
-)
-from leap.soledad.client.events import (
- SOLEDAD_SYNC_SEND_STATUS,
- SOLEDAD_SYNC_RECEIVE_STATUS,
- signal,
-)
+from leap.soledad.client.crypto import is_symmetrically_encrypted
+from leap.soledad.client.crypto import encrypt_doc, decrypt_doc
+from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool
+from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
+from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
+from leap.soledad.client.events import signal
-logger = logging.getLogger(__name__)
-#
-# Exceptions
-#
+logger = logging.getLogger(__name__)
-class DocumentNotEncrypted(Exception):
+def _gunzip(data):
"""
- Raised for failures in document encryption.
+ Uncompress data that is gzipped.
+
+ :param data: gzipped data
+ :type data: basestring
"""
- pass
+ buffer = cStringIO.StringIO()
+ buffer.write(data)
+ buffer.seek(0)
+ try:
+ data = gzip.GzipFile(mode='r', fileobj=buffer).read()
+ except Exception:
+ logger.warning("Error while decrypting gzipped data")
+ buffer.close()
+ return data
-#
-# Crypto utilities for a SoledadDocument.
-#
+class PendingReceivedDocsSyncError(Exception):
+ pass
-def mac_doc(crypto, doc_id, doc_rev, ciphertext, mac_method):
+class DocumentSyncerThread(threading.Thread):
"""
- Calculate a MAC for C{doc} using C{ciphertext}.
-
- Current MAC method used is HMAC, with the following parameters:
-
- * key: sha256(storage_secret, doc_id)
- * msg: doc_id + doc_rev + ciphertext
- * digestmod: sha256
-
- :param crypto: A SoledadCryto instance used to perform the encryption.
- :type crypto: leap.soledad.crypto.SoledadCrypto
- :param doc_id: The id of the document.
- :type doc_id: str
- :param doc_rev: The revision of the document.
- :type doc_rev: str
- :param ciphertext: The content of the document.
- :type ciphertext: str
- :param mac_method: The MAC method to use.
- :type mac_method: str
-
- :return: The calculated MAC.
- :rtype: str
+ A thread that knowns how to either send or receive a document during the
+ sync process.
"""
- if mac_method == MacMethods.HMAC:
- return hmac.new(
- crypto.doc_mac_key(doc_id),
- str(doc_id) + str(doc_rev) + ciphertext,
- hashlib.sha256).digest()
- # raise if we do not know how to handle this MAC method
- raise UnknownMacMethod('Unknown MAC method: %s.' % mac_method)
+ def __init__(self, doc_syncer, release_method, failed_method,
+ idx, total, last_request_lock=None, last_callback_lock=None):
+ """
+ Initialize a new syncer thread.
+
+ :param doc_syncer: A document syncer.
+ :type doc_syncer: HTTPDocumentSyncer
+ :param release_method: A method to be called when finished running.
+ :type release_method: callable(DocumentSyncerThread)
+ :param failed_method: A method to be called when we failed.
+ :type failed_method: callable(DocumentSyncerThread)
+ :param idx: The index count of the current operation.
+ :type idx: int
+ :param total: The total number of operations.
+ :type total: int
+ :param last_request_lock: A lock to wait for before actually performing
+ the request.
+ :type last_request_lock: threading.Lock
+ :param last_callback_lock: A lock to wait for before actually running
+ the success callback.
+ :type last_callback_lock: threading.Lock
+ """
+ threading.Thread.__init__(self)
+ self._doc_syncer = doc_syncer
+ self._release_method = release_method
+ self._failed_method = failed_method
+ self._idx = idx
+ self._total = total
+ self._last_request_lock = last_request_lock
+ self._last_callback_lock = last_callback_lock
+ self._response = None
+ self._exception = None
+ self._result = None
+ self._success = False
+ # a lock so we can signal when we're finished
+ self._request_lock = threading.Lock()
+ self._request_lock.acquire()
+ self._callback_lock = threading.Lock()
+ self._callback_lock.acquire()
+ # make thread interruptable
+ self._stopped = None
+ self._stop_lock = threading.Lock()
-def encrypt_doc(crypto, doc):
- """
- Encrypt C{doc}'s content.
-
- Encrypt doc's contents using AES-256 CTR mode and return a valid JSON
- string representing the following:
-
- {
- ENC_JSON_KEY: '<encrypted doc JSON string>',
- ENC_SCHEME_KEY: 'symkey',
- ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR,
- ENC_IV_KEY: '<the initial value used to encrypt>',
- MAC_KEY: '<mac>'
- MAC_METHOD_KEY: 'hmac'
- }
-
- :param crypto: A SoledadCryto instance used to perform the encryption.
- :type crypto: leap.soledad.crypto.SoledadCrypto
- :param doc: The document with contents to be encrypted.
- :type doc: SoledadDocument
-
- :return: The JSON serialization of the dict representing the encrypted
- content.
- :rtype: str
- """
- soledad_assert(doc.is_tombstone() is False)
- # encrypt content using AES-256 CTR mode
- iv, ciphertext = crypto.encrypt_sym(
- str(doc.get_json()), # encryption/decryption routines expect str
- crypto.doc_passphrase(doc.doc_id),
- method=EncryptionMethods.AES_256_CTR)
- # Return a representation for the encrypted content. In the following, we
- # convert binary data to hexadecimal representation so the JSON
- # serialization does not complain about what it tries to serialize.
- hex_ciphertext = binascii.b2a_hex(ciphertext)
- return json.dumps({
- ENC_JSON_KEY: hex_ciphertext,
- ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY,
- ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR,
- ENC_IV_KEY: iv,
- # store the mac as hex.
- MAC_KEY: binascii.b2a_hex(
- mac_doc(
- crypto, doc.doc_id, doc.rev,
- ciphertext,
- MacMethods.HMAC)),
- MAC_METHOD_KEY: MacMethods.HMAC,
- })
-
-
-def decrypt_doc(crypto, doc):
- """
- Decrypt C{doc}'s content.
+ def run(self):
+ """
+ Run the HTTP request and store results.
- Return the JSON string representation of the document's decrypted content.
+ This method will block and wait for an eventual previous operation to
+ finish before actually performing the request. It also traps any
+ exception and register any failure with the request.
+ """
+ with self._stop_lock:
+ if self._stopped is None:
+ self._stopped = False
+ else:
+ return
+
+ # eventually wait for the previous thread to finish
+ if self._last_request_lock is not None:
+ self._last_request_lock.acquire()
+
+ # bail out in case we've been interrupted
+ if self.stopped is True:
+ return
+
+ try:
+ self._response = self._doc_syncer.do_request()
+ self._request_lock.release()
+
+ # run success callback
+ if self._doc_syncer.success_callback is not None:
+
+ # eventually wait for callback lock release
+ if self._last_callback_lock is not None:
+ self._last_callback_lock.acquire()
+
+ # bail out in case we've been interrupted
+ if self._stopped is True:
+ return
+
+ self._result = self._doc_syncer.success_callback(
+ self._idx, self._total, self._response)
+ self._success = True
+ doc_syncer = self._doc_syncer
+ self._release_method(self, doc_syncer)
+ self._doc_syncer = None
+ # let next thread executed its callback
+ self._callback_lock.release()
+
+ # trap any exception and signal failure
+ except Exception as e:
+ self._exception = e
+ self._success = False
+ # run failure callback
+ if self._doc_syncer.failure_callback is not None:
+
+ # eventually wait for callback lock release
+ if self._last_callback_lock is not None:
+ self._last_callback_lock.acquire()
+
+ # bail out in case we've been interrupted
+ if self.stopped is True:
+ return
+
+ self._doc_syncer.failure_callback(
+ self._idx, self._total, self._exception)
+
+ self._failed_method(self)
+ # we do not release the callback lock here because we
+ # failed and so we don't want other threads to succeed.
- The content of the document should have the following structure:
+ @property
+ def doc_syncer(self):
+ return self._doc_syncer
- {
- ENC_JSON_KEY: '<enc_blob>',
- ENC_SCHEME_KEY: '<enc_scheme>',
- ENC_METHOD_KEY: '<enc_method>',
- ENC_IV_KEY: '<initial value used to encrypt>', # (optional)
- MAC_KEY: '<mac>'
- MAC_METHOD_KEY: 'hmac'
- }
+ @property
+ def response(self):
+ return self._response
- C{enc_blob} is the encryption of the JSON serialization of the document's
- content. For now Soledad just deals with documents whose C{enc_scheme} is
- EncryptionSchemes.SYMKEY and C{enc_method} is
- EncryptionMethods.AES_256_CTR.
+ @property
+ def exception(self):
+ return self._exception
- :param crypto: A SoledadCryto instance to perform the encryption.
- :type crypto: leap.soledad.crypto.SoledadCrypto
- :param doc: The document to be decrypted.
- :type doc: SoledadDocument
+ @property
+ def callback_lock(self):
+ return self._callback_lock
- :return: The JSON serialization of the decrypted content.
- :rtype: str
- """
- soledad_assert(doc.is_tombstone() is False)
- soledad_assert(ENC_JSON_KEY in doc.content)
- soledad_assert(ENC_SCHEME_KEY in doc.content)
- soledad_assert(ENC_METHOD_KEY in doc.content)
- soledad_assert(MAC_KEY in doc.content)
- soledad_assert(MAC_METHOD_KEY in doc.content)
- # verify MAC
- ciphertext = binascii.a2b_hex( # content is stored as hex.
- doc.content[ENC_JSON_KEY])
- mac = mac_doc(
- crypto, doc.doc_id, doc.rev,
- ciphertext,
- doc.content[MAC_METHOD_KEY])
- # we compare mac's hashes to avoid possible timing attacks that might
- # exploit python's builtin comparison operator behaviour, which fails
- # immediatelly when non-matching bytes are found.
- doc_mac_hash = hashlib.sha256(
- binascii.a2b_hex( # the mac is stored as hex
- doc.content[MAC_KEY])).digest()
- calculated_mac_hash = hashlib.sha256(mac).digest()
- if doc_mac_hash != calculated_mac_hash:
- raise WrongMac('Could not authenticate document\'s contents.')
- # decrypt doc's content
- enc_scheme = doc.content[ENC_SCHEME_KEY]
- plainjson = None
- if enc_scheme == EncryptionSchemes.SYMKEY:
- enc_method = doc.content[ENC_METHOD_KEY]
- if enc_method == EncryptionMethods.AES_256_CTR:
- soledad_assert(ENC_IV_KEY in doc.content)
- plainjson = crypto.decrypt_sym(
- ciphertext,
- crypto.doc_passphrase(doc.doc_id),
- method=enc_method,
- iv=doc.content[ENC_IV_KEY])
- else:
- raise UnknownEncryptionMethod(enc_method)
- else:
- raise UnknownEncryptionScheme(enc_scheme)
- return plainjson
+ @property
+ def request_lock(self):
+ return self._request_lock
+ @property
+ def success(self):
+ return self._success
-def _gunzip(data):
- """
- Uncompress data that is gzipped.
+ def stop(self):
+ with self._stop_lock:
+ self._stopped = True
- :param data: gzipped data
- :type data: basestring
- """
- buffer = cStringIO.StringIO()
- buffer.write(data)
- buffer.seek(0)
- try:
- data = gzip.GzipFile(mode='r', fileobj=buffer).read()
- except Exception:
- logger.warning("Error while decrypting gzipped data")
- buffer.close()
- return data
+ @property
+ def stopped(self):
+ with self._stop_lock:
+ return self._stopped
+ @property
+ def result(self):
+ return self._result
-#
-# SoledadSyncTarget
-#
-class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
+class DocumentSyncerPool(object):
"""
- A SyncTarget that encrypts data before sending and decrypts data after
- receiving.
+ A pool of reusable document syncers.
"""
- #
- # Token auth methods.
- #
+ POOL_SIZE = 10
+ """
+ The maximum amount of syncer threads running at the same time.
+ """
- def set_token_credentials(self, uuid, token):
+ def __init__(self, raw_url, raw_creds, query_string, headers,
+ ensure_callback, stop_method):
"""
- Store given credentials so we can sign the request later.
+ Initialize the document syncer pool.
+
+ :param raw_url: The complete raw URL for the HTTP request.
+ :type raw_url: str
+ :param raw_creds: The credentials for the HTTP request.
+ :type raw_creds: dict
+ :param query_string: The query string for the HTTP request.
+ :type query_string: str
+ :param headers: The headers for the HTTP request.
+ :type headers: dict
+ :param ensure_callback: A callback to ensure we have the correct
+ target_replica_uid, if it was just created.
+ :type ensure_callback: callable
- :param uuid: The user's uuid.
- :type uuid: str
- :param token: The authentication token.
- :type token: str
"""
- TokenBasedAuth.set_token_credentials(self, uuid, token)
-
- def _sign_request(self, method, url_query, params):
+ # save syncer params
+ self._raw_url = raw_url
+ self._raw_creds = raw_creds
+ self._query_string = query_string
+ self._headers = headers
+ self._ensure_callback = ensure_callback
+ self._stop_method = stop_method
+ # pool attributes
+ self._failures = False
+ self._semaphore_pool = threading.BoundedSemaphore(
+ DocumentSyncerPool.POOL_SIZE)
+ self._pool_access_lock = threading.Lock()
+ self._doc_syncers = []
+ self._threads = []
+
+ def new_syncer_thread(self, idx, total, last_request_lock=None,
+ last_callback_lock=None):
"""
- Return an authorization header to be included in the HTTP request.
+ Yield a new document syncer thread.
+
+ :param idx: The index count of the current operation.
+ :type idx: int
+ :param total: The total number of operations.
+ :type total: int
+ :param last_request_lock: A lock to wait for before actually performing
+ the request.
+ :type last_request_lock: threading.Lock
+ :param last_callback_lock: A lock to wait for before actually running
+ the success callback.
+ :type last_callback_lock: threading.Lock
+ """
+ t = None
+ # wait for available threads
+ self._semaphore_pool.acquire()
+ with self._pool_access_lock:
+ if self._failures is True:
+ return None
+ # get a syncer
+ doc_syncer = self._get_syncer()
+ # we rely on DocumentSyncerThread.run() to release the lock using
+ # self.release_syncer so we can launch a new thread.
+ t = DocumentSyncerThread(
+ doc_syncer, self.release_syncer, self.cancel_threads,
+ idx, total,
+ last_request_lock=last_request_lock,
+ last_callback_lock=last_callback_lock)
+ self._threads.append(t)
+ return t
+
+ def _failed(self):
+ with self._pool_access_lock:
+ self._failures = True
- :param method: The HTTP method.
- :type method: str
- :param url_query: The URL query string.
- :type url_query: str
- :param params: A list with encoded query parameters.
- :type param: list
+ @property
+ def failures(self):
+ return self._failures
- :return: The Authorization header.
- :rtype: list of tuple
+ def _get_syncer(self):
"""
- return TokenBasedAuth._sign_request(self, method, url_query, params)
-
- #
- # Modified HTTPSyncTarget methods.
- #
+ Get a document syncer from the pool.
- @staticmethod
- def connect(url, crypto=None):
- return SoledadSyncTarget(url, crypto=crypto)
+ This method will create a new syncer whenever there is no syncer
+ available in the pool.
- def __init__(self, url, creds=None, crypto=None):
+ :return: A syncer.
+ :rtype: HTTPDocumentSyncer
"""
- Initialize the SoledadSyncTarget.
-
- :param url: The url of the target replica to sync with.
- :type url: str
- :param creds: optional dictionary giving credentials.
- to authorize the operation with the server.
- :type creds: dict
- :param soledad: An instance of Soledad so we can encrypt/decrypt
- document contents when syncing.
- :type soledad: soledad.Soledad
+ syncer = None
+ # get an available syncer or create a new one
+ try:
+ syncer = self._doc_syncers.pop()
+ except IndexError:
+ syncer = HTTPDocumentSyncer(
+ self._raw_url, self._raw_creds, self._query_string,
+ self._headers, self._ensure_callback)
+ return syncer
+
+ def release_syncer(self, syncer_thread, doc_syncer):
"""
- HTTPSyncTarget.__init__(self, url, creds)
- self._crypto = crypto
- self._stopped = True
- self._stop_lock = threading.Lock()
+ Return a syncer to the pool after use and check for any failures.
- def _init_post_request(self, url, action, headers, content_length):
+ :param syncer: The syncer to be returned to the pool.
+ :type syncer: HTTPDocumentSyncer
"""
- Initiate a syncing POST request.
+ with self._pool_access_lock:
+ self._doc_syncers.append(doc_syncer)
+ if syncer_thread.success is True:
+ self._threads.remove(syncer_thread)
+ self._semaphore_pool.release()
- :param url: The syncing URL.
- :type url: str
- :param action: The syncing action, either 'get' or 'receive'.
- :type action: str
- :param headers: The initial headers to be sent on this request.
- :type headers: dict
- :param content_length: The content-length of the request.
- :type content_length: int
+ def cancel_threads(self, calling_thread):
"""
- self._conn.putrequest('POST', url)
- self._conn.putheader(
- 'content-type', 'application/x-soledad-sync-%s' % action)
- for header_name, header_value in headers:
- self._conn.putheader(header_name, header_value)
- self._conn.putheader('accept-encoding', 'gzip')
- self._conn.putheader('content-length', str(content_length))
- self._conn.endheaders()
-
- def _get_remote_docs(self, url, last_known_generation, last_known_trans_id,
- headers, return_doc_cb, ensure_callback, sync_id):
+ Stop all threads in the pool.
"""
- Fetch sync documents from the remote database and insert them in the
- local database.
+ # stop sync
+ self._stop_method()
+ stopped = []
+ # stop all threads
+ logger.warning("Soledad sync: cancelling sync threads...")
+ with self._pool_access_lock:
+ self._failures = True
+ while self._threads:
+ t = self._threads.pop(0)
+ t.stop()
+ self._doc_syncers.append(t.doc_syncer)
+ stopped.append(t)
+ # release locks and join
+ while stopped:
+ t = stopped.pop(0)
+ t.request_lock.acquire(False) # just in case
+ t.request_lock.release()
+ t.callback_lock.acquire(False) # just in case
+ t.callback_lock.release()
+ logger.warning("Soledad sync: cancelled sync threads.")
+
+ def cleanup(self):
+ """
+ Close and remove any syncers from the pool.
+ """
+ with self._pool_access_lock:
+ while self._doc_syncers:
+ syncer = self._doc_syncers.pop()
+ syncer.close()
+ del syncer
- If an incoming document's encryption scheme is equal to
- EncryptionSchemes.SYMKEY, then this method will decrypt it with
- Soledad's symmetric key.
- :param url: The syncing URL.
- :type url: str
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
- :param headers: The headers of the HTTP request.
+class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth):
+
+ def __init__(self, raw_url, creds, query_string, headers, ensure_callback):
+ """
+ Initialize the client.
+
+ :param raw_url: The raw URL of the target HTTP server.
+ :type raw_url: str
+ :param creds: Authentication credentials.
+ :type creds: dict
+ :param query_string: The query string for the HTTP request.
+ :type query_string: str
+ :param headers: The headers for the HTTP request.
:type headers: dict
- :param return_doc_cb: A callback to insert docs from target.
- :type return_doc_cb: callable
:param ensure_callback: A callback to ensure we have the correct
target_replica_uid, if it was just created.
:type ensure_callback: callable
+ """
+ HTTPClientBase.__init__(self, raw_url, creds=creds)
+ # info needed to perform the request
+ self._query_string = query_string
+ self._headers = headers
+ self._ensure_callback = ensure_callback
+ # the actual request method
+ self._request_method = None
+ self._success_callback = None
+ self._failure_callback = None
+
+ def _reset(self):
+ """
+ Reset this document syncer so we can reuse it.
+ """
+ self._request_method = None
+ self._success_callback = None
+ self._failure_callback = None
+ self._request_method = None
- :raise BrokenSyncStream: If C{data} is malformed.
+ def set_request_method(self, method, *args, **kwargs):
+ """
+ Set the actual method to perform the request.
- :return: A dictionary representing the first line of the response got
- from remote replica.
- :rtype: list of str
- """
-
- def _post_get_doc(received):
- """
- Get a sync document from server by means of a POST request.
-
- :param received: The number of documents already received in the
- current sync session.
- :type received: int
- """
- entries = ['[']
- size = 1
- # add remote replica metadata to the request
- size += self._prepare(
- '', entries,
- last_known_generation=last_known_generation,
- last_known_trans_id=last_known_trans_id,
- sync_id=sync_id,
- ensure=ensure_callback is not None)
- # inform server of how many documents have already been received
- size += self._prepare(
- ',', entries, received=received)
- entries.append('\r\n]')
- size += len(entries[-1])
- # send headers
- self._init_post_request(url, 'get', headers, size)
- # get document
- for entry in entries:
- self._conn.send(entry)
- return self._response()
-
- number_of_changes = None
- received = 0
+ :param method: Either 'get' or 'put'.
+ :type method: str
+ :param args: Arguments for the request method.
+ :type args: list
+ :param kwargs: Keyworded arguments for the request method.
+ :type kwargs: dict
+ """
+ self._reset()
+ # resolve request method
+ if method is 'get':
+ self._request_method = self._get_doc
+ elif method is 'put':
+ self._request_method = self._put_doc
+ else:
+ raise Exception
+ # store request method args
+ self._args = args
+ self._kwargs = kwargs
- new_generation = last_known_generation
- new_transaction_id = last_known_trans_id
- while number_of_changes is None or received < number_of_changes:
- # bail out if sync process was interrupted
- if self.stopped is True:
- return last_known_generation, last_known_trans_id
- # try to fetch one document from target
- data, _ = _post_get_doc(received)
- # decode incoming stream
- parts = data.splitlines()
- if not parts or parts[0] != '[' or parts[-1] != ']':
- raise BrokenSyncStream
- data = parts[1:-1]
- # decode metadata
- line, comma = utils.check_and_strip_comma(data[0])
- metadata = None
- try:
- metadata = json.loads(line)
- soledad_assert('number_of_changes' in metadata)
- soledad_assert('new_generation' in metadata)
- soledad_assert('new_transaction_id' in metadata)
- number_of_changes = metadata['number_of_changes']
- new_generation = metadata['new_generation']
- new_transaction_id = metadata['new_transaction_id']
- except json.JSONDecodeError, AssertionError:
- raise BrokenSyncStream
- # make sure we have replica_uid from fresh new dbs
- if ensure_callback and 'replica_uid' in metadata:
- ensure_callback(metadata['replica_uid'])
- # bail out if there are no documents to be received
- if number_of_changes == 0:
- break
- # decrypt incoming document and insert into local database
- entry = None
- try:
- entry = json.loads(data[1])
- except IndexError:
- raise BrokenSyncStream
- # -------------------------------------------------------------
- # symmetric decryption of document's contents
- # -------------------------------------------------------------
- # if arriving content was symmetrically encrypted, we decrypt
- # it.
- doc = SoledadDocument(
- entry['id'], entry['rev'], entry['content'])
- if doc.content and ENC_SCHEME_KEY in doc.content:
- if doc.content[ENC_SCHEME_KEY] == \
- EncryptionSchemes.SYMKEY:
- doc.set_json(decrypt_doc(self._crypto, doc))
- # -------------------------------------------------------------
- # end of symmetric decryption
- # -------------------------------------------------------------
- return_doc_cb(doc, entry['gen'], entry['trans_id'])
- received += 1
- signal(
- SOLEDAD_SYNC_RECEIVE_STATUS,
- "%d/%d" %
- (received, number_of_changes))
- return new_generation, new_transaction_id
+ def set_success_callback(self, callback):
+ self._success_callback = callback
+
+ def set_failure_callback(self, callback):
+ self._failure_callback = callback
+
+ @property
+ def success_callback(self):
+ return self._success_callback
+
+ @property
+ def failure_callback(self):
+ return self._failure_callback
+
+ def do_request(self):
+ """
+ Actually perform the request.
+
+ :return: The body and headers of the response.
+ :rtype: tuple
+ """
+ self._ensure_connection()
+ args = self._args
+ kwargs = self._kwargs
+ return self._request_method(*args, **kwargs)
def _request(self, method, url_parts, params=None, body=None,
content_type=None):
@@ -482,6 +490,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:type body: str
:param content-type: The content-type of the request.
:type content-type: str
+
+ :return: The body and headers of the response.
+ :rtype: tuple
+
+ :raise errors.Unavailable: Raised after a number of unsuccesful
+ request attempts.
+ :raise Exception: Raised for any other exception ocurring during the
+ request.
"""
self._ensure_connection()
@@ -566,14 +582,506 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:type entries: list
:param dic: The data to be included in this entry.
:type dic: dict
+
+ :return: The size of the prepared entry.
+ :rtype: int
"""
entry = comma + '\r\n' + json.dumps(dic)
entries.append(entry)
return len(entry)
- def sync_exchange(self, docs_by_generations, source_replica_uid,
- last_known_generation, last_known_trans_id,
- return_doc_cb, ensure_callback=None):
+ def _init_post_request(self, action, content_length):
+ """
+ Initiate a syncing POST request.
+
+ :param url: The syncing URL.
+ :type url: str
+ :param action: The syncing action, either 'get' or 'receive'.
+ :type action: str
+ :param headers: The initial headers to be sent on this request.
+ :type headers: dict
+ :param content_length: The content-length of the request.
+ :type content_length: int
+ """
+ self._conn.putrequest('POST', self._query_string)
+ self._conn.putheader(
+ 'content-type', 'application/x-soledad-sync-%s' % action)
+ for header_name, header_value in self._headers:
+ self._conn.putheader(header_name, header_value)
+ self._conn.putheader('accept-encoding', 'gzip')
+ self._conn.putheader('content-length', str(content_length))
+ self._conn.endheaders()
+
+ def _get_doc(self, received, sync_id, last_known_generation,
+ last_known_trans_id):
+ """
+ Get a sync document from server by means of a POST request.
+
+ :param received: The number of documents already received in the
+ current sync session.
+ :type received: int
+ :param sync_id: The id for the current sync session.
+ :type sync_id: str
+ :param last_known_generation: Target's last known generation.
+ :type last_known_generation: int
+ :param last_known_trans_id: Target's last known transaction id.
+ :type last_known_trans_id: str
+
+ :return: The body and headers of the response.
+ :rtype: tuple
+ """
+ entries = ['[']
+ size = 1
+ # add remote replica metadata to the request
+ size += self._prepare(
+ '', entries,
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
+ ensure=self._ensure_callback is not None)
+ # inform server of how many documents have already been received
+ size += self._prepare(
+ ',', entries, received=received)
+ entries.append('\r\n]')
+ size += len(entries[-1])
+ # send headers
+ self._init_post_request('get', size)
+ # get document
+ for entry in entries:
+ self._conn.send(entry)
+ return self._response()
+
+ def _put_doc(self, sync_id, last_known_generation, last_known_trans_id,
+ id, rev, content, gen, trans_id, number_of_docs, doc_idx):
+ """
+ Put a sync document on server by means of a POST request.
+
+ :param sync_id: The id for the current sync session.
+ :type sync_id: str
+ :param last_known_generation: Target's last known generation.
+ :type last_known_generation: int
+ :param last_known_trans_id: Target's last known transaction id.
+ :type last_known_trans_id: str
+ :param id: The document id.
+ :type id: str
+ :param rev: The document revision.
+ :type rev: str
+ :param content: The serialized document content.
+ :type content: str
+ :param gen: The generation of the modification of the document.
+ :type gen: int
+ :param trans_id: The transaction id of the modification of the
+ document.
+ :type trans_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+ :param doc_idx: The index of the current document being sent.
+ :type doc_idx: int
+
+ :return: The body and headers of the response.
+ :rtype: tuple
+ """
+ # prepare to send the document
+ entries = ['[']
+ size = 1
+ # add remote replica metadata to the request
+ size += self._prepare(
+ '', entries,
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
+ ensure=self._ensure_callback is not None)
+ # add the document to the request
+ size += self._prepare(
+ ',', entries,
+ id=id, rev=rev, content=content, gen=gen, trans_id=trans_id,
+ number_of_docs=number_of_docs, doc_idx=doc_idx)
+ entries.append('\r\n]')
+ size += len(entries[-1])
+ # send headers
+ self._init_post_request('put', size)
+ # send document
+ for entry in entries:
+ self._conn.send(entry)
+ return self._response()
+
+ def _sign_request(self, method, url_query, params):
+ """
+ Return an authorization header to be included in the HTTP request.
+
+ :param method: The HTTP method.
+ :type method: str
+ :param url_query: The URL query string.
+ :type url_query: str
+ :param params: A list with encoded query parameters.
+ :type param: list
+
+ :return: The Authorization header.
+ :rtype: list of tuple
+ """
+ return TokenBasedAuth._sign_request(self, method, url_query, params)
+
+ def set_token_credentials(self, uuid, token):
+ """
+ Store given credentials so we can sign the request later.
+
+ :param uuid: The user's uuid.
+ :type uuid: str
+ :param token: The authentication token.
+ :type token: str
+ """
+ TokenBasedAuth.set_token_credentials(self, uuid, token)
+
+
+class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
+ """
+ A SyncTarget that encrypts data before sending and decrypts data after
+ receiving.
+
+ Normally encryption will have been written to the sync database upon
+ document modification. The sync database is also used to write temporarily
+ the parsed documents that the remote send us, before being decrypted and
+ written to the main database.
+ """
+
+ # will later keep a reference to the insert-doc callback
+ # passed to sync_exchange
+ _insert_doc_cb = defaultdict(lambda: ProxyBase(None))
+
+ """
+ Period of recurrence of the periodic decrypting task, in seconds.
+ """
+ DECRYPT_TASK_PERIOD = 0.5
+
+ #
+ # Modified HTTPSyncTarget methods.
+ #
+
+ def __init__(self, url, source_replica_uid=None, creds=None, crypto=None,
+ sync_db=None, sync_db_write_lock=None):
+ """
+ Initialize the SoledadSyncTarget.
+
+ :param source_replica_uid: The source replica uid which we use when
+ deferring decryption.
+ :type source_replica_uid: str
+ :param url: The url of the target replica to sync with.
+ :type url: str
+ :param creds: Optional dictionary giving credentials.
+ to authorize the operation with the server.
+ :type creds: dict
+ :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
+ document contents when syncing.
+ :type crypto: soledad.crypto.SoledadCrypto
+ :param sync_db: Optional. handler for the db with the symmetric
+ encryption of the syncing documents. If
+ None, encryption will be done in-place,
+ instead of retreiving it from the dedicated
+ database.
+ :type sync_db: Sqlite handler
+ :param sync_db_write_lock: a write lock for controlling concurrent
+ access to the sync_db
+ :type sync_db_write_lock: threading.Lock
+ """
+ HTTPSyncTarget.__init__(self, url, creds)
+ self._raw_url = url
+ self._raw_creds = creds
+ self._crypto = crypto
+ self._stopped = True
+ self._stop_lock = threading.Lock()
+ self._sync_exchange_lock = threading.Lock()
+ self.source_replica_uid = source_replica_uid
+ self._defer_decryption = False
+
+ # deferred decryption attributes
+ self._sync_db = None
+ self._sync_db_write_lock = None
+ self._decryption_callback = None
+ self._sync_decr_pool = None
+ self._sync_watcher = None
+ if sync_db and sync_db_write_lock is not None:
+ self._sync_db = sync_db
+ self._sync_db_write_lock = sync_db_write_lock
+
+ def _setup_sync_decr_pool(self):
+ """
+ Set up the SyncDecrypterPool for deferred decryption.
+ """
+ if self._sync_decr_pool is None:
+ # initialize syncing queue decryption pool
+ self._sync_decr_pool = SyncDecrypterPool(
+ self._crypto, self._sync_db,
+ self._sync_db_write_lock,
+ insert_doc_cb=self._insert_doc_cb)
+ self._sync_decr_pool.set_source_replica_uid(
+ self.source_replica_uid)
+
+ def _teardown_sync_decr_pool(self):
+ """
+ Tear down the SyncDecrypterPool.
+ """
+ if self._sync_decr_pool is not None:
+ self._sync_decr_pool.close()
+ self._sync_decr_pool = None
+
+ def _setup_sync_watcher(self):
+ """
+ Set up the sync watcher for deferred decryption.
+ """
+ if self._sync_watcher is None:
+ self._sync_watcher = TimerTask(
+ self._decrypt_syncing_received_docs,
+ delay=self.DECRYPT_TASK_PERIOD)
+
+ def _teardown_sync_watcher(self):
+ """
+ Tear down the sync watcher.
+ """
+ if self._sync_watcher is not None:
+ self._sync_watcher.stop()
+ self._sync_watcher.shutdown()
+ self._sync_watcher = None
+
+ def _get_replica_uid(self, url):
+ """
+ Return replica uid from the url, or None.
+
+ :param url: the replica url
+ :type url: str
+ """
+ replica_uid_match = re.findall("user-([0-9a-fA-F]+)", url)
+ return replica_uid_match[0] if len(replica_uid_match) > 0 else None
+
+ @staticmethod
+ def connect(url, source_replica_uid=None, crypto=None):
+ return SoledadSyncTarget(
+ url, source_replica_uid=source_replica_uid, crypto=crypto)
+
+ def _parse_received_doc_response(self, response):
+ """
+ Parse the response from the server containing the received document.
+
+ :param response: The body and headers of the response.
+ :type response: tuple(str, dict)
+ """
+ data, _ = response
+ # decode incoming stream
+ parts = data.splitlines()
+ if not parts or parts[0] != '[' or parts[-1] != ']':
+ raise errors.BrokenSyncStream
+ data = parts[1:-1]
+ # decode metadata
+ line, comma = utils.check_and_strip_comma(data[0])
+ metadata = None
+ try:
+ metadata = json.loads(line)
+ new_generation = metadata['new_generation']
+ new_transaction_id = metadata['new_transaction_id']
+ number_of_changes = metadata['number_of_changes']
+ except (json.JSONDecodeError, KeyError):
+ raise errors.BrokenSyncStream
+ # make sure we have replica_uid from fresh new dbs
+ if self._ensure_callback and 'replica_uid' in metadata:
+ self._ensure_callback(metadata['replica_uid'])
+ # parse incoming document info
+ doc_id = None
+ rev = None
+ content = None
+ gen = None
+ trans_id = None
+ if number_of_changes > 0:
+ try:
+ entry = json.loads(data[1])
+ doc_id = entry['id']
+ rev = entry['rev']
+ content = entry['content']
+ gen = entry['gen']
+ trans_id = entry['trans_id']
+ except (IndexError, KeyError):
+ raise errors.BrokenSyncStream
+ return new_generation, new_transaction_id, number_of_changes, \
+ doc_id, rev, content, gen, trans_id
+
+ def _insert_received_doc(self, idx, total, response):
+ """
+ Insert a received document into the local replica.
+
+ :param idx: The index count of the current operation.
+ :type idx: int
+ :param total: The total number of operations.
+ :type total: int
+ :param response: The body and headers of the response.
+ :type response: tuple(str, dict)
+ """
+ new_generation, new_transaction_id, number_of_changes, doc_id, \
+ rev, content, gen, trans_id = \
+ self._parse_received_doc_response(response)
+ if doc_id is not None:
+ # decrypt incoming document and insert into local database
+ # -------------------------------------------------------------
+ # symmetric decryption of document's contents
+ # -------------------------------------------------------------
+ # If arriving content was symmetrically encrypted, we decrypt it.
+ # We do it inline if defer_decryption flag is False or no sync_db
+ # was defined, otherwise we defer it writing it to the received
+ # docs table.
+ doc = SoledadDocument(doc_id, rev, content)
+ if is_symmetrically_encrypted(doc):
+ if self._queue_for_decrypt:
+ self._save_encrypted_received_doc(
+ doc, gen, trans_id, idx, total)
+ else:
+ # defer_decryption is False or no-sync-db fallback
+ doc.set_json(decrypt_doc(self._crypto, doc))
+ self._return_doc_cb(doc, gen, trans_id)
+ else:
+ # not symmetrically encrypted doc, insert it directly
+ # or save it in the decrypted stage.
+ if self._queue_for_decrypt:
+ self._save_received_doc(doc, gen, trans_id, idx, total)
+ else:
+ self._return_doc_cb(doc, gen, trans_id)
+ # -------------------------------------------------------------
+ # end of symmetric decryption
+ # -------------------------------------------------------------
+ msg = "%d/%d" % (idx + 1, total)
+ signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg)
+ logger.debug("Soledad sync receive status: %s" % msg)
+ return number_of_changes, new_generation, new_transaction_id
+
+ def _get_remote_docs(self, url, last_known_generation, last_known_trans_id,
+ headers, return_doc_cb, ensure_callback, sync_id,
+ syncer_pool, defer_decryption=False):
+ """
+ Fetch sync documents from the remote database and insert them in the
+ local database.
+
+ If an incoming document's encryption scheme is equal to
+ EncryptionSchemes.SYMKEY, then this method will decrypt it with
+ Soledad's symmetric key.
+
+ :param url: The syncing URL.
+ :type url: str
+ :param last_known_generation: Target's last known generation.
+ :type last_known_generation: int
+ :param last_known_trans_id: Target's last known transaction id.
+ :type last_known_trans_id: str
+ :param headers: The headers of the HTTP request.
+ :type headers: dict
+ :param return_doc_cb: A callback to insert docs from target.
+ :type return_doc_cb: callable
+ :param ensure_callback: A callback to ensure we have the correct
+ target_replica_uid, if it was just created.
+ :type ensure_callback: callable
+ :param sync_id: The id for the current sync session.
+ :type sync_id: str
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
+
+ :raise BrokenSyncStream: If `data` is malformed.
+
+ :return: A dictionary representing the first line of the response got
+ from remote replica.
+ :rtype: dict
+ """
+ # we keep a reference to the callback in case we defer the decryption
+ self._return_doc_cb = return_doc_cb
+ self._queue_for_decrypt = defer_decryption \
+ and self._sync_db is not None
+
+ new_generation = last_known_generation
+ new_transaction_id = last_known_trans_id
+
+ if self._queue_for_decrypt:
+ logger.debug(
+ "Soledad sync: will queue received docs for decrypting.")
+
+ idx = 0
+ number_of_changes = 1
+
+ first_request = True
+ last_callback_lock = None
+ threads = []
+
+ # get incoming documents
+ while idx < number_of_changes:
+ # bail out if sync process was interrupted
+ if self.stopped is True:
+ break
+
+ # launch a thread to fetch one document from target
+ t = syncer_pool.new_syncer_thread(
+ idx, number_of_changes,
+ last_callback_lock=last_callback_lock)
+
+ # bail out if any thread failed
+ if t is None:
+ self.stop()
+ break
+
+ t.doc_syncer.set_request_method(
+ 'get', idx, sync_id, last_known_generation,
+ last_known_trans_id)
+ t.doc_syncer.set_success_callback(self._insert_received_doc)
+
+ def _failure_callback(idx, total, exception):
+ _failure_msg = "Soledad sync: error while getting document " \
+ "%d/%d: %s" \
+ % (idx + 1, total, exception)
+ logger.warning("%s" % _failure_msg)
+ logger.warning("Soledad sync: failing gracefully, will "
+ "recover on next sync.")
+
+ t.doc_syncer.set_failure_callback(_failure_callback)
+ threads.append(t)
+ t.start()
+ last_callback_lock = t.callback_lock
+ idx += 1
+
+ # if this is the first request, wait to update the number of
+ # changes
+ if first_request is True:
+ t.join()
+ if t.success:
+ number_of_changes, _, _ = t.result
+ first_request = False
+
+ # make sure all threads finished and we have up-to-date info
+ last_successful_thread = None
+ while threads:
+ # check if there are failures
+ t = threads.pop(0)
+ t.join()
+ if t.success:
+ last_successful_thread = t
+
+ # get information about last successful thread
+ if last_successful_thread is not None:
+ body, _ = last_successful_thread.response
+ parsed_body = json.loads(body)
+ # get current target gen and trans id in case no documents were
+ # transferred
+ if len(parsed_body) == 1:
+ metadata = parsed_body[0]
+ new_generation = metadata['new_generation']
+ new_transaction_id = metadata['new_transaction_id']
+ # get current target gen and trans id from last transferred
+ # document
+ else:
+ doc_data = parsed_body[1]
+ new_generation = doc_data['gen']
+ new_transaction_id = doc_data['trans_id']
+
+ return new_generation, new_transaction_id
+
+ def sync_exchange(self, docs_by_generations,
+ source_replica_uid, last_known_generation,
+ last_known_trans_id, return_doc_cb,
+ ensure_callback=None, defer_decryption=True,
+ sync_id=None):
"""
Find out which documents the remote database does not know about,
encrypt and send them.
@@ -586,24 +1094,54 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
the last local generation the remote
replica knows about.
:type docs_by_generations: list of tuples
+
:param source_replica_uid: The uid of the source replica.
:type source_replica_uid: str
+
:param last_known_generation: Target's last known generation.
:type last_known_generation: int
+
:param last_known_trans_id: Target's last known transaction id.
:type last_known_trans_id: str
+
:param return_doc_cb: A callback for inserting received documents from
- target.
+ target. If not overriden, this will call u1db
+ insert_doc_from_target in synchronizer, which
+ implements the TAKE OTHER semantics.
:type return_doc_cb: function
+
:param ensure_callback: A callback that ensures we know the target
- replica uid if the target replica was just created.
+ replica uid if the target replica was just
+ created.
:type ensure_callback: function
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
+
:return: The new generation and transaction id of the target replica.
:rtype: tuple
"""
+ self._ensure_callback = ensure_callback
+
+ if defer_decryption:
+ self._sync_exchange_lock.acquire()
+ self._setup_sync_decr_pool()
+ self._setup_sync_watcher()
+ self._defer_decryption = True
+
self.start()
- sync_id = str(uuid4())
+
+ if sync_id is None:
+ sync_id = str(uuid4())
+ self.source_replica_uid = source_replica_uid
+ # let the decrypter pool access the passed callback to insert docs
+ setProxiedObject(self._insert_doc_cb[source_replica_uid],
+ return_doc_cb)
+
+ if not self.clear_to_sync():
+ raise PendingReceivedDocsSyncError
self._ensure_connection()
if self._trace_hook: # for tests
@@ -611,78 +1149,135 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
url = '%s/sync-from/%s' % (self._url.path, source_replica_uid)
headers = self._sign_request('POST', url, {})
- def _post_put_doc(headers, last_known_generation, last_known_trans_id,
- id, rev, content, gen, trans_id, sync_id):
- """
- Put a sync document on server by means of a POST request.
-
- :param received: How many documents have already been received in
- this sync session.
- :type received: int
- """
- # prepare to send the document
- entries = ['[']
- size = 1
- # add remote replica metadata to the request
- size += self._prepare(
- '', entries,
- last_known_generation=last_known_generation,
- last_known_trans_id=last_known_trans_id,
- sync_id=sync_id,
- ensure=ensure_callback is not None)
- # add the document to the request
- size += self._prepare(
- ',', entries,
- id=id, rev=rev, content=content, gen=gen, trans_id=trans_id)
- entries.append('\r\n]')
- size += len(entries[-1])
- # send headers
- self._init_post_request(url, 'put', headers, size)
- # send document
- for entry in entries:
- self._conn.send(entry)
- data, _ = self._response()
- data = json.loads(data)
- return data[0]['new_generation'], data[0]['new_transaction_id']
-
cur_target_gen = last_known_generation
cur_target_trans_id = last_known_trans_id
# send docs
+ msg = "%d/%d" % (0, len(docs_by_generations))
+ signal(SOLEDAD_SYNC_SEND_STATUS, msg)
+ logger.debug("Soledad sync send status: %s" % msg)
+
+ defer_encryption = self._sync_db is not None
+ syncer_pool = DocumentSyncerPool(
+ self._raw_url, self._raw_creds, url, headers, ensure_callback,
+ self.stop)
+ threads = []
+ last_request_lock = None
+ last_callback_lock = None
sent = 0
- signal(
- SOLEDAD_SYNC_SEND_STATUS,
- "%d/%d" % (0, len(docs_by_generations)))
+ total = len(docs_by_generations)
+
+ synced = []
+ number_of_docs = len(docs_by_generations)
+
for doc, gen, trans_id in docs_by_generations:
# allow for interrupting the sync process
if self.stopped is True:
break
+
# skip non-syncable docs
if isinstance(doc, SoledadDocument) and not doc.syncable:
continue
+
# -------------------------------------------------------------
# symmetric encryption of document's contents
# -------------------------------------------------------------
doc_json = doc.get_json()
if not doc.is_tombstone():
- doc_json = encrypt_doc(self._crypto, doc)
+ if not defer_encryption:
+ # fallback case, for tests
+ doc_json = encrypt_doc(self._crypto, doc)
+ else:
+ try:
+ doc_json = self.get_encrypted_doc_from_db(
+ doc.doc_id, doc.rev)
+ except Exception as exc:
+ logger.error("Error while getting "
+ "encrypted doc from db")
+ logger.exception(exc)
+ continue
+ if doc_json is None:
+ # Not marked as tombstone, but we got nothing
+ # from the sync db. As it is not encrypted yet, we
+ # force inline encryption.
+ # TODO: implement a queue to deal with these cases.
+ doc_json = encrypt_doc(self._crypto, doc)
# -------------------------------------------------------------
# end of symmetric encryption
# -------------------------------------------------------------
- cur_target_gen, cur_target_trans_id = _post_put_doc(
- headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id,
- rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id,
- sync_id=sync_id)
+ t = syncer_pool.new_syncer_thread(
+ sent + 1, total, last_request_lock=None,
+ last_callback_lock=last_callback_lock)
+
+ # bail out if any thread failed
+ if t is None:
+ self.stop()
+ break
+
+ # set the request method
+ t.doc_syncer.set_request_method(
+ 'put', sync_id, cur_target_gen, cur_target_trans_id,
+ id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen,
+ trans_id=trans_id, number_of_docs=number_of_docs, doc_idx=sent + 1)
+ # set the success calback
+
+ def _success_callback(idx, total, response):
+ _success_msg = "Soledad sync send status: %d/%d" \
+ % (idx, total)
+ signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg)
+ logger.debug(_success_msg)
+
+ t.doc_syncer.set_success_callback(_success_callback)
+
+ # set the failure callback
+ def _failure_callback(idx, total, exception):
+ _failure_msg = "Soledad sync: error while sending document " \
+ "%d/%d: %s" % (idx, total, exception)
+ logger.warning("%s" % _failure_msg)
+ logger.warning("Soledad sync: failing gracefully, will "
+ "recover on next sync.")
+
+ t.doc_syncer.set_failure_callback(_failure_callback)
+
+ # save thread and append
+ t.start()
+ threads.append((t, doc))
+ last_request_lock = t.request_lock
+ last_callback_lock = t.callback_lock
sent += 1
- signal(
- SOLEDAD_SYNC_SEND_STATUS,
- "%d/%d" % (sent, len(docs_by_generations)))
+
+ # make sure all threads finished and we have up-to-date info
+ while threads:
+ # check if there are failures
+ t, doc = threads.pop(0)
+ t.join()
+ if t.success:
+ synced.append((doc.doc_id, doc.rev))
+
+ if defer_decryption:
+ self._sync_watcher.start()
# get docs from target
- cur_target_gen, cur_target_trans_id = self._get_remote_docs(
- url,
- last_known_generation, last_known_trans_id, headers,
- return_doc_cb, ensure_callback, sync_id)
+ if self.stopped is False:
+ cur_target_gen, cur_target_trans_id = self._get_remote_docs(
+ url,
+ last_known_generation, last_known_trans_id, headers,
+ return_doc_cb, ensure_callback, sync_id, syncer_pool,
+ defer_decryption=defer_decryption)
+ syncer_pool.cleanup()
+
+ # delete documents from the sync database
+ if defer_encryption:
+ self.delete_encrypted_docs_from_db(synced)
+
+ # wait for deferred decryption to finish
+ if defer_decryption:
+ while self.clear_to_sync() is False:
+ sleep(self.DECRYPT_TASK_PERIOD)
+ self._teardown_sync_watcher()
+ self._teardown_sync_decr_pool()
+ self._sync_exchange_lock.release()
+
self.stop()
return cur_target_gen, cur_target_trans_id
@@ -714,3 +1309,164 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
with self._stop_lock:
return self._stopped is True
+
+ def get_encrypted_doc_from_db(self, doc_id, doc_rev):
+ """
+ Retrieve encrypted document from the database of encrypted docs for
+ sync.
+
+ :param doc_id: The Document id.
+ :type doc_id: str
+
+ :param doc_rev: The document revision
+ :type doc_rev: str
+ """
+ encr = SyncEncrypterPool
+ c = self._sync_db.cursor()
+ sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % (
+ encr.TABLE_NAME,))
+ c.execute(sql, (doc_id, doc_rev))
+ res = c.fetchall()
+ if len(res) != 0:
+ return res[0][0]
+
+ def delete_encrypted_docs_from_db(self, docs_ids):
+ """
+ Delete several encrypted documents from the database of symmetrically
+ encrypted docs to sync.
+
+ :param docs_ids: an iterable with (doc_id, doc_rev) for all documents
+ to be deleted.
+ :type docs_ids: any iterable of tuples of str
+ """
+ if docs_ids:
+ encr = SyncEncrypterPool
+ c = self._sync_db.cursor()
+ for doc_id, doc_rev in docs_ids:
+ sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % (
+ encr.TABLE_NAME,))
+ c.execute(sql, (doc_id, doc_rev))
+ self._sync_db.commit()
+
+ def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total):
+ """
+ Save a symmetrically encrypted incoming document into the received
+ docs table in the sync db. A decryption task will pick it up
+ from here in turn.
+
+ :param doc: The document to save.
+ :type doc: SoledadDocument
+ :param gen: The generation.
+ :type gen: str
+ :param trans_id: Transacion id.
+ :type gen: str
+ :param idx: The index count of the current operation.
+ :type idx: int
+ :param total: The total number of operations.
+ :type total: int
+ """
+ logger.debug(
+ "Enqueueing doc for decryption: %d/%d."
+ % (idx + 1, total))
+ self._sync_decr_pool.insert_encrypted_received_doc(
+ doc.doc_id, doc.rev, doc.content, gen, trans_id)
+
+ def _save_received_doc(self, doc, gen, trans_id, idx, total):
+ """
+ Save any incoming document into the received docs table in the sync db.
+
+ :param doc: The document to save.
+ :type doc: SoledadDocument
+ :param gen: The generation.
+ :type gen: str
+ :param trans_id: Transacion id.
+ :type gen: str
+ :param idx: The index count of the current operation.
+ :type idx: int
+ :param total: The total number of operations.
+ :type total: int
+ """
+ logger.debug(
+ "Enqueueing doc, no decryption needed: %d/%d."
+ % (idx + 1, total))
+ self._sync_decr_pool.insert_received_doc(
+ doc.doc_id, doc.rev, doc.content, gen, trans_id)
+
+ #
+ # Symmetric decryption of syncing docs
+ #
+
+ def clear_to_sync(self):
+ """
+ Return True if sync can proceed (ie, the received db table is empty).
+ :rtype: bool
+ """
+ if self._sync_decr_pool is not None:
+ return self._sync_decr_pool.count_received_encrypted_docs() == 0
+ else:
+ return True
+
+ def set_decryption_callback(self, cb):
+ """
+ Set callback to be called when the decryption finishes.
+
+ :param cb: The callback to be set.
+ :type cb: callable
+ """
+ self._decryption_callback = cb
+
+ def has_decryption_callback(self):
+ """
+ Return True if there is a decryption callback set.
+ :rtype: bool
+ """
+ return self._decryption_callback is not None
+
+ def has_syncdb(self):
+ """
+ Return True if we have an initialized syncdb.
+ """
+ return self._sync_db is not None
+
+ def _decrypt_syncing_received_docs(self):
+ """
+ Decrypt the documents received from remote replica and insert them
+ into the local one.
+
+ Called periodically from TimerTask self._sync_watcher.
+ """
+ if sameProxiedObjects(
+ self._insert_doc_cb.get(self.source_replica_uid),
+ None):
+ return
+
+ decrypter = self._sync_decr_pool
+ decrypter.decrypt_received_docs()
+ done = decrypter.process_decrypted()
+
+ def _sign_request(self, method, url_query, params):
+ """
+ Return an authorization header to be included in the HTTP request.
+
+ :param method: The HTTP method.
+ :type method: str
+ :param url_query: The URL query string.
+ :type url_query: str
+ :param params: A list with encoded query parameters.
+ :type param: list
+
+ :return: The Authorization header.
+ :rtype: list of tuple
+ """
+ return TokenBasedAuth._sign_request(self, method, url_query, params)
+
+ def set_token_credentials(self, uuid, token):
+ """
+ Store given credentials so we can sign the request later.
+
+ :param uuid: The user's uuid.
+ :type uuid: str
+ :param token: The authentication token.
+ :type token: str
+ """
+ TokenBasedAuth.set_token_credentials(self, uuid, token)
diff --git a/common/changes/bug_5739_fix-multipart-problem b/common/changes/bug_5739_fix-multipart-problem
new file mode 100644
index 00000000..449e09b8
--- /dev/null
+++ b/common/changes/bug_5739_fix-multipart-problem
@@ -0,0 +1,2 @@
+ o Use a dedicated HTTP resource for couch multipart PUTs to avoid bigcouch
+ bug (#5739).
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index b51b32f3..5658f4ce 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -1106,7 +1106,9 @@ class CouchDatabase(CommonBackend):
)
def _set_replica_gen_and_trans_id(self, other_replica_uid,
- other_generation, other_transaction_id):
+ other_generation, other_transaction_id,
+ number_of_docs=None, doc_idx=None,
+ sync_id=None):
"""
Set the last-known generation and transaction id for the other
database replica.
@@ -1122,12 +1124,21 @@ class CouchDatabase(CommonBackend):
:param other_transaction_id: The transaction id associated with the
generation.
:type other_transaction_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+ :param doc_idx: The index of the current document being sent.
+ :type doc_idx: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
"""
self._do_set_replica_gen_and_trans_id(
- other_replica_uid, other_generation, other_transaction_id)
+ other_replica_uid, other_generation, other_transaction_id,
+ number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id)
def _do_set_replica_gen_and_trans_id(
- self, other_replica_uid, other_generation, other_transaction_id):
+ self, other_replica_uid, other_generation, other_transaction_id,
+ number_of_docs=None, doc_idx=None, sync_id=None):
"""
Set the last-known generation and transaction id for the other
database replica.
@@ -1143,6 +1154,13 @@ class CouchDatabase(CommonBackend):
:param other_transaction_id: The transaction id associated with the
generation.
:type other_transaction_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+ :param doc_idx: The index of the current document being sent.
+ :type doc_idx: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
:raise MissingDesignDocError: Raised when tried to access a missing
design document.
@@ -1163,12 +1181,19 @@ class CouchDatabase(CommonBackend):
res = self._database.resource(*ddoc_path)
try:
with CouchDatabase.update_handler_lock[self._get_replica_uid()]:
+ body={
+ 'other_replica_uid': other_replica_uid,
+ 'other_generation': other_generation,
+ 'other_transaction_id': other_transaction_id,
+ }
+ if number_of_docs is not None:
+ body['number_of_docs'] = number_of_docs
+ if doc_idx is not None:
+ body['doc_idx'] = doc_idx
+ if sync_id is not None:
+ body['sync_id'] = sync_id
res.put_json(
- body={
- 'other_replica_uid': other_replica_uid,
- 'other_generation': other_generation,
- 'other_transaction_id': other_transaction_id,
- },
+ body=body,
headers={'content-type': 'application/json'})
except ResourceNotFound as e:
raise_missing_design_doc_error(e, ddoc_path)
@@ -1306,7 +1331,8 @@ class CouchDatabase(CommonBackend):
doc.set_conflicts(cur_doc.get_conflicts())
def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen,
- replica_trans_id=''):
+ replica_trans_id='', number_of_docs=None,
+ doc_idx=None, sync_id=None):
"""
Insert/update document into the database with a given revision.
@@ -1339,6 +1365,13 @@ class CouchDatabase(CommonBackend):
:param replica_trans_id: The transaction_id associated with the
generation.
:type replica_trans_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+ :param doc_idx: The index of the current document being sent.
+ :type doc_idx: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
:return: (state, at_gen) - If we don't have doc_id already, or if
doc_rev supersedes the existing document revision, then the
@@ -1398,7 +1431,9 @@ class CouchDatabase(CommonBackend):
self._force_doc_sync_conflict(doc)
if replica_uid is not None and replica_gen is not None:
self._set_replica_gen_and_trans_id(
- replica_uid, replica_gen, replica_trans_id)
+ replica_uid, replica_gen, replica_trans_id,
+ number_of_docs=number_of_docs, doc_idx=doc_idx,
+ sync_id=sync_id)
# update info
old_doc.rev = doc.rev
if doc.is_tombstone():
diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js
index 722f695a..b0ae2de6 100644
--- a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js
+++ b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js
@@ -1,22 +1,151 @@
+/**
+ * The u1db_sync_log document stores both the actual sync log and a list of
+ * pending updates to the log, in case we receive incoming documents out of
+ * the correct order (i.e. if there are parallel PUTs during the sync
+ * process).
+ *
+ * The structure of the document is the following:
+ *
+ * {
+ * 'syncs': [
+ * ['<replica_uid>', <gen>, '<trans_id>'],
+ * ...
+ * ],
+ * 'pending': {
+ * 'other_replica_uid': {
+ * 'sync_id': '<sync_id>',
+ * 'log': [[<gen>, '<trans_id>'], ...]
+ * },
+ * ...
+ * }
+ * }
+ *
+ * The update function below does the following:
+ *
+ * 0. If we do not receive a sync_id, we just update the 'syncs' list with
+ * the incoming info about the source replica state.
+ *
+ * 1. Otherwise, if the incoming sync_id differs from current stored
+ * sync_id, then we assume that the previous sync session for that source
+ * replica was interrupted and discard all pending data.
+ *
+ * 2. Then we append incoming info as pending data for that source replica
+ * and current sync_id, and sort the pending data by generation.
+ *
+ * 3. Then we go through pending data and find the most recent generation
+ * that we can use to update the actual sync log.
+ *
+ * 4. Finally, we insert the most up to date information into the sync log.
+ */
function(doc, req){
+
+ // create the document if it doesn't exist
if (!doc) {
doc = {}
doc['_id'] = 'u1db_sync_log';
doc['syncs'] = [];
}
- body = JSON.parse(req.body);
+
+ // get and validate incoming info
+ var body = JSON.parse(req.body);
+ var other_replica_uid = body['other_replica_uid'];
+ var other_generation = parseInt(body['other_generation']);
+ var other_transaction_id = body['other_transaction_id']
+ var sync_id = body['sync_id'];
+ var number_of_docs = body['number_of_docs'];
+ var doc_idx = body['doc_idx'];
+
+ // parse integers
+ if (number_of_docs != null)
+ number_of_docs = parseInt(number_of_docs);
+ if (doc_idx != null)
+ doc_idx = parseInt(doc_idx);
+
+ if (other_replica_uid == null
+ || other_generation == null
+ || other_transaction_id == null)
+ return [null, 'invalid data'];
+
+ // create slot for pending logs
+ if (doc['pending'] == null)
+ doc['pending'] = {};
+
+ // these are the values that will be actually inserted
+ var current_gen = other_generation;
+ var current_trans_id = other_transaction_id;
+
+ /*------------- Wait for sequential values before storing -------------*/
+
+ // we just try to obtain pending log if we received a sync_id
+ if (sync_id != null) {
+
+ // create slot for current source and sync_id pending log
+ if (doc['pending'][other_replica_uid] == null
+ || doc['pending'][other_replica_uid]['sync_id'] != sync_id) {
+ doc['pending'][other_replica_uid] = {
+ 'sync_id': sync_id,
+ 'log': [],
+ 'last_doc_idx': 0,
+ }
+ }
+
+ // append incoming data to pending log
+ doc['pending'][other_replica_uid]['log'].push([
+ other_generation,
+ other_transaction_id,
+ doc_idx,
+ ])
+
+ // sort pending log according to generation
+ doc['pending'][other_replica_uid]['log'].sort(function(a, b) {
+ return a[0] - b[0];
+ });
+
+ // get most up-to-date information from pending log
+ var last_doc_idx = doc['pending'][other_replica_uid]['last_doc_idx'];
+ var pending_idx = doc['pending'][other_replica_uid]['log'][0][2];
+
+ current_gen = null;
+ current_trans_id = null;
+
+ while (last_doc_idx + 1 == pending_idx) {
+ pending = doc['pending'][other_replica_uid]['log'].shift()
+ current_gen = pending[0];
+ current_trans_id = pending[1];
+ last_doc_idx = pending[2]
+ if (doc['pending'][other_replica_uid]['log'].length == 0)
+ break;
+ pending_idx = doc['pending'][other_replica_uid]['log'][0][2];
+ }
+
+ // leave the sync log untouched if we still did not receive enough docs
+ if (current_gen == null)
+ return [doc, 'ok'];
+
+ // update last index of received doc
+ doc['pending'][other_replica_uid]['last_doc_idx'] = last_doc_idx;
+
+ // eventually remove all pending data from that replica
+ if (last_doc_idx == number_of_docs)
+ delete doc['pending'][other_replica_uid]
+ }
+
+ /*--------------- Store source replica info on sync log ---------------*/
+
// remove outdated info
doc['syncs'] = doc['syncs'].filter(
function (entry) {
- return entry[0] != body['other_replica_uid'];
+ return entry[0] != other_replica_uid;
}
);
- // store u1db rev
+
+ // store in log
doc['syncs'].push([
- body['other_replica_uid'],
- body['other_generation'],
- body['other_transaction_id']
+ other_replica_uid,
+ current_gen,
+ current_trans_id
]);
+
return [doc, 'ok'];
}
diff --git a/common/src/leap/soledad/common/tests/__init__.py b/common/src/leap/soledad/common/tests/__init__.py
index a38bdaed..3081683b 100644
--- a/common/src/leap/soledad/common/tests/__init__.py
+++ b/common/src/leap/soledad/common/tests/__init__.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# __init__.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013, 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -19,7 +19,6 @@
"""
Tests to make sure Soledad provides U1DB functionality and more.
"""
-
import os
import random
import string
@@ -29,11 +28,8 @@ from mock import Mock
from leap.soledad.common.document import SoledadDocument
from leap.soledad.client import Soledad
-from leap.soledad.client.crypto import SoledadCrypto
-from leap.soledad.client.target import (
- decrypt_doc,
- ENC_SCHEME_KEY,
-)
+from leap.soledad.client.crypto import decrypt_doc_dict
+from leap.soledad.client.crypto import ENC_SCHEME_KEY
from leap.common.testing.basetest import BaseLeapTest
@@ -49,6 +45,7 @@ class BaseSoledadTest(BaseLeapTest):
"""
Instantiates Soledad for usage in tests.
"""
+ defer_sync_encryption = False
def setUp(self):
# config info
@@ -73,11 +70,26 @@ class BaseSoledadTest(BaseLeapTest):
self._db1.close()
self._db2.close()
self._soledad.close()
+
# XXX should not access "private" attrs
for f in [self._soledad._local_db_path, self._soledad._secrets_path]:
if os.path.isfile(f):
os.unlink(f)
+ def get_default_shared_mock(self, put_doc_side_effect):
+ """
+ Get a default class for mocking the shared DB
+ """
+ class defaultMockSharedDB(object):
+ get_doc = Mock(return_value=None)
+ put_doc = Mock(side_effect=put_doc_side_effect)
+ lock = Mock(return_value=('atoken', 300))
+ unlock = Mock(return_value=True)
+
+ def __call__(self):
+ return self
+ return defaultMockSharedDB
+
def _soledad_instance(self, user=ADDRESS, passphrase=u'123',
prefix='',
secrets_path=Soledad.STORAGE_SECRETS_FILE_NAME,
@@ -88,18 +100,11 @@ class BaseSoledadTest(BaseLeapTest):
def _put_doc_side_effect(doc):
self._doc_put = doc
- class MockSharedDB(object):
-
- get_doc = Mock(return_value=None)
- put_doc = Mock(side_effect=_put_doc_side_effect)
- lock = Mock(return_value=('atoken', 300))
- unlock = Mock(return_value=True)
-
- def __call__(self):
- return self
-
if shared_db_class is not None:
MockSharedDB = shared_db_class
+ else:
+ MockSharedDB = self.get_default_shared_mock(
+ _put_doc_side_effect)
Soledad._shared_db = MockSharedDB()
return Soledad(
@@ -111,7 +116,8 @@ class BaseSoledadTest(BaseLeapTest):
self.tempdir, prefix, local_db_path),
server_url=server_url, # Soledad will fail if not given an url.
cert_file=cert_file,
- secret_id=secret_id)
+ secret_id=secret_id,
+ defer_encryption=self.defer_sync_encryption)
def assertGetEncryptedDoc(
self, db, doc_id, doc_rev, content, has_conflicts):
@@ -121,8 +127,15 @@ class BaseSoledadTest(BaseLeapTest):
exp_doc = self.make_document(doc_id, doc_rev, content,
has_conflicts=has_conflicts)
doc = db.get_doc(doc_id)
+
if ENC_SCHEME_KEY in doc.content:
- doc.set_json(decrypt_doc(self._soledad._crypto, doc))
+ # XXX check for SYM_KEY too
+ key = self._soledad._crypto.doc_passphrase(doc.doc_id)
+ secret = self._soledad._crypto.secret
+ decrypted = decrypt_doc_dict(
+ doc.content, doc.doc_id, doc.rev,
+ key, secret)
+ doc.set_json(decrypted)
self.assertEqual(exp_doc.doc_id, doc.doc_id)
self.assertEqual(exp_doc.rev, doc.rev)
self.assertEqual(exp_doc.has_conflicts, doc.has_conflicts)
diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py
index 3b1e5a06..10d6c136 100644
--- a/common/src/leap/soledad/common/tests/test_couch.py
+++ b/common/src/leap/soledad/common/tests/test_couch.py
@@ -91,14 +91,19 @@ class CouchDBWrapper(object):
logPath = os.path.join(self.tempdir, 'log', 'couch.log')
while not os.path.exists(logPath):
if self.process.poll() is not None:
+ got_stdout, got_stderr = "", ""
+ if self.process.stdout is not None:
+ got_stdout = self.process.stdout.read()
+
+ if self.process.stderr is not None:
+ got_stderr = self.process.stderr.read()
raise Exception("""
couchdb exited with code %d.
stdout:
%s
stderr:
%s""" % (
- self.process.returncode, self.process.stdout.read(),
- self.process.stderr.read()))
+ self.process.returncode, got_stdout, got_stderr))
time.sleep(0.01)
while os.stat(logPath).st_size == 0:
time.sleep(0.01)
diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
index b03f79e7..6465eb80 100644
--- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
+++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
-# test_soledad.py
-# Copyright (C) 2013 LEAP
+# test_couch_operations_atomicity.py
+# Copyright (C) 2013, 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -14,11 +14,9 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
+Test atomocity for couch operations.
"""
-
import os
import mock
import tempfile
@@ -32,7 +30,7 @@ from leap.soledad.client import Soledad
from leap.soledad.common.couch import CouchDatabase, CouchServerState
from leap.soledad.common.tests.test_couch import CouchDBTestCase
from leap.soledad.common.tests.u1db_tests import TestCaseWithServer
-from leap.soledad.common.tests.test_target import (
+from leap.soledad.common.tests.test_sync_target import (
make_token_soledad_app,
make_leap_document_for_test,
token_leap_sync_target,
@@ -224,9 +222,9 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
#
# Concurrency tests
#
-
+
class _WorkerThread(threading.Thread):
-
+
def __init__(self, params, run_method):
threading.Thread.__init__(self)
self._params = params
@@ -260,7 +258,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
for thread in threads:
thread.join()
-
+
# assert length of transaction_log
transaction_log = self.db._get_transaction_log()
self.assertEqual(
@@ -341,7 +339,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
# wait for threads to finish
for thread in threads:
thread.join()
-
+
# do the sync!
sol.sync()
diff --git a/common/src/leap/soledad/common/tests/test_crypto.py b/common/src/leap/soledad/common/tests/test_crypto.py
index 4b2470ba..1071af14 100644
--- a/common/src/leap/soledad/common/tests/test_crypto.py
+++ b/common/src/leap/soledad/common/tests/test_crypto.py
@@ -14,37 +14,17 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
Tests for cryptographic related stuff.
"""
-
import os
-import shutil
-import tempfile
-import simplejson as json
import hashlib
import binascii
-
-from leap.common.testing.basetest import BaseLeapTest
-from leap.soledad.client import (
- Soledad,
- crypto,
- target,
-)
+from leap.soledad.client import crypto
from leap.soledad.common.document import SoledadDocument
-from leap.soledad.common.tests import (
- BaseSoledadTest,
- KEY_FINGERPRINT,
- PRIVATE_KEY,
-)
+from leap.soledad.common.tests import BaseSoledadTest
from leap.soledad.common.crypto import WrongMac, UnknownMacMethod
-from leap.soledad.common.tests.u1db_tests import (
- simple_doc,
- nested_doc,
-)
class EncryptedSyncTestCase(BaseSoledadTest):
@@ -59,16 +39,17 @@ class EncryptedSyncTestCase(BaseSoledadTest):
simpledoc = {'key': 'val'}
doc1 = SoledadDocument(doc_id='id')
doc1.content = simpledoc
+
# encrypt doc
- doc1.set_json(target.encrypt_doc(self._soledad._crypto, doc1))
+ doc1.set_json(crypto.encrypt_doc(self._soledad._crypto, doc1))
# assert content is different and includes keys
self.assertNotEqual(
simpledoc, doc1.content,
'incorrect document encryption')
- self.assertTrue(target.ENC_JSON_KEY in doc1.content)
- self.assertTrue(target.ENC_SCHEME_KEY in doc1.content)
+ self.assertTrue(crypto.ENC_JSON_KEY in doc1.content)
+ self.assertTrue(crypto.ENC_SCHEME_KEY in doc1.content)
# decrypt doc
- doc1.set_json(target.decrypt_doc(self._soledad._crypto, doc1))
+ doc1.set_json(crypto.decrypt_doc(self._soledad._crypto, doc1))
self.assertEqual(
simpledoc, doc1.content, 'incorrect document encryption')
@@ -159,15 +140,15 @@ class MacAuthTestCase(BaseSoledadTest):
doc = SoledadDocument(doc_id='id')
doc.content = simpledoc
# encrypt doc
- doc.set_json(target.encrypt_doc(self._soledad._crypto, doc))
- self.assertTrue(target.MAC_KEY in doc.content)
- self.assertTrue(target.MAC_METHOD_KEY in doc.content)
+ doc.set_json(crypto.encrypt_doc(self._soledad._crypto, doc))
+ self.assertTrue(crypto.MAC_KEY in doc.content)
+ self.assertTrue(crypto.MAC_METHOD_KEY in doc.content)
# mess with MAC
- doc.content[target.MAC_KEY] = '1234567890ABCDEF'
+ doc.content[crypto.MAC_KEY] = '1234567890ABCDEF'
# try to decrypt doc
self.assertRaises(
WrongMac,
- target.decrypt_doc, self._soledad._crypto, doc)
+ crypto.decrypt_doc, self._soledad._crypto, doc)
def test_decrypt_with_unknown_mac_method_raises(self):
"""
@@ -177,15 +158,15 @@ class MacAuthTestCase(BaseSoledadTest):
doc = SoledadDocument(doc_id='id')
doc.content = simpledoc
# encrypt doc
- doc.set_json(target.encrypt_doc(self._soledad._crypto, doc))
- self.assertTrue(target.MAC_KEY in doc.content)
- self.assertTrue(target.MAC_METHOD_KEY in doc.content)
+ doc.set_json(crypto.encrypt_doc(self._soledad._crypto, doc))
+ self.assertTrue(crypto.MAC_KEY in doc.content)
+ self.assertTrue(crypto.MAC_METHOD_KEY in doc.content)
# mess with MAC method
- doc.content[target.MAC_METHOD_KEY] = 'mymac'
+ doc.content[crypto.MAC_METHOD_KEY] = 'mymac'
# try to decrypt doc
self.assertRaises(
UnknownMacMethod,
- target.decrypt_doc, self._soledad._crypto, doc)
+ crypto.decrypt_doc, self._soledad._crypto, doc)
class SoledadCryptoAESTestCase(BaseSoledadTest):
diff --git a/common/src/leap/soledad/common/tests/test_http.py b/common/src/leap/soledad/common/tests/test_http.py
new file mode 100644
index 00000000..d21470e0
--- /dev/null
+++ b/common/src/leap/soledad/common/tests/test_http.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+# test_http.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test Leap backend bits: test http database
+"""
+from u1db.remote import http_database
+
+from leap.soledad.client import auth
+
+from leap.soledad.common.tests import u1db_tests as tests
+from leap.soledad.common.tests.u1db_tests import test_http_database
+
+
+#-----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_http_database`.
+#-----------------------------------------------------------------------------
+
+class _HTTPDatabase(http_database.HTTPDatabase, auth.TokenBasedAuth):
+ """
+ Wraps our token auth implementation.
+ """
+
+ def set_token_credentials(self, uuid, token):
+ auth.TokenBasedAuth.set_token_credentials(self, uuid, token)
+
+ def _sign_request(self, method, url_query, params):
+ return auth.TokenBasedAuth._sign_request(
+ self, method, url_query, params)
+
+
+class TestHTTPDatabaseWithCreds(
+ test_http_database.TestHTTPDatabaseCtrWithCreds):
+
+ def test_get_sync_target_inherits_token_credentials(self):
+ # this test was from TestDatabaseSimpleOperations but we put it here
+ # for convenience.
+ self.db = _HTTPDatabase('dbase')
+ self.db.set_token_credentials('user-uuid', 'auth-token')
+ st = self.db.get_sync_target()
+ self.assertEqual(self.db._creds, st._creds)
+
+ def test_ctr_with_creds(self):
+ db1 = _HTTPDatabase('http://dbs/db', creds={'token': {
+ 'uuid': 'user-uuid',
+ 'token': 'auth-token',
+ }})
+ self.assertIn('token', db1._creds)
+
+
+load_tests = tests.load_with_scenarios
diff --git a/common/src/leap/soledad/common/tests/test_http_client.py b/common/src/leap/soledad/common/tests/test_http_client.py
new file mode 100644
index 00000000..3169398b
--- /dev/null
+++ b/common/src/leap/soledad/common/tests/test_http_client.py
@@ -0,0 +1,116 @@
+# -*- coding: utf-8 -*-
+# test_http_client.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test Leap backend bits: sync target
+"""
+import json
+
+from u1db.remote import http_client
+
+from leap.soledad.client import auth
+from leap.soledad.common.tests import u1db_tests as tests
+from leap.soledad.common.tests.u1db_tests import test_http_client
+from leap.soledad.server.auth import SoledadTokenAuthMiddleware
+
+
+#-----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_http_client`.
+#-----------------------------------------------------------------------------
+
+class TestSoledadClientBase(test_http_client.TestHTTPClientBase):
+ """
+ This class should be used to test Token auth.
+ """
+
+ def getClientWithToken(self, **kwds):
+ self.startServer()
+
+ class _HTTPClientWithToken(
+ http_client.HTTPClientBase, auth.TokenBasedAuth):
+
+ def set_token_credentials(self, uuid, token):
+ auth.TokenBasedAuth.set_token_credentials(self, uuid, token)
+
+ def _sign_request(self, method, url_query, params):
+ return auth.TokenBasedAuth._sign_request(
+ self, method, url_query, params)
+
+ return _HTTPClientWithToken(self.getURL('dbase'), **kwds)
+
+ def test_oauth(self):
+ """
+ Suppress oauth test (we test for token auth here).
+ """
+ pass
+
+ def test_oauth_ctr_creds(self):
+ """
+ Suppress oauth test (we test for token auth here).
+ """
+ pass
+
+ def test_oauth_Unauthorized(self):
+ """
+ Suppress oauth test (we test for token auth here).
+ """
+ pass
+
+ def app(self, environ, start_response):
+ res = test_http_client.TestHTTPClientBase.app(
+ self, environ, start_response)
+ if res is not None:
+ return res
+ # mime solead application here.
+ if '/token' in environ['PATH_INFO']:
+ auth = environ.get(SoledadTokenAuthMiddleware.HTTP_AUTH_KEY)
+ if not auth:
+ start_response("401 Unauthorized",
+ [('Content-Type', 'application/json')])
+ return [json.dumps({"error": "unauthorized",
+ "message": e.message})]
+ scheme, encoded = auth.split(None, 1)
+ if scheme.lower() != 'token':
+ start_response("401 Unauthorized",
+ [('Content-Type', 'application/json')])
+ return [json.dumps({"error": "unauthorized",
+ "message": e.message})]
+ uuid, token = encoded.decode('base64').split(':', 1)
+ if uuid != 'user-uuid' and token != 'auth-token':
+ return unauth_err("Incorrect address or token.")
+ start_response("200 OK", [('Content-Type', 'application/json')])
+ return [json.dumps([environ['PATH_INFO'], uuid, token])]
+
+ def test_token(self):
+ """
+ Test if token is sent correctly.
+ """
+ cli = self.getClientWithToken()
+ cli.set_token_credentials('user-uuid', 'auth-token')
+ res, headers = cli._request('GET', ['doc', 'token'])
+ self.assertEqual(
+ ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res))
+
+ def test_token_ctr_creds(self):
+ cli = self.getClientWithToken(creds={'token': {
+ 'uuid': 'user-uuid',
+ 'token': 'auth-token',
+ }})
+ res, headers = cli._request('GET', ['doc', 'token'])
+ self.assertEqual(
+ ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res))
+
+load_tests = tests.load_with_scenarios
diff --git a/common/src/leap/soledad/common/tests/test_https.py b/common/src/leap/soledad/common/tests/test_https.py
new file mode 100644
index 00000000..b6288188
--- /dev/null
+++ b/common/src/leap/soledad/common/tests/test_https.py
@@ -0,0 +1,108 @@
+# -*- coding: utf-8 -*-
+# test_sync_target.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test Leap backend bits: https
+"""
+from leap.soledad.common.tests import BaseSoledadTest
+from leap.soledad.common.tests import test_sync_target as test_st
+from leap.soledad.common.tests import u1db_tests as tests
+from leap.soledad.common.tests.u1db_tests import test_backends
+from leap.soledad.common.tests.u1db_tests import test_https
+
+from leap.soledad import client
+from leap.soledad.server import SoledadApp
+
+from u1db.remote import http_client
+
+
+def make_soledad_app(state):
+ return SoledadApp(state)
+
+LEAP_SCENARIOS = [
+ ('http', {
+ 'make_database_for_test': test_backends.make_http_database_for_test,
+ 'copy_database_for_test': test_backends.copy_http_database_for_test,
+ 'make_document_for_test': test_st.make_leap_document_for_test,
+ 'make_app_with_state': test_st.make_soledad_app}),
+]
+
+
+#-----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_https`.
+#-----------------------------------------------------------------------------
+
+def token_leap_https_sync_target(test, host, path):
+ _, port = test.server.server_address
+ st = client.target.SoledadSyncTarget(
+ 'https://%s:%d/%s' % (host, port, path),
+ crypto=test._soledad._crypto)
+ st.set_token_credentials('user-uuid', 'auth-token')
+ return st
+
+
+class TestSoledadSyncTargetHttpsSupport(
+ test_https.TestHttpSyncTargetHttpsSupport,
+ BaseSoledadTest):
+
+ scenarios = [
+ ('token_soledad_https',
+ {'server_def': test_https.https_server_def,
+ 'make_app_with_state': test_st.make_token_soledad_app,
+ 'make_document_for_test': test_st.make_leap_document_for_test,
+ 'sync_target': token_leap_https_sync_target}),
+ ]
+
+ def setUp(self):
+ # the parent constructor undoes our SSL monkey patch to ensure tests
+ # run smoothly with standard u1db.
+ test_https.TestHttpSyncTargetHttpsSupport.setUp(self)
+ # so here monkey patch again to test our functionality.
+ http_client._VerifiedHTTPSConnection = client.VerifiedHTTPSConnection
+ client.SOLEDAD_CERT = http_client.CA_CERTS
+
+ def test_working(self):
+ """
+ Test that SSL connections work well.
+
+ This test was adapted to patch Soledad's HTTPS connection custom class
+ with the intended CA certificates.
+ """
+ self.startServer()
+ db = self.request_state._create_database('test')
+ self.patch(client, 'SOLEDAD_CERT', self.cacert_pem)
+ remote_target = self.getSyncTarget('localhost', 'test')
+ remote_target.record_sync_info('other-id', 2, 'T-id')
+ self.assertEqual(
+ (2, 'T-id'), db._get_replica_gen_and_trans_id('other-id'))
+
+ def test_host_mismatch(self):
+ """
+ Test that SSL connections to a hostname different than the one in the
+ certificate raise CertificateError.
+
+ This test was adapted to patch Soledad's HTTPS connection custom class
+ with the intended CA certificates.
+ """
+ self.startServer()
+ self.request_state._create_database('test')
+ self.patch(client, 'SOLEDAD_CERT', self.cacert_pem)
+ remote_target = self.getSyncTarget('127.0.0.1', 'test')
+ self.assertRaises(
+ http_client.CertificateError, remote_target.record_sync_info,
+ 'other-id', 2, 'T-id')
+
+load_tests = tests.load_with_scenarios
diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py
index 1c5a7407..cb5348b4 100644
--- a/common/src/leap/soledad/common/tests/test_server.py
+++ b/common/src/leap/soledad/common/tests/test_server.py
@@ -14,12 +14,9 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
Tests for server-related functionality.
"""
-
import os
import tempfile
import simplejson as json
@@ -39,16 +36,13 @@ from leap.soledad.common.tests.u1db_tests import (
simple_doc,
)
from leap.soledad.common.tests.test_couch import CouchDBTestCase
-from leap.soledad.common.tests.test_target import (
+from leap.soledad.common.tests.test_target_soledad import (
make_token_soledad_app,
make_leap_document_for_test,
- token_leap_sync_target,
)
-from leap.soledad.client import (
- Soledad,
- target,
-)
-from leap.soledad.server import SoledadApp, LockResource
+from leap.soledad.common.tests.test_sync_target import token_leap_sync_target
+from leap.soledad.client import Soledad, crypto
+from leap.soledad.server import LockResource
from leap.soledad.server.auth import URLToAuthorization
@@ -369,12 +363,12 @@ class EncryptedSyncTestCase(
self.assertEqual(doc1.doc_id, couchdoc.doc_id)
self.assertEqual(doc1.rev, couchdoc.rev)
self.assertEqual(6, len(couchdoc.content))
- self.assertTrue(target.ENC_JSON_KEY in couchdoc.content)
- self.assertTrue(target.ENC_SCHEME_KEY in couchdoc.content)
- self.assertTrue(target.ENC_METHOD_KEY in couchdoc.content)
- self.assertTrue(target.ENC_IV_KEY in couchdoc.content)
- self.assertTrue(target.MAC_KEY in couchdoc.content)
- self.assertTrue(target.MAC_METHOD_KEY in couchdoc.content)
+ self.assertTrue(crypto.ENC_JSON_KEY in couchdoc.content)
+ self.assertTrue(crypto.ENC_SCHEME_KEY in couchdoc.content)
+ self.assertTrue(crypto.ENC_METHOD_KEY in couchdoc.content)
+ self.assertTrue(crypto.ENC_IV_KEY in couchdoc.content)
+ self.assertTrue(crypto.MAC_KEY in couchdoc.content)
+ self.assertTrue(crypto.MAC_METHOD_KEY in couchdoc.content)
# instantiate soledad with empty db, but with same secrets path
sol2 = self._soledad_instance(prefix='x', auth_token='auth-token')
_, doclist = sol2.get_all_docs()
@@ -427,12 +421,12 @@ class EncryptedSyncTestCase(
self.assertEqual(doc1.doc_id, couchdoc.doc_id)
self.assertEqual(doc1.rev, couchdoc.rev)
self.assertEqual(6, len(couchdoc.content))
- self.assertTrue(target.ENC_JSON_KEY in couchdoc.content)
- self.assertTrue(target.ENC_SCHEME_KEY in couchdoc.content)
- self.assertTrue(target.ENC_METHOD_KEY in couchdoc.content)
- self.assertTrue(target.ENC_IV_KEY in couchdoc.content)
- self.assertTrue(target.MAC_KEY in couchdoc.content)
- self.assertTrue(target.MAC_METHOD_KEY in couchdoc.content)
+ self.assertTrue(crypto.ENC_JSON_KEY in couchdoc.content)
+ self.assertTrue(crypto.ENC_SCHEME_KEY in couchdoc.content)
+ self.assertTrue(crypto.ENC_METHOD_KEY in couchdoc.content)
+ self.assertTrue(crypto.ENC_IV_KEY in couchdoc.content)
+ self.assertTrue(crypto.MAC_KEY in couchdoc.content)
+ self.assertTrue(crypto.MAC_METHOD_KEY in couchdoc.content)
# instantiate soledad with empty db, but with same secrets path
sol2 = self._soledad_instance(
prefix='x',
@@ -502,7 +496,6 @@ class EncryptedSyncTestCase(
sol1.close()
sol2.close()
-
def test_sync_many_small_files(self):
"""
Test if Soledad can sync many smallfiles.
@@ -548,6 +541,7 @@ class EncryptedSyncTestCase(
sol1.close()
sol2.close()
+
class LockResourceTestCase(
CouchDBTestCase, TestCaseWithServer):
"""
diff --git a/common/src/leap/soledad/common/tests/test_soledad.py b/common/src/leap/soledad/common/tests/test_soledad.py
index 5a3bf2b0..11e43423 100644
--- a/common/src/leap/soledad/common/tests/test_soledad.py
+++ b/common/src/leap/soledad/common/tests/test_soledad.py
@@ -14,18 +14,13 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
Tests for general Soledad functionality.
"""
-
-
import os
from mock import Mock
-from pysqlcipher.dbapi2 import DatabaseError
from leap.common.events import events_pb2 as proto
from leap.soledad.common.tests import (
BaseSoledadTest,
diff --git a/common/src/leap/soledad/common/tests/test_soledad_doc.py b/common/src/leap/soledad/common/tests/test_soledad_doc.py
new file mode 100644
index 00000000..0952de6d
--- /dev/null
+++ b/common/src/leap/soledad/common/tests/test_soledad_doc.py
@@ -0,0 +1,44 @@
+# -*- coding: utf-8 -*-
+# test_soledad_doc.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test Leap backend bits: soledad docs
+"""
+from leap.soledad.common.tests import BaseSoledadTest
+from leap.soledad.common.tests.u1db_tests import test_document
+from leap.soledad.common.tests import u1db_tests as tests
+from leap.soledad.common.tests import test_sync_target as st
+
+#-----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_document`.
+#-----------------------------------------------------------------------------
+
+
+class TestSoledadDocument(test_document.TestDocument, BaseSoledadTest):
+
+ scenarios = ([(
+ 'leap', {
+ 'make_document_for_test': st.make_leap_document_for_test})])
+
+
+class TestSoledadPyDocument(test_document.TestPyDocument, BaseSoledadTest):
+
+ scenarios = ([(
+ 'leap', {
+ 'make_document_for_test': st.make_leap_document_for_test})])
+
+
+load_tests = tests.load_with_scenarios
diff --git a/common/src/leap/soledad/common/tests/test_sqlcipher.py b/common/src/leap/soledad/common/tests/test_sqlcipher.py
index 891aca0f..595966ec 100644
--- a/common/src/leap/soledad/common/tests/test_sqlcipher.py
+++ b/common/src/leap/soledad/common/tests/test_sqlcipher.py
@@ -14,16 +14,11 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
Test sqlcipher backend internals.
"""
-
-
import os
import time
-import unittest
import simplejson as json
import threading
@@ -50,15 +45,9 @@ from leap.soledad.client.sqlcipher import (
DatabaseIsNotEncrypted,
open as u1db_open,
)
-from leap.soledad.common.crypto import (
- EncryptionSchemes,
- ENC_JSON_KEY,
- ENC_SCHEME_KEY,
-)
-from leap.soledad.client.target import (
- decrypt_doc,
- SoledadSyncTarget,
-)
+from leap.soledad.client.target import SoledadSyncTarget
+from leap.soledad.common.crypto import ENC_SCHEME_KEY
+from leap.soledad.client.crypto import decrypt_doc_dict
# u1db tests stuff.
@@ -269,6 +258,7 @@ class TestSQLCipherPartialExpandDatabase(
db = SQLCipherDatabase.__new__(
SQLCipherDatabase)
db._db_handle = dbapi2.connect(path) # db is there but not yet init-ed
+ db._syncers = {}
c = db._db_handle.cursor()
c.execute('PRAGMA key="%s"' % PASSWORD)
self.addCleanup(db.close)
@@ -614,7 +604,12 @@ class SQLCipherDatabaseSyncTests(
self.sync(self.db2, db3)
doc3 = db3.get_doc('the-doc')
if ENC_SCHEME_KEY in doc3.content:
- doc3.set_json(decrypt_doc(self._soledad._crypto, doc3))
+ _crypto = self._soledad._crypto
+ key = _crypto.doc_passphrase(doc3.doc_id)
+ secret = _crypto.secret
+ doc3.set_json(decrypt_doc_dict(
+ doc3.content,
+ doc3.doc_id, doc3.rev, key, secret))
self.assertEqual(doc4.get_json(), doc3.get_json())
self.assertFalse(doc3.has_conflicts)
@@ -796,7 +791,7 @@ class SQLCipherEncryptionTest(BaseLeapTest):
# trying to open the a non-encrypted database with sqlcipher
# backend should raise a DatabaseIsNotEncrypted exception.
SQLCipherDatabase(self.DB_FILE, PASSWORD)
- raise db1pi2.DatabaseError(
+ raise dbapi2.DatabaseError(
"SQLCipher backend should not be able to open non-encrypted "
"dbs.")
except DatabaseIsNotEncrypted:
diff --git a/common/src/leap/soledad/common/tests/test_sync.py b/common/src/leap/soledad/common/tests/test_sync.py
index fd4a2797..0433fac9 100644
--- a/common/src/leap/soledad/common/tests/test_sync.py
+++ b/common/src/leap/soledad/common/tests/test_sync.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# test_sync.py
-# Copyright (C) 2014 LEAP
+# Copyright (C) 2013, 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -24,26 +24,31 @@ import threading
import time
from urlparse import urljoin
-from leap.soledad.common.couch import (
- CouchServerState,
- CouchDatabase,
-)
+from leap.soledad.common import couch
+from leap.soledad.common.tests import BaseSoledadTest
+from leap.soledad.common.tests import test_sync_target
+from leap.soledad.common.tests import u1db_tests as tests
from leap.soledad.common.tests.u1db_tests import (
TestCaseWithServer,
simple_doc,
+ test_backends,
+ test_sync
)
from leap.soledad.common.tests.test_couch import CouchDBTestCase
-from leap.soledad.common.tests.test_target import (
+from leap.soledad.common.tests.test_target_soledad import (
make_token_soledad_app,
make_leap_document_for_test,
- token_leap_sync_target,
)
-
+from leap.soledad.common.tests.test_sync_target import token_leap_sync_target
from leap.soledad.client import (
Soledad,
target,
)
+from leap.soledad.common.tests.util import SoledadWithCouchServerMixin
+from leap.soledad.client.sync import SoledadSynchronizer
+from leap.soledad.server import SoledadApp
+
class InterruptableSyncTestCase(
@@ -99,8 +104,8 @@ class InterruptableSyncTestCase(
secret_id=secret_id)
def make_app(self):
- self.request_state = CouchServerState(self._couch_url, 'shared',
- 'tokens')
+ self.request_state = couch.CouchServerState(
+ self._couch_url, 'shared', 'tokens')
return self.make_app_with_state(self.request_state)
def setUp(self):
@@ -150,7 +155,7 @@ class InterruptableSyncTestCase(
sol.create_doc(json.loads(simple_doc))
# ensure remote db exists before syncing
- db = CouchDatabase.open_database(
+ db = couch.CouchDatabase.open_database(
urljoin(self._couch_url, 'user-user-uuid'),
create=True,
ensure_ddocs=True)
@@ -174,3 +179,114 @@ class InterruptableSyncTestCase(
db.delete_database()
db.close()
sol.close()
+
+
+def make_soledad_app(state):
+ return SoledadApp(state)
+
+
+class TestSoledadDbSync(
+ SoledadWithCouchServerMixin,
+ test_sync.TestDbSync):
+ """
+ Test db.sync remote sync shortcut
+ """
+
+ scenarios = [
+ ('py-http', {
+ 'make_app_with_state': make_soledad_app,
+ 'make_database_for_test': tests.make_memory_database_for_test,
+ }),
+ ('py-token-http', {
+ 'make_app_with_state': test_sync_target.make_token_soledad_app,
+ 'make_database_for_test': tests.make_memory_database_for_test,
+ 'token': True
+ }),
+ ]
+
+ oauth = False
+ token = False
+
+ def setUp(self):
+ """
+ Need to explicitely invoke inicialization on all bases.
+ """
+ tests.TestCaseWithServer.setUp(self)
+ self.main_test_class = test_sync.TestDbSync
+ SoledadWithCouchServerMixin.setUp(self)
+ self.startServer()
+ self.db2 = couch.CouchDatabase.open_database(
+ urljoin(
+ 'http://localhost:' + str(self.wrapper.port), 'test'),
+ create=True,
+ ensure_ddocs=True)
+
+ def tearDown(self):
+ """
+ Need to explicitely invoke destruction on all bases.
+ """
+ self.db2.delete_database()
+ SoledadWithCouchServerMixin.tearDown(self)
+ tests.TestCaseWithServer.tearDown(self)
+
+ def do_sync(self, target_name):
+ """
+ Perform sync using SoledadSynchronizer, SoledadSyncTarget
+ and Token auth.
+ """
+ extra = {}
+ extra = dict(creds={'token': {
+ 'uuid': 'user-uuid',
+ 'token': 'auth-token',
+ }})
+ target_url = self.getURL(target_name)
+ return SoledadSynchronizer(
+ self.db,
+ target.SoledadSyncTarget(
+ target_url,
+ crypto=self._soledad._crypto,
+ **extra)).sync(autocreate=True,
+ defer_decryption=False)
+
+ def test_db_sync(self):
+ """
+ Test sync.
+
+ Adapted to check for encrypted content.
+ """
+ doc1 = self.db.create_doc_from_json(tests.simple_doc)
+ doc2 = self.db2.create_doc_from_json(tests.nested_doc)
+ local_gen_before_sync = self.do_sync('test')
+ gen, _, changes = self.db.whats_changed(local_gen_before_sync)
+ self.assertEqual(1, len(changes))
+ self.assertEqual(doc2.doc_id, changes[0][0])
+ self.assertEqual(1, gen - local_gen_before_sync)
+ self.assertGetEncryptedDoc(
+ self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False)
+ self.assertGetEncryptedDoc(
+ self.db, doc2.doc_id, doc2.rev, tests.nested_doc, False)
+
+ def test_db_sync_autocreate(self):
+ """
+ Test sync.
+
+ Adapted to check for encrypted content.
+ """
+ doc1 = self.db.create_doc_from_json(tests.simple_doc)
+ local_gen_before_sync = self.do_sync('test')
+ gen, _, changes = self.db.whats_changed(local_gen_before_sync)
+ self.assertEqual(0, gen - local_gen_before_sync)
+ db3 = self.request_state.open_database('test')
+ gen, _, changes = db3.whats_changed()
+ self.assertEqual(1, len(changes))
+ self.assertEqual(doc1.doc_id, changes[0][0])
+ self.assertGetEncryptedDoc(
+ db3, doc1.doc_id, doc1.rev, tests.simple_doc, False)
+ t_gen, _ = self.db._get_replica_gen_and_trans_id(
+ db3.replica_uid)
+ s_gen, _ = db3._get_replica_gen_and_trans_id('test1')
+ self.assertEqual(1, t_gen)
+ self.assertEqual(1, s_gen)
+
+
+load_tests = tests.load_with_scenarios
diff --git a/common/src/leap/soledad/common/tests/test_sync_deferred.py b/common/src/leap/soledad/common/tests/test_sync_deferred.py
new file mode 100644
index 00000000..48e3150f
--- /dev/null
+++ b/common/src/leap/soledad/common/tests/test_sync_deferred.py
@@ -0,0 +1,227 @@
+# -*- coding: utf-8 -*-
+# test_sync_deferred.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test Leap backend bits: sync with deferred encryption/decryption.
+"""
+import time
+import os
+import random
+import string
+from urlparse import urljoin
+
+from leap.soledad.common.tests import u1db_tests as tests, ADDRESS
+from leap.soledad.common.tests.u1db_tests import test_sync
+
+from leap.soledad.common.document import SoledadDocument
+from leap.soledad.common import couch
+from leap.soledad.client import target
+from leap.soledad.client.sync import SoledadSynchronizer
+
+# Just to make clear how this test is different... :)
+DEFER_DECRYPTION = True
+
+WAIT_STEP = 1
+MAX_WAIT = 10
+
+from leap.soledad.common.tests import test_sqlcipher as ts
+from leap.soledad.server import SoledadApp
+
+
+from leap.soledad.client.sqlcipher import open as open_sqlcipher
+from leap.soledad.common.tests.util import SoledadWithCouchServerMixin
+from leap.soledad.common.tests.util import make_soledad_app
+
+
+DBPASS = "pass"
+
+
+class BaseSoledadDeferredEncTest(SoledadWithCouchServerMixin):
+ """
+ Another base class for testing the deferred encryption/decryption during
+ the syncs, using the intermediate database.
+ """
+ defer_sync_encryption = True
+
+ def setUp(self):
+ # config info
+ self.db1_file = os.path.join(self.tempdir, "db1.u1db")
+ self.db_pass = DBPASS
+ self.email = ADDRESS
+
+ # get a random prefix for each test, so we do not mess with
+ # concurrency during initialization and shutting down of
+ # each local db.
+ self.rand_prefix = ''.join(
+ map(lambda x: random.choice(string.ascii_letters), range(6)))
+ # initialize soledad by hand so we can control keys
+ self._soledad = self._soledad_instance(
+ prefix=self.rand_prefix, user=self.email)
+
+ # open test dbs: db1 will be the local sqlcipher db
+ # (which instantiates a syncdb)
+ self.db1 = open_sqlcipher(self.db1_file, DBPASS, create=True,
+ document_factory=SoledadDocument,
+ crypto=self._soledad._crypto,
+ defer_encryption=True)
+ self.db2 = couch.CouchDatabase.open_database(
+ urljoin(
+ 'http://localhost:' + str(self.wrapper.port), 'test'),
+ create=True,
+ ensure_ddocs=True)
+
+ def tearDown(self):
+ self.db1.close()
+ self.db2.close()
+ self._soledad.close()
+
+ # XXX should not access "private" attrs
+ for f in [self._soledad._local_db_path,
+ self._soledad._secrets_path,
+ self.db1._sync_db_path]:
+ if os.path.isfile(f):
+ os.unlink(f)
+
+
+#SQLCIPHER_SCENARIOS = [
+# ('http', {
+# #'make_app_with_state': test_sync_target.make_token_soledad_app,
+# 'make_app_with_state': make_soledad_app,
+# 'make_database_for_test': ts.make_sqlcipher_database_for_test,
+# 'copy_database_for_test': ts.copy_sqlcipher_database_for_test,
+# 'make_document_for_test': ts.make_document_for_test,
+# 'token': True
+# }),
+#]
+
+
+class SyncTimeoutError(Exception):
+ """
+ Dummy exception to notify timeout during sync.
+ """
+ pass
+
+
+class TestSoledadDbSyncDeferredEncDecr(
+ BaseSoledadDeferredEncTest,
+ test_sync.TestDbSync):
+ """
+ Test db.sync remote sync shortcut.
+ Case with deferred encryption and decryption: using the intermediate
+ syncdb.
+ """
+
+ scenarios = [
+ ('http', {
+ 'make_app_with_state': make_soledad_app,
+ 'make_database_for_test': tests.make_memory_database_for_test,
+ }),
+ ]
+
+ oauth = False
+ token = True
+
+ def setUp(self):
+ """
+ Need to explicitely invoke inicialization on all bases.
+ """
+ tests.TestCaseWithServer.setUp(self)
+ self.main_test_class = test_sync.TestDbSync
+ BaseSoledadDeferredEncTest.setUp(self)
+ self.startServer()
+ self.syncer = None
+
+ def tearDown(self):
+ """
+ Need to explicitely invoke destruction on all bases.
+ """
+ BaseSoledadDeferredEncTest.tearDown(self)
+ tests.TestCaseWithServer.tearDown(self)
+
+ def do_sync(self, target_name):
+ """
+ Perform sync using SoledadSynchronizer, SoledadSyncTarget
+ and Token auth.
+ """
+ if self.token:
+ extra = dict(creds={'token': {
+ 'uuid': 'user-uuid',
+ 'token': 'auth-token',
+ }})
+ target_url = self.getURL(target_name)
+ syncdb = getattr(self.db1, "_sync_db", None)
+
+ syncer = SoledadSynchronizer(
+ self.db1,
+ target.SoledadSyncTarget(
+ target_url,
+ crypto=self._soledad._crypto,
+ sync_db=syncdb,
+ **extra))
+ # Keep a reference to be able to know when the sync
+ # has finished.
+ self.syncer = syncer
+ return syncer.sync(
+ autocreate=True, defer_decryption=DEFER_DECRYPTION)
+ else:
+ return test_sync.TestDbSync.do_sync(self, target_name)
+
+ def wait_for_sync(self):
+ """
+ Wait for sync to finish.
+ """
+ wait = 0
+ syncer = self.syncer
+ if syncer is not None:
+ while syncer.syncing:
+ time.sleep(WAIT_STEP)
+ wait += WAIT_STEP
+ if wait >= MAX_WAIT:
+ raise SyncTimeoutError
+
+ def test_db_sync(self):
+ """
+ Test sync.
+
+ Adapted to check for encrypted content.
+ """
+ doc1 = self.db1.create_doc_from_json(tests.simple_doc)
+ doc2 = self.db2.create_doc_from_json(tests.nested_doc)
+
+ import time
+ # need to give time to the encryption to proceed
+ # TODO should implement a defer list to subscribe to the all-decrypted
+ # event
+ time.sleep(2)
+
+ local_gen_before_sync = self.do_sync('test')
+ self.wait_for_sync()
+
+ gen, _, changes = self.db1.whats_changed(local_gen_before_sync)
+ self.assertEqual(1, len(changes))
+
+ self.assertEqual(doc2.doc_id, changes[0][0])
+ self.assertEqual(1, gen - local_gen_before_sync)
+
+ self.assertGetEncryptedDoc(
+ self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False)
+ self.assertGetEncryptedDoc(
+ self.db1, doc2.doc_id, doc2.rev, tests.nested_doc, False)
+
+ def test_db_sync_autocreate(self):
+ pass
+
+load_tests = tests.load_with_scenarios
diff --git a/common/src/leap/soledad/common/tests/test_sync_target.py b/common/src/leap/soledad/common/tests/test_sync_target.py
new file mode 100644
index 00000000..edc4589b
--- /dev/null
+++ b/common/src/leap/soledad/common/tests/test_sync_target.py
@@ -0,0 +1,589 @@
+# -*- coding: utf-8 -*-
+# test_sync_target.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test Leap backend bits: sync target
+"""
+import cStringIO
+import os
+
+import simplejson as json
+import u1db
+
+from uuid import uuid4
+
+from u1db.remote import http_database
+
+from u1db import SyncTarget
+from u1db.sync import Synchronizer
+from u1db.remote import (
+ http_client,
+ http_database,
+ http_target,
+)
+
+from leap.soledad import client
+from leap.soledad.client import (
+ target,
+ auth,
+ crypto,
+ VerifiedHTTPSConnection,
+ sync,
+)
+from leap.soledad.common.document import SoledadDocument
+from leap.soledad.server.auth import SoledadTokenAuthMiddleware
+
+
+from leap.soledad.common.tests import u1db_tests as tests
+from leap.soledad.common.tests import BaseSoledadTest
+from leap.soledad.common.tests.util import (
+ make_sqlcipher_database_for_test,
+ make_soledad_app,
+ make_token_soledad_app,
+ SoledadWithCouchServerMixin,
+)
+from leap.soledad.common.tests.u1db_tests import test_backends
+from leap.soledad.common.tests.u1db_tests import test_remote_sync_target
+from leap.soledad.common.tests.u1db_tests import test_sync
+from leap.soledad.common.tests.test_couch import (
+ CouchDBTestCase,
+ CouchDBWrapper,
+)
+
+from leap.soledad.server import SoledadApp
+from leap.soledad.server.auth import SoledadTokenAuthMiddleware
+
+
+#-----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_backends`.
+#-----------------------------------------------------------------------------
+
+def make_leap_document_for_test(test, doc_id, rev, content,
+ has_conflicts=False):
+ return SoledadDocument(
+ doc_id, rev, content, has_conflicts=has_conflicts)
+
+
+LEAP_SCENARIOS = [
+ ('http', {
+ 'make_database_for_test': test_backends.make_http_database_for_test,
+ 'copy_database_for_test': test_backends.copy_http_database_for_test,
+ 'make_document_for_test': make_leap_document_for_test,
+ 'make_app_with_state': make_soledad_app}),
+]
+
+
+def make_token_http_database_for_test(test, replica_uid):
+ test.startServer()
+ test.request_state._create_database(replica_uid)
+
+ class _HTTPDatabaseWithToken(
+ http_database.HTTPDatabase, auth.TokenBasedAuth):
+
+ def set_token_credentials(self, uuid, token):
+ auth.TokenBasedAuth.set_token_credentials(self, uuid, token)
+
+ def _sign_request(self, method, url_query, params):
+ return auth.TokenBasedAuth._sign_request(
+ self, method, url_query, params)
+
+ http_db = _HTTPDatabaseWithToken(test.getURL('test'))
+ http_db.set_token_credentials('user-uuid', 'auth-token')
+ return http_db
+
+
+def copy_token_http_database_for_test(test, db):
+ # DO NOT COPY OR REUSE THIS CODE OUTSIDE TESTS: COPYING U1DB DATABASES IS
+ # THE WRONG THING TO DO, THE ONLY REASON WE DO SO HERE IS TO TEST THAT WE
+ # CORRECTLY DETECT IT HAPPENING SO THAT WE CAN RAISE ERRORS RATHER THAN
+ # CORRUPT USER DATA. USE SYNC INSTEAD, OR WE WILL SEND NINJA TO YOUR
+ # HOUSE.
+ http_db = test.request_state._copy_database(db)
+ http_db.set_token_credentials(http_db, 'user-uuid', 'auth-token')
+ return http_db
+
+
+#-----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_remote_sync_target`.
+#-----------------------------------------------------------------------------
+
+class TestSoledadSyncTargetBasics(
+ test_remote_sync_target.TestHTTPSyncTargetBasics):
+ """
+ Some tests had to be copied to this class so we can instantiate our own
+ target.
+ """
+
+ def test_parse_url(self):
+ remote_target = target.SoledadSyncTarget('http://127.0.0.1:12345/')
+ self.assertEqual('http', remote_target._url.scheme)
+ self.assertEqual('127.0.0.1', remote_target._url.hostname)
+ self.assertEqual(12345, remote_target._url.port)
+ self.assertEqual('/', remote_target._url.path)
+
+
+class TestSoledadParsingSyncStream(
+ test_remote_sync_target.TestParsingSyncStream,
+ BaseSoledadTest):
+ """
+ Some tests had to be copied to this class so we can instantiate our own
+ target.
+ """
+
+ def setUp(self):
+ test_remote_sync_target.TestParsingSyncStream.setUp(self)
+
+ def tearDown(self):
+ test_remote_sync_target.TestParsingSyncStream.tearDown(self)
+
+ def test_extra_comma(self):
+ """
+ Test adapted to use encrypted content.
+ """
+ doc = SoledadDocument('i', rev='r')
+ doc.content = {}
+ _crypto = self._soledad._crypto
+ key = _crypto.doc_passphrase(doc.doc_id)
+ secret = _crypto.secret
+
+ enc_json = crypto.encrypt_docstr(
+ doc.get_json(), doc.doc_id, doc.rev,
+ key, secret)
+ tgt = target.SoledadSyncTarget(
+ "http://foo/foo", crypto=self._soledad._crypto)
+
+ self.assertRaises(u1db.errors.BrokenSyncStream,
+ tgt._parse_sync_stream, "[\r\n{},\r\n]", None)
+ self.assertRaises(u1db.errors.BrokenSyncStream,
+ tgt._parse_sync_stream,
+ '[\r\n{},\r\n{"id": "i", "rev": "r", '
+ '"content": %s, "gen": 3, "trans_id": "T-sid"}'
+ ',\r\n]' % json.dumps(enc_json),
+ lambda doc, gen, trans_id: None)
+
+ def test_wrong_start(self):
+ tgt = target.SoledadSyncTarget("http://foo/foo")
+
+ self.assertRaises(u1db.errors.BrokenSyncStream,
+ tgt._parse_sync_stream, "{}\r\n]", None)
+
+ self.assertRaises(u1db.errors.BrokenSyncStream,
+ tgt._parse_sync_stream, "\r\n{}\r\n]", None)
+
+ self.assertRaises(u1db.errors.BrokenSyncStream,
+ tgt._parse_sync_stream, "", None)
+
+ def test_wrong_end(self):
+ tgt = target.SoledadSyncTarget("http://foo/foo")
+
+ self.assertRaises(u1db.errors.BrokenSyncStream,
+ tgt._parse_sync_stream, "[\r\n{}", None)
+
+ self.assertRaises(u1db.errors.BrokenSyncStream,
+ tgt._parse_sync_stream, "[\r\n", None)
+
+ def test_missing_comma(self):
+ tgt = target.SoledadSyncTarget("http://foo/foo")
+
+ self.assertRaises(u1db.errors.BrokenSyncStream,
+ tgt._parse_sync_stream,
+ '[\r\n{}\r\n{"id": "i", "rev": "r", '
+ '"content": "c", "gen": 3}\r\n]', None)
+
+ def test_no_entries(self):
+ tgt = target.SoledadSyncTarget("http://foo/foo")
+
+ self.assertRaises(u1db.errors.BrokenSyncStream,
+ tgt._parse_sync_stream, "[\r\n]", None)
+
+ def test_error_in_stream(self):
+ tgt = target.SoledadSyncTarget("http://foo/foo")
+
+ self.assertRaises(u1db.errors.Unavailable,
+ tgt._parse_sync_stream,
+ '[\r\n{"new_generation": 0},'
+ '\r\n{"error": "unavailable"}\r\n', None)
+
+ self.assertRaises(u1db.errors.Unavailable,
+ tgt._parse_sync_stream,
+ '[\r\n{"error": "unavailable"}\r\n', None)
+
+ self.assertRaises(u1db.errors.BrokenSyncStream,
+ tgt._parse_sync_stream,
+ '[\r\n{"error": "?"}\r\n', None)
+
+
+#
+# functions for TestRemoteSyncTargets
+#
+
+def leap_sync_target(test, path):
+ return target.SoledadSyncTarget(
+ test.getURL(path), crypto=test._soledad._crypto)
+
+
+def token_leap_sync_target(test, path):
+ st = leap_sync_target(test, path)
+ st.set_token_credentials('user-uuid', 'auth-token')
+ return st
+
+
+def make_local_db_and_soledad_target(test, path='test'):
+ test.startServer()
+ db = test.request_state._create_database(os.path.basename(path))
+ st = target.SoledadSyncTarget.connect(
+ test.getURL(path), crypto=test._soledad._crypto)
+ return db, st
+
+
+def make_local_db_and_token_soledad_target(test):
+ db, st = make_local_db_and_soledad_target(test, 'test')
+ st.set_token_credentials('user-uuid', 'auth-token')
+ return db, st
+
+
+class TestSoledadSyncTarget(
+ SoledadWithCouchServerMixin,
+ test_remote_sync_target.TestRemoteSyncTargets):
+
+ scenarios = [
+ ('token_soledad',
+ {'make_app_with_state': make_token_soledad_app,
+ 'make_document_for_test': make_leap_document_for_test,
+ 'create_db_and_target': make_local_db_and_token_soledad_target,
+ 'make_database_for_test': make_sqlcipher_database_for_test,
+ 'sync_target': token_leap_sync_target}),
+ ]
+
+ def setUp(self):
+ tests.TestCaseWithServer.setUp(self)
+ self.main_test_class = test_remote_sync_target.TestRemoteSyncTargets
+ SoledadWithCouchServerMixin.setUp(self)
+ self.startServer()
+ self.db1 = make_sqlcipher_database_for_test(self, 'test1')
+ self.db2 = self.request_state._create_database('test2')
+
+ def tearDown(self):
+ SoledadWithCouchServerMixin.tearDown(self)
+ tests.TestCaseWithServer.tearDown(self)
+ db, _ = self.request_state.ensure_database('test2')
+ db.delete_database()
+
+ def test_sync_exchange_send(self):
+ """
+ Test for sync exchanging send of document.
+
+ This test was adapted to decrypt remote content before assert.
+ """
+ self.startServer()
+ db = self.request_state._create_database('test')
+ remote_target = self.getSyncTarget('test')
+ other_docs = []
+
+ def receive_doc(doc, gen, trans_id):
+ other_docs.append((doc.doc_id, doc.rev, doc.get_json()))
+
+ doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
+ new_gen, trans_id = remote_target.sync_exchange(
+ [(doc, 10, 'T-sid')], 'replica', last_known_generation=0,
+ last_known_trans_id=None, return_doc_cb=receive_doc,
+ defer_decryption=False)
+ self.assertEqual(1, new_gen)
+ self.assertGetEncryptedDoc(
+ db, 'doc-here', 'replica:1', '{"value": "here"}', False)
+
+ def test_sync_exchange_send_failure_and_retry_scenario(self):
+ """
+ Test for sync exchange failure and retry.
+
+ This test was adapted to:
+ - decrypt remote content before assert.
+ - not expect a bounced document because soledad has stateful
+ recoverable sync.
+ """
+
+ self.startServer()
+
+ def blackhole_getstderr(inst):
+ return cStringIO.StringIO()
+
+ self.patch(self.server.RequestHandlerClass, 'get_stderr',
+ blackhole_getstderr)
+ db = self.request_state._create_database('test')
+ _put_doc_if_newer = db._put_doc_if_newer
+ trigger_ids = ['doc-here2']
+
+ def bomb_put_doc_if_newer(self, doc, save_conflict,
+ replica_uid=None, replica_gen=None,
+ replica_trans_id=None, number_of_docs=None,
+ doc_idx=None, sync_id=None):
+ if doc.doc_id in trigger_ids:
+ raise Exception
+ return _put_doc_if_newer(doc, save_conflict=save_conflict,
+ replica_uid=replica_uid,
+ replica_gen=replica_gen,
+ replica_trans_id=replica_trans_id,
+ number_of_docs=number_of_docs,
+ doc_idx=doc_idx, sync_id=sync_id)
+ from leap.soledad.common.tests.test_couch import IndexedCouchDatabase
+ self.patch(
+ IndexedCouchDatabase, '_put_doc_if_newer', bomb_put_doc_if_newer)
+ remote_target = self.getSyncTarget('test')
+ other_changes = []
+
+ def receive_doc(doc, gen, trans_id):
+ other_changes.append(
+ (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id))
+
+ doc1 = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
+ doc2 = self.make_document('doc-here2', 'replica:1',
+ '{"value": "here2"}')
+
+ # we do not expect an HTTPError because soledad sync fails gracefully
+ remote_target.sync_exchange(
+ [(doc1, 10, 'T-sid'), (doc2, 11, 'T-sud')],
+ 'replica', last_known_generation=0, last_known_trans_id=None,
+ return_doc_cb=receive_doc)
+ self.assertGetEncryptedDoc(
+ db, 'doc-here', 'replica:1', '{"value": "here"}',
+ False)
+ self.assertEqual(
+ (10, 'T-sid'), db._get_replica_gen_and_trans_id('replica'))
+ self.assertEqual([], other_changes)
+ # retry
+ trigger_ids = []
+ new_gen, trans_id = remote_target.sync_exchange(
+ [(doc2, 11, 'T-sud')], 'replica', last_known_generation=0,
+ last_known_trans_id=None, return_doc_cb=receive_doc)
+ self.assertGetEncryptedDoc(
+ db, 'doc-here2', 'replica:1', '{"value": "here2"}',
+ False)
+ self.assertEqual(
+ (11, 'T-sud'), db._get_replica_gen_and_trans_id('replica'))
+ self.assertEqual(2, new_gen)
+ self.assertEqual(
+ ('doc-here', 'replica:1', '{"value": "here"}', 1),
+ other_changes[0][:-1])
+
+ def test_sync_exchange_send_ensure_callback(self):
+ """
+ Test for sync exchange failure and retry.
+
+ This test was adapted to decrypt remote content before assert.
+ """
+ self.startServer()
+ remote_target = self.getSyncTarget('test')
+ other_docs = []
+ replica_uid_box = []
+
+ def receive_doc(doc, gen, trans_id):
+ other_docs.append((doc.doc_id, doc.rev, doc.get_json()))
+
+ def ensure_cb(replica_uid):
+ replica_uid_box.append(replica_uid)
+
+ doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
+ new_gen, trans_id = remote_target.sync_exchange(
+ [(doc, 10, 'T-sid')], 'replica', last_known_generation=0,
+ last_known_trans_id=None, return_doc_cb=receive_doc,
+ ensure_callback=ensure_cb, defer_decryption=False)
+ self.assertEqual(1, new_gen)
+ db = self.request_state.open_database('test')
+ self.assertEqual(1, len(replica_uid_box))
+ self.assertEqual(db._replica_uid, replica_uid_box[0])
+ self.assertGetEncryptedDoc(
+ db, 'doc-here', 'replica:1', '{"value": "here"}', False)
+
+ def test_sync_exchange_in_stream_error(self):
+ # we bypass this test because our sync_exchange process does not
+ # return u1db error 503 "unavailable" for now.
+ pass
+
+
+#-----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_sync`.
+#-----------------------------------------------------------------------------
+
+target_scenarios = [
+ ('token_leap', {'create_db_and_target':
+ make_local_db_and_token_soledad_target,
+ 'make_app_with_state': make_soledad_app}),
+]
+
+
+class SoledadDatabaseSyncTargetTests(
+ SoledadWithCouchServerMixin, test_sync.DatabaseSyncTargetTests):
+
+ scenarios = (
+ tests.multiply_scenarios(
+ tests.DatabaseBaseTests.scenarios,
+ target_scenarios))
+
+ whitebox = False
+
+ def setUp(self):
+ self.main_test_class = test_sync.DatabaseSyncTargetTests
+ SoledadWithCouchServerMixin.setUp(self)
+
+ def test_sync_exchange(self):
+ """
+ Test sync exchange.
+
+ This test was adapted to decrypt remote content before assert.
+ """
+ sol, _ = make_local_db_and_soledad_target(self)
+ docs_by_gen = [
+ (self.make_document('doc-id', 'replica:1', tests.simple_doc), 10,
+ 'T-sid')]
+ new_gen, trans_id = self.st.sync_exchange(
+ docs_by_gen, 'replica', last_known_generation=0,
+ last_known_trans_id=None, return_doc_cb=self.receive_doc,
+ defer_decryption=False)
+ self.assertGetEncryptedDoc(
+ self.db, 'doc-id', 'replica:1', tests.simple_doc, False)
+ self.assertTransactionLog(['doc-id'], self.db)
+ last_trans_id = self.getLastTransId(self.db)
+ self.assertEqual(([], 1, last_trans_id),
+ (self.other_changes, new_gen, last_trans_id))
+ self.assertEqual(10, self.st.get_sync_info('replica')[3])
+ sol.close()
+
+ def test_sync_exchange_push_many(self):
+ """
+ Test sync exchange.
+
+ This test was adapted to decrypt remote content before assert.
+ """
+ docs_by_gen = [
+ (self.make_document(
+ 'doc-id', 'replica:1', tests.simple_doc), 10, 'T-1'),
+ (self.make_document(
+ 'doc-id2', 'replica:1', tests.nested_doc), 11, 'T-2')]
+ new_gen, trans_id = self.st.sync_exchange(
+ docs_by_gen, 'replica', last_known_generation=0,
+ last_known_trans_id=None, return_doc_cb=self.receive_doc,
+ defer_decryption=False)
+ self.assertGetEncryptedDoc(
+ self.db, 'doc-id', 'replica:1', tests.simple_doc, False)
+ self.assertGetEncryptedDoc(
+ self.db, 'doc-id2', 'replica:1', tests.nested_doc, False)
+ self.assertTransactionLog(['doc-id', 'doc-id2'], self.db)
+ last_trans_id = self.getLastTransId(self.db)
+ self.assertEqual(([], 2, last_trans_id),
+ (self.other_changes, new_gen, trans_id))
+ self.assertEqual(11, self.st.get_sync_info('replica')[3])
+
+ def test_sync_exchange_returns_many_new_docs(self):
+ """
+ Test sync exchange.
+
+ This test was adapted to avoid JSON serialization comparison as local
+ and remote representations might differ. It looks directly at the
+ doc's contents instead.
+ """
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ doc2 = self.db.create_doc_from_json(tests.nested_doc)
+ self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)
+ new_gen, _ = self.st.sync_exchange(
+ [], 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, return_doc_cb=self.receive_doc,
+ defer_decryption=False)
+ self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)
+ self.assertEqual(2, new_gen)
+ self.assertEqual(
+ [(doc.doc_id, doc.rev, 1),
+ (doc2.doc_id, doc2.rev, 2)],
+ [c[:-3] + c[-2:-1] for c in self.other_changes])
+ self.assertEqual(
+ json.loads(tests.simple_doc),
+ json.loads(self.other_changes[0][2]))
+ self.assertEqual(
+ json.loads(tests.nested_doc),
+ json.loads(self.other_changes[1][2]))
+ if self.whitebox:
+ self.assertEqual(
+ self.db._last_exchange_log['return'],
+ {'last_gen': 2, 'docs':
+ [(doc.doc_id, doc.rev), (doc2.doc_id, doc2.rev)]})
+
+
+class TestSoledadDbSync(
+ SoledadWithCouchServerMixin, test_sync.TestDbSync):
+ """Test db.sync remote sync shortcut"""
+
+ scenarios = [
+ ('py-token-http', {
+ 'create_db_and_target': make_local_db_and_token_soledad_target,
+ 'make_app_with_state': make_token_soledad_app,
+ 'make_database_for_test': make_sqlcipher_database_for_test,
+ 'token': True
+ }),
+ ]
+
+ oauth = False
+ token = False
+
+ def setUp(self):
+ self.main_test_class = test_sync.TestDbSync
+ SoledadWithCouchServerMixin.setUp(self)
+
+ def do_sync(self, target_name):
+ """
+ Perform sync using SoledadSynchronizer, SoledadSyncTarget
+ and Token auth.
+ """
+ if self.token:
+ extra = dict(creds={'token': {
+ 'uuid': 'user-uuid',
+ 'token': 'auth-token',
+ }})
+ target_url = self.getURL(target_name)
+ return sync.SoledadSynchronizer(
+ self.db,
+ target.SoledadSyncTarget(
+ target_url,
+ crypto=self._soledad._crypto,
+ **extra)).sync(autocreate=True,
+ defer_decryption=False)
+ else:
+ return test_sync.TestDbSync.do_sync(self, target_name)
+
+ def test_db_sync(self):
+ """
+ Test sync.
+
+ Adapted to check for encrypted content.
+ """
+ doc1 = self.db.create_doc_from_json(tests.simple_doc)
+ doc2 = self.db2.create_doc_from_json(tests.nested_doc)
+ local_gen_before_sync = self.do_sync('test2')
+ gen, _, changes = self.db.whats_changed(local_gen_before_sync)
+ self.assertEqual(1, len(changes))
+ self.assertEqual(doc2.doc_id, changes[0][0])
+ self.assertEqual(1, gen - local_gen_before_sync)
+ self.assertGetEncryptedDoc(
+ self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False)
+ self.assertGetEncryptedDoc(
+ self.db, doc2.doc_id, doc2.rev, tests.nested_doc, False)
+
+ def test_db_sync_autocreate(self):
+ """
+ We bypass this test because we never need to autocreate databases.
+ """
+ pass
+
+
+load_tests = tests.load_with_scenarios
diff --git a/common/src/leap/soledad/common/tests/test_target.py b/common/src/leap/soledad/common/tests/test_target.py
index 3457a3e1..6242099d 100644
--- a/common/src/leap/soledad/common/tests/test_target.py
+++ b/common/src/leap/soledad/common/tests/test_target.py
@@ -437,13 +437,17 @@ class TestSoledadSyncTarget(
def bomb_put_doc_if_newer(self, doc, save_conflict,
replica_uid=None, replica_gen=None,
- replica_trans_id=None):
+ replica_trans_id=None, number_of_docs=None,
+ doc_idx=None, sync_id=None):
if doc.doc_id in trigger_ids:
raise Exception
return _put_doc_if_newer(doc, save_conflict=save_conflict,
replica_uid=replica_uid,
replica_gen=replica_gen,
- replica_trans_id=replica_trans_id)
+ replica_trans_id=replica_trans_id,
+ number_of_docs=number_of_docs,
+ doc_idx=doc_idx,
+ sync_id=sync_id)
from leap.soledad.common.tests.test_couch import IndexedCouchDatabase
self.patch(
IndexedCouchDatabase, '_put_doc_if_newer', bomb_put_doc_if_newer)
@@ -457,9 +461,8 @@ class TestSoledadSyncTarget(
doc1 = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
doc2 = self.make_document('doc-here2', 'replica:1',
'{"value": "here2"}')
- self.assertRaises(
- u1db.errors.HTTPError,
- remote_target.sync_exchange,
+ # We do not expect an exception here because the sync fails gracefully
+ remote_target.sync_exchange(
[(doc1, 10, 'T-sid'), (doc2, 11, 'T-sud')],
'replica', last_known_generation=0, last_known_trans_id=None,
return_doc_cb=receive_doc)
@@ -480,11 +483,9 @@ class TestSoledadSyncTarget(
self.assertEqual(
(11, 'T-sud'), db._get_replica_gen_and_trans_id('replica'))
self.assertEqual(2, new_gen)
- # we do not expect the document to be bounced back because soledad has
- # stateful sync
- #self.assertEqual(
- # ('doc-here', 'replica:1', '{"value": "here"}', 1),
- # other_changes[0][:-1])
+ self.assertEqual(
+ ('doc-here', 'replica:1', '{"value": "here"}', 1),
+ other_changes[0][:-1])
def test_sync_exchange_send_ensure_callback(self):
"""
diff --git a/common/src/leap/soledad/common/tests/test_target_soledad.py b/common/src/leap/soledad/common/tests/test_target_soledad.py
new file mode 100644
index 00000000..899203b8
--- /dev/null
+++ b/common/src/leap/soledad/common/tests/test_target_soledad.py
@@ -0,0 +1,102 @@
+from u1db.remote import (
+ http_database,
+)
+
+from leap.soledad.client import (
+ auth,
+ VerifiedHTTPSConnection,
+)
+from leap.soledad.common.document import SoledadDocument
+from leap.soledad.server import SoledadApp
+from leap.soledad.server.auth import SoledadTokenAuthMiddleware
+
+
+from leap.soledad.common.tests import u1db_tests as tests
+from leap.soledad.common.tests import BaseSoledadTest
+from leap.soledad.common.tests.u1db_tests import test_backends
+
+
+#-----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_backends`.
+#-----------------------------------------------------------------------------
+
+def make_leap_document_for_test(test, doc_id, rev, content,
+ has_conflicts=False):
+ return SoledadDocument(
+ doc_id, rev, content, has_conflicts=has_conflicts)
+
+
+def make_soledad_app(state):
+ return SoledadApp(state)
+
+
+def make_token_soledad_app(state):
+ app = SoledadApp(state)
+
+ def _verify_authentication_data(uuid, auth_data):
+ if uuid == 'user-uuid' and auth_data == 'auth-token':
+ return True
+ return False
+
+ # we test for action authorization in leap.soledad.common.tests.test_server
+ def _verify_authorization(uuid, environ):
+ return True
+
+ application = SoledadTokenAuthMiddleware(app)
+ application._verify_authentication_data = _verify_authentication_data
+ application._verify_authorization = _verify_authorization
+ return application
+
+
+LEAP_SCENARIOS = [
+ ('http', {
+ 'make_database_for_test': test_backends.make_http_database_for_test,
+ 'copy_database_for_test': test_backends.copy_http_database_for_test,
+ 'make_document_for_test': make_leap_document_for_test,
+ 'make_app_with_state': make_soledad_app}),
+]
+
+
+def make_token_http_database_for_test(test, replica_uid):
+ test.startServer()
+ test.request_state._create_database(replica_uid)
+
+ class _HTTPDatabaseWithToken(
+ http_database.HTTPDatabase, auth.TokenBasedAuth):
+
+ def set_token_credentials(self, uuid, token):
+ auth.TokenBasedAuth.set_token_credentials(self, uuid, token)
+
+ def _sign_request(self, method, url_query, params):
+ return auth.TokenBasedAuth._sign_request(
+ self, method, url_query, params)
+
+ http_db = _HTTPDatabaseWithToken(test.getURL('test'))
+ http_db.set_token_credentials('user-uuid', 'auth-token')
+ return http_db
+
+
+def copy_token_http_database_for_test(test, db):
+ # DO NOT COPY OR REUSE THIS CODE OUTSIDE TESTS: COPYING U1DB DATABASES IS
+ # THE WRONG THING TO DO, THE ONLY REASON WE DO SO HERE IS TO TEST THAT WE
+ # CORRECTLY DETECT IT HAPPENING SO THAT WE CAN RAISE ERRORS RATHER THAN
+ # CORRUPT USER DATA. USE SYNC INSTEAD, OR WE WILL SEND NINJA TO YOUR
+ # HOUSE.
+ http_db = test.request_state._copy_database(db)
+ http_db.set_token_credentials(http_db, 'user-uuid', 'auth-token')
+ return http_db
+
+
+class SoledadTests(test_backends.AllDatabaseTests, BaseSoledadTest):
+
+ scenarios = LEAP_SCENARIOS + [
+ ('token_http', {'make_database_for_test':
+ make_token_http_database_for_test,
+ 'copy_database_for_test':
+ copy_token_http_database_for_test,
+ 'make_document_for_test': make_leap_document_for_test,
+ 'make_app_with_state': make_token_soledad_app,
+ })
+ ]
+
+load_tests = tests.load_with_scenarios
diff --git a/common/src/leap/soledad/common/tests/u1db_tests/__init__.py b/common/src/leap/soledad/common/tests/u1db_tests/__init__.py
index 99ff77b4..ad66fb06 100644
--- a/common/src/leap/soledad/common/tests/u1db_tests/__init__.py
+++ b/common/src/leap/soledad/common/tests/u1db_tests/__init__.py
@@ -13,8 +13,9 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with u1db. If not, see <http://www.gnu.org/licenses/>.
-
-"""Test infrastructure for U1DB"""
+"""
+Test infrastructure for U1DB
+"""
import copy
import shutil
diff --git a/common/src/leap/soledad/common/tests/u1db_tests/test_backends.py b/common/src/leap/soledad/common/tests/u1db_tests/test_backends.py
index c0a7e1f7..86e76fad 100644
--- a/common/src/leap/soledad/common/tests/u1db_tests/test_backends.py
+++ b/common/src/leap/soledad/common/tests/u1db_tests/test_backends.py
@@ -41,7 +41,7 @@ from u1db.remote import (
)
-def make_http_database_for_test(test, replica_uid, path='test'):
+def make_http_database_for_test(test, replica_uid, path='test', *args):
test.startServer()
test.request_state._create_database(replica_uid)
return http_database.HTTPDatabase(test.getURL(path))
diff --git a/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py b/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py
index 633fd8dd..5e2bec86 100644
--- a/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py
+++ b/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py
@@ -1151,6 +1151,9 @@ class TestDbSync(tests.TestCaseWithServer):
target_url = self.getURL(path)
return self.db.sync(target_url, **extra)
+ def sync(self, callback=None, autocreate=False, defer_decryption=False):
+ return super(TestDbSync, self).sync(callback, autocreate)
+
def setUp(self):
super(TestDbSync, self).setUp()
self.startServer()
diff --git a/scripts/profiling/spam.sh b/scripts/profiling/spam.sh
new file mode 100755
index 00000000..a4f2b8ef
--- /dev/null
+++ b/scripts/profiling/spam.sh
@@ -0,0 +1,24 @@
+#!/bin/bash
+
+if [ $# -ne 2 ]; then
+ echo "Usage: ${0} <target_address> <number_of_messages>"
+ exit 1
+fi
+
+target_address=${1}
+missing=${2}
+echo "Will send ${missing} messages to ${target_address}..."
+
+while [[ ${success} -eq 0 && ${missing} -gt 0 ]]; do
+ echo " missing: ${missing}"
+ swaks -S \
+ -f ${target_address} \
+ -t ${target_address} \
+ -s chipmonk.cdev.bitmask.net \
+ -tlsc
+ if [ $? -eq 0 ]; then
+ missing=`expr ${missing} - 1`
+ else
+ echo " error, retrying..."
+ fi
+done
diff --git a/server/changes/bug_5368_avoid-yet-another-crypto-dep b/server/changes/bug_5368_avoid-yet-another-crypto-dep
new file mode 100644
index 00000000..6f3f2b04
--- /dev/null
+++ b/server/changes/bug_5368_avoid-yet-another-crypto-dep
@@ -0,0 +1,2 @@
+ o Pin PyOpenSSL dependency version to <0.14 to avoid yet another crypto
+ dependency.
diff --git a/server/changes/feature_3399-check-auth-in-constant-way b/server/changes/feature_3399-check-auth-in-constant-way
new file mode 100644
index 00000000..ebd18680
--- /dev/null
+++ b/server/changes/feature_3399-check-auth-in-constant-way
@@ -0,0 +1 @@
+ o Authenticate in time-insensitive manner. Closes #3399.
diff --git a/server/changes/feature_5571_allow-for-interrupting-and-recovering-sync b/server/changes/feature_5571_allow-for-interrupting-and-recovering-sync
new file mode 100644
index 00000000..0087c535
--- /dev/null
+++ b/server/changes/feature_5571_allow-for-interrupting-and-recovering-sync
@@ -0,0 +1 @@
+ o Allow for interrupting and recovering sync (#5517).
diff --git a/server/changes/feature_5571_split-sync-post b/server/changes/feature_5571_split-sync-post
new file mode 100644
index 00000000..ad269cd4
--- /dev/null
+++ b/server/changes/feature_5571_split-sync-post
@@ -0,0 +1 @@
+ o Split sync in multiple POST requests in server (#5571).
diff --git a/server/pkg/requirements.pip b/server/pkg/requirements.pip
index 7cbca401..be5d156b 100644
--- a/server/pkg/requirements.pip
+++ b/server/pkg/requirements.pip
@@ -3,7 +3,7 @@ couchdb
simplejson
u1db
routes
-PyOpenSSL
+PyOpenSSL<0.14
# TODO: maybe we just want twisted-web?
twisted>=12.0.0
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py
index c6928aaa..6dc99b5a 100644
--- a/server/src/leap/soledad/server/sync.py
+++ b/server/src/leap/soledad/server/sync.py
@@ -210,6 +210,8 @@ class SyncExchange(sync.SyncExchange):
:param last_known_generation: The last target replica generation the
source replica knows about.
:type last_known_generation: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
"""
self._db = db
self.source_replica_uid = source_replica_uid
@@ -284,7 +286,8 @@ class SyncExchange(sync.SyncExchange):
doc = self._db.get_doc(changed_doc_id, include_deleted=True)
return_doc_cb(doc, gen, trans_id)
- def insert_doc_from_source(self, doc, source_gen, trans_id):
+ def insert_doc_from_source(self, doc, source_gen, trans_id,
+ number_of_docs=None, doc_idx=None, sync_id=None):
"""Try to insert synced document from source.
Conflicting documents are not inserted but will be sent over
@@ -302,10 +305,18 @@ class SyncExchange(sync.SyncExchange):
:type source_gen: int
:param trans_id: The transaction id of that document change.
:type trans_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+ :param doc_idx: The index of the current document.
+ :type doc_idx: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
"""
state, at_gen = self._db._put_doc_if_newer(
doc, save_conflict=False, replica_uid=self.source_replica_uid,
- replica_gen=source_gen, replica_trans_id=trans_id)
+ replica_gen=source_gen, replica_trans_id=trans_id,
+ number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id)
if state == 'inserted':
self._sync_state.put_seen_id(doc.doc_id, at_gen)
elif state == 'converged':
@@ -340,6 +351,8 @@ class SyncResource(http_app.SyncResource):
:param last_known_trans_id: The last server replica transaction_id the
client knows about.
:type last_known_trans_id: str
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
:param ensure: Whether the server replica should be created if it does
not already exist.
:type ensure: bool
@@ -355,9 +368,11 @@ class SyncResource(http_app.SyncResource):
# get a sync exchange object
self.sync_exch = self.sync_exchange_class(
db, self.source_replica_uid, last_known_generation, sync_id)
+ self._sync_id = sync_id
@http_app.http_method(content_as_args=True)
- def post_put(self, id, rev, content, gen, trans_id):
+ def post_put(self, id, rev, content, gen, trans_id, number_of_docs,
+ doc_idx):
"""
Put one incoming document into the server replica.
@@ -373,9 +388,16 @@ class SyncResource(http_app.SyncResource):
:param trans_id: The source replica transaction id corresponding to
the revision of the incoming document.
:type trans_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+ :param doc_idx: The index of the current document.
+ :type doc_idx: int
"""
doc = Document(id, rev, content)
- self.sync_exch.insert_doc_from_source(doc, gen, trans_id)
+ self.sync_exch.insert_doc_from_source(
+ doc, gen, trans_id, number_of_docs=number_of_docs,
+ doc_idx=doc_idx, sync_id=self._sync_id)
@http_app.http_method(received=int, content_as_args=True)
def post_get(self, received):