summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2016-12-22 17:36:15 -0200
committerdrebs <drebs@leap.se>2016-12-22 17:36:15 -0200
commit8a463796bbaba3979234b0699d140947581421e7 (patch)
treed1e2ea96ed91ac66c7e52a30d16246a498ae9ed6
parentf072f18f317ea31e66c7890d672b5d2fd9f3ef14 (diff)
parente360a3a75999503cf45bfbbad69970a452cf3d32 (diff)
Merge tag '0.9.2'
Tag version 0.9.2 # gpg: Signature made Thu 22 Dec 2016 05:33:30 PM BRST # gpg: using RSA key 0x6071E70DCACC60B2 # gpg: Good signature from "drebs (work key) <db@leap.se>" [ultimate] # gpg: aka "drebs (work key) <drebs@leap.se>" [ultimate] # Impressão da chave primária: 9F73 295B 6306 E06F 3151 99AE 6071 E70D CACC 60B2
-rw-r--r--.gitlab-ci.yml13
-rw-r--r--CHANGELOG.rst23
-rw-r--r--client/pkg/requirements.pip1
-rw-r--r--client/src/leap/soledad/client/__init__.py1
-rw-r--r--client/src/leap/soledad/client/_crypto.py386
-rw-r--r--client/src/leap/soledad/client/adbapi.py18
-rw-r--r--client/src/leap/soledad/client/api.py111
-rw-r--r--client/src/leap/soledad/client/crypto.py3
-rw-r--r--client/src/leap/soledad/client/encdecpool.py657
-rw-r--r--client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py3
-rw-r--r--client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py3
-rw-r--r--client/src/leap/soledad/client/examples/run_benchmark.py1
-rw-r--r--client/src/leap/soledad/client/examples/soledad_sync.py2
-rw-r--r--client/src/leap/soledad/client/examples/use_adbapi.py2
-rw-r--r--client/src/leap/soledad/client/examples/use_api.py2
-rw-r--r--client/src/leap/soledad/client/http_target/__init__.py36
-rw-r--r--client/src/leap/soledad/client/http_target/api.py37
-rw-r--r--client/src/leap/soledad/client/http_target/fetch.py238
-rw-r--r--client/src/leap/soledad/client/http_target/fetch_protocol.py159
-rw-r--r--client/src/leap/soledad/client/http_target/send.py70
-rw-r--r--client/src/leap/soledad/client/http_target/send_protocol.py76
-rw-r--r--client/src/leap/soledad/client/http_target/support.py52
-rw-r--r--client/src/leap/soledad/client/interfaces.py7
-rw-r--r--client/src/leap/soledad/client/secrets.py41
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py119
-rw-r--r--client/src/leap/soledad/client/sync.py48
-rw-r--r--common/src/leap/soledad/common/backend.py6
-rw-r--r--common/src/leap/soledad/common/couch/__init__.py65
-rw-r--r--common/src/leap/soledad/common/l2db/backends/sqlite_backend.py12
-rw-r--r--common/src/leap/soledad/common/l2db/remote/http_app.py5
-rw-r--r--scripts/db_access/client_side_db.py12
-rw-r--r--scripts/docker/files/bin/client_side_db.py3
-rw-r--r--scripts/profiling/mail/soledad_client.py3
-rwxr-xr-xscripts/profiling/sync/profile-sync.py1
-rw-r--r--server/pkg/requirements.pip1
-rw-r--r--server/pkg/soledad-server4
-rw-r--r--server/src/leap/soledad/server/__init__.py17
-rw-r--r--server/src/leap/soledad/server/config.py2
-rw-r--r--server/src/leap/soledad/server/resource.py53
-rw-r--r--server/src/leap/soledad/server/sync.py98
-rw-r--r--setup.cfg8
-rw-r--r--testing/pytest.ini2
-rw-r--r--testing/test_soledad/util.py39
-rw-r--r--testing/tests/benchmarks/assets/cert_default.conf (renamed from testing/tests/perf/assets/cert_default.conf)0
-rw-r--r--testing/tests/benchmarks/conftest.py57
-rw-r--r--testing/tests/benchmarks/pytest.ini (renamed from testing/tests/perf/pytest.ini)0
-rw-r--r--testing/tests/benchmarks/test_crypto.py97
-rw-r--r--testing/tests/benchmarks/test_misc.py (renamed from testing/tests/perf/test_misc.py)0
-rw-r--r--testing/tests/benchmarks/test_sqlcipher.py (renamed from testing/tests/perf/test_sqlcipher.py)12
-rw-r--r--testing/tests/benchmarks/test_sync.py (renamed from testing/tests/perf/test_sync.py)23
-rw-r--r--testing/tests/client/test_aux_methods.py4
-rw-r--r--testing/tests/client/test_crypto.py282
-rw-r--r--testing/tests/client/test_deprecated_crypto.py91
-rw-r--r--testing/tests/conftest.py189
-rw-r--r--testing/tests/couch/conftest.py31
-rw-r--r--testing/tests/couch/test_command.py3
-rw-r--r--testing/tests/couch/test_state.py33
-rw-r--r--testing/tests/perf/conftest.py249
-rw-r--r--testing/tests/perf/test_crypto.py81
-rw-r--r--testing/tests/perf/test_encdecpool.py78
-rw-r--r--testing/tests/server/test_server.py22
-rw-r--r--testing/tests/sync/test_encdecpool.py306
-rw-r--r--testing/tests/sync/test_sqlcipher_sync.py17
-rw-r--r--testing/tests/sync/test_sync.py22
-rw-r--r--testing/tests/sync/test_sync_deferred.py196
-rw-r--r--testing/tests/sync/test_sync_mutex.py5
-rw-r--r--testing/tests/sync/test_sync_target.py191
-rw-r--r--testing/tox.ini20
68 files changed, 1894 insertions, 2555 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index dd4e4605..ac2ae1f0 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -1,7 +1,6 @@
stages:
- code-check
- tests
- - benchmark
# Cache tox envs between builds
cache:
@@ -22,15 +21,5 @@ tests:
script:
- cd testing
- tox -- --couch-url http://couchdb:5984
-
-benchmark:
- stage: benchmark
- image: leapcode/soledad:latest
- services:
- - couchdb
- script:
- - cd testing
- - tox -e perf -- --couch-url http://couchdb:5984
tags:
- - docker
- - benchmark
+ - couchdb
diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 12cb56ab..f47749d1 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -1,3 +1,26 @@
+0.9.2 - 22 December, 2016
++++++++++++++++++++++++++
+
+Performance improvements
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+- use AES 256 GCM mode instead of CTR+HMAC.
+- streaming encryption/decryption and data transfer.
+
+Server
+~~~~~~
+
+- move server to a twisted resource entrypoint.
+
+Client
+~~~~~~
+
+- use twisted http agent in the client.
+- maintain backwards compatibility with old crypto scheme (AES 256 CTR+HMAC).
+ No migration for now, only in 0.10.
+- remove the encryption/decryption pools, replace for inline streaming crypto.
+- use sqlcipher transactions on sync.
+
0.9.1 - 27 November, 2016
+++++++++++++++++++++++++
diff --git a/client/pkg/requirements.pip b/client/pkg/requirements.pip
index 2ae844e1..24b168b4 100644
--- a/client/pkg/requirements.pip
+++ b/client/pkg/requirements.pip
@@ -2,3 +2,4 @@ pysqlcipher>2.6.3
scrypt
zope.proxy
twisted
+cryptography
diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py
index 245a8971..3a114021 100644
--- a/client/src/leap/soledad/client/__init__.py
+++ b/client/src/leap/soledad/client/__init__.py
@@ -21,6 +21,7 @@ from leap.soledad.client.api import Soledad
from leap.soledad.common import soledad_assert
from ._version import get_versions
+
__version__ = get_versions()['version']
del get_versions
diff --git a/client/src/leap/soledad/client/_crypto.py b/client/src/leap/soledad/client/_crypto.py
new file mode 100644
index 00000000..4bbdd044
--- /dev/null
+++ b/client/src/leap/soledad/client/_crypto.py
@@ -0,0 +1,386 @@
+# -*- coding: utf-8 -*-
+# _crypto.py
+# Copyright (C) 2016 LEAP Encryption Access Project
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Cryptographic operations for the soledad client
+"""
+
+import binascii
+import base64
+import hashlib
+import hmac
+import os
+import re
+import struct
+import time
+
+from io import BytesIO
+from itertools import imap
+from collections import namedtuple
+
+from twisted.internet import defer
+from twisted.internet import interfaces
+from twisted.web.client import FileBodyProducer
+
+from cryptography.exceptions import InvalidTag
+from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
+from cryptography.hazmat.backends.multibackend import MultiBackend
+from cryptography.hazmat.backends.openssl.backend \
+ import Backend as OpenSSLBackend
+
+from zope.interface import implements
+
+
+SECRET_LENGTH = 64
+
+CRYPTO_BACKEND = MultiBackend([OpenSSLBackend()])
+
+PACMAN = struct.Struct('2sbbQ16s255p255p')
+BLOB_SIGNATURE_MAGIC = '\x13\x37'
+
+
+ENC_SCHEME = namedtuple('SCHEME', 'symkey')(1)
+ENC_METHOD = namedtuple('METHOD', 'aes_256_ctr aes_256_gcm')(1, 2)
+DocInfo = namedtuple('DocInfo', 'doc_id rev')
+
+
+class EncryptionDecryptionError(Exception):
+ pass
+
+
+class InvalidBlob(Exception):
+ pass
+
+
+class SoledadCrypto(object):
+ """
+ This class provides convenient methods for document encryption and
+ decryption using BlobEncryptor and BlobDecryptor classes.
+ """
+ def __init__(self, secret):
+ """
+ Initialize the crypto object.
+
+ :param secret: The Soledad remote storage secret.
+ :type secret: str
+ """
+ self.secret = secret
+
+ def encrypt_doc(self, doc):
+ """
+ Creates and configures a BlobEncryptor, asking it to start encryption
+ and wrapping the result as a simple JSON string with a "raw" key.
+
+ :param doc: the document to be encrypted.
+ :type doc: SoledadDocument
+ :return: A deferred whose callback will be invoked with a JSON string
+ containing the ciphertext as the value of "raw" key.
+ :rtype: twisted.internet.defer.Deferred
+ """
+
+ def put_raw(blob):
+ raw = blob.getvalue()
+ return '{"raw": "' + raw + '"}'
+
+ content = BytesIO(str(doc.get_json()))
+ info = DocInfo(doc.doc_id, doc.rev)
+ del doc
+ encryptor = BlobEncryptor(info, content, secret=self.secret)
+ d = encryptor.encrypt()
+ d.addCallback(put_raw)
+ return d
+
+ def decrypt_doc(self, doc):
+ """
+ Creates and configures a BlobDecryptor, asking it decrypt and returning
+ the decrypted cleartext content from the encrypted document.
+
+ :param doc: the document to be decrypted.
+ :type doc: SoledadDocument
+ :return: The decrypted cleartext content of the document.
+ :rtype: str
+ """
+ info = DocInfo(doc.doc_id, doc.rev)
+ ciphertext = BytesIO()
+ payload = doc.content['raw']
+ del doc
+ ciphertext.write(str(payload))
+ decryptor = BlobDecryptor(info, ciphertext, secret=self.secret)
+ return decryptor.decrypt()
+
+
+def encrypt_sym(data, key, method=ENC_METHOD.aes_256_gcm):
+ """
+ Encrypt data using AES-256 cipher in selected mode.
+
+ :param data: The data to be encrypted.
+ :type data: str
+ :param key: The key used to encrypt data (must be 256 bits long).
+ :type key: str
+
+ :return: A tuple with the initialization vector and the ciphertext, both
+ encoded as base64.
+ :rtype: (str, str)
+ """
+ mode = _mode_by_method(method)
+ encryptor = AESWriter(key, mode=mode)
+ encryptor.write(data)
+ _, ciphertext = encryptor.end()
+ iv = base64.b64encode(encryptor.iv)
+ tag = encryptor.tag or ''
+ return iv, ciphertext + tag
+
+
+def decrypt_sym(data, key, iv, method=ENC_METHOD.aes_256_gcm):
+ """
+ Decrypt data using AES-256 cipher in selected mode.
+
+ :param data: The data to be decrypted.
+ :type data: str
+ :param key: The symmetric key used to decrypt data (must be 256 bits
+ long).
+ :type key: str
+ :param iv: The base64 encoded initialization vector.
+ :type iv: str
+
+ :return: The decrypted data.
+ :rtype: str
+ """
+ _iv = base64.b64decode(str(iv))
+ mode = _mode_by_method(method)
+ tag = None
+ if mode == modes.GCM:
+ data, tag = data[:-16], data[-16:]
+ decryptor = AESWriter(key, _iv, tag=tag, mode=mode)
+ decryptor.write(data)
+ _, plaintext = decryptor.end()
+ return plaintext
+
+
+class BlobEncryptor(object):
+ """
+ Produces encrypted data from the cleartext data associated with a given
+ SoledadDocument using AES-256 cipher in GCM mode.
+ The production happens using a Twisted's FileBodyProducer, which uses a
+ Cooperator to schedule calls and can be paused/resumed. Each call takes at
+ most 65536 bytes from the input.
+ Both the production input and output are file descriptors, so they can be
+ applied to a stream of data.
+ """
+ def __init__(self, doc_info, content_fd, secret=None):
+ if not secret:
+ raise EncryptionDecryptionError('no secret given')
+
+ self.doc_id = doc_info.doc_id
+ self.rev = doc_info.rev
+ self._content_fd = content_fd
+ self._producer = FileBodyProducer(content_fd, readSize=2**16)
+
+ sym_key = _get_sym_key_for_doc(doc_info.doc_id, secret)
+ self._aes = AESWriter(sym_key)
+ self._aes.authenticate(self._make_preamble())
+
+ @property
+ def iv(self):
+ return self._aes.iv
+
+ @property
+ def tag(self):
+ return self._aes.tag
+
+ def encrypt(self):
+ """
+ Starts producing encrypted data from the cleartext data.
+
+ :return: A deferred which will be fired when encryption ends and whose
+ callback will be invoked with the resulting ciphertext.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ d = self._producer.startProducing(self._aes)
+ d.addCallback(lambda _: self._end_crypto_stream())
+ return d
+
+ def _make_preamble(self):
+ current_time = int(time.time())
+
+ return PACMAN.pack(
+ BLOB_SIGNATURE_MAGIC,
+ ENC_SCHEME.symkey,
+ ENC_METHOD.aes_256_gcm,
+ current_time,
+ self.iv,
+ str(self.doc_id),
+ str(self.rev))
+
+ def _end_crypto_stream(self):
+ preamble, encrypted = self._aes.end()
+ result = BytesIO()
+ result.write(
+ base64.urlsafe_b64encode(preamble))
+ result.write(' ')
+ result.write(
+ base64.urlsafe_b64encode(encrypted + self.tag))
+ return defer.succeed(result)
+
+
+class BlobDecryptor(object):
+ """
+ Decrypts an encrypted blob associated with a given Document.
+
+ Will raise an exception if the blob doesn't have the expected structure, or
+ if the GCM tag doesn't verify.
+ """
+
+ def __init__(self, doc_info, ciphertext_fd, result=None,
+ secret=None):
+ if not secret:
+ raise EncryptionDecryptionError('no secret given')
+
+ self.doc_id = doc_info.doc_id
+ self.rev = doc_info.rev
+
+ ciphertext_fd, preamble, iv = self._consume_preamble(ciphertext_fd)
+
+ self.result = result or BytesIO()
+ sym_key = _get_sym_key_for_doc(doc_info.doc_id, secret)
+ self._aes = AESWriter(sym_key, iv, self.result, tag=self.tag)
+ self._aes.authenticate(preamble)
+
+ self._producer = FileBodyProducer(ciphertext_fd, readSize=2**16)
+
+ def _consume_preamble(self, ciphertext_fd):
+ ciphertext_fd.seek(0)
+ try:
+ preamble, ciphertext = _split(ciphertext_fd.getvalue())
+ self.tag, ciphertext = ciphertext[-16:], ciphertext[:-16]
+ except (TypeError, binascii.Error):
+ raise InvalidBlob
+ ciphertext_fd.close()
+
+ if len(preamble) != PACMAN.size:
+ raise InvalidBlob
+
+ try:
+ unpacked_data = PACMAN.unpack(preamble)
+ magic, sch, meth, ts, iv, doc_id, rev = unpacked_data
+ except struct.error:
+ raise InvalidBlob
+
+ if magic != BLOB_SIGNATURE_MAGIC:
+ raise InvalidBlob
+ # TODO check timestamp
+ if sch != ENC_SCHEME.symkey:
+ raise InvalidBlob('invalid scheme')
+ if meth != ENC_METHOD.aes_256_gcm:
+ raise InvalidBlob('invalid encryption scheme')
+ if rev != self.rev:
+ raise InvalidBlob('invalid revision')
+ if doc_id != self.doc_id:
+ raise InvalidBlob('invalid revision')
+ return BytesIO(ciphertext), preamble, iv
+
+ def _end_stream(self):
+ try:
+ return self._aes.end()[1]
+ except InvalidTag:
+ raise InvalidBlob('Invalid Tag. Blob authentication failed.')
+
+ def decrypt(self):
+ """
+ Starts producing encrypted data from the cleartext data.
+
+ :return: A deferred which will be fired when encryption ends and whose
+ callback will be invoked with the resulting ciphertext.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ d = self._producer.startProducing(self._aes)
+ d.addCallback(lambda _: self._end_stream())
+ return d
+
+
+class AESWriter(object):
+ """
+ A Twisted's Consumer implementation that takes an input file descriptor and
+ applies AES-256 cipher in GCM mode.
+ """
+ implements(interfaces.IConsumer)
+
+ def __init__(self, key, iv=None, _buffer=None, tag=None, mode=modes.GCM):
+ if len(key) != 32:
+ raise EncryptionDecryptionError('key is not 256 bits')
+ self.iv = iv or os.urandom(16)
+ self.buffer = _buffer or BytesIO()
+ cipher = _get_aes_cipher(key, self.iv, tag, mode)
+ cipher = cipher.decryptor() if tag else cipher.encryptor()
+ self.cipher, self.aead = cipher, ''
+
+ def authenticate(self, data):
+ self.aead += data
+ self.cipher.authenticate_additional_data(data)
+
+ @property
+ def tag(self):
+ return getattr(self.cipher, 'tag', None)
+
+ def write(self, data):
+ self.buffer.write(self.cipher.update(data))
+
+ def end(self):
+ self.buffer.write(self.cipher.finalize())
+ return self.aead, self.buffer.getvalue()
+
+
+def is_symmetrically_encrypted(content):
+ """
+ Returns True if the document was symmetrically encrypted.
+ 'EzcB' is the base64 encoding of \x13\x37 magic number and 1 (symmetrically
+ encrypted value for enc_scheme flag).
+
+ :param doc: The document content as string
+ :type doc: str
+
+ :rtype: bool
+ """
+ return content and content[:13] == '{"raw": "EzcB'
+
+
+# utils
+
+
+def _hmac_sha256(key, data):
+ return hmac.new(key, data, hashlib.sha256).digest()
+
+
+def _get_sym_key_for_doc(doc_id, secret):
+ key = secret[SECRET_LENGTH:]
+ return _hmac_sha256(key, doc_id)
+
+
+def _get_aes_cipher(key, iv, tag, mode=modes.GCM):
+ mode = mode(iv, tag) if mode == modes.GCM else mode(iv)
+ return Cipher(algorithms.AES(key), mode, backend=CRYPTO_BACKEND)
+
+
+def _split(base64_raw_payload):
+ return imap(base64.urlsafe_b64decode, re.split(' ', base64_raw_payload))
+
+
+def _mode_by_method(method):
+ if method == ENC_METHOD.aes_256_gcm:
+ return modes.GCM
+ else:
+ return modes.CTR
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py
index ce9bec05..a5328d2b 100644
--- a/client/src/leap/soledad/client/adbapi.py
+++ b/client/src/leap/soledad/client/adbapi.py
@@ -50,8 +50,7 @@ How many times a SQLCipher query should be retried in case of timeout.
SQLCIPHER_MAX_RETRIES = 10
-def getConnectionPool(opts, openfun=None, driver="pysqlcipher",
- sync_enc_pool=None):
+def getConnectionPool(opts, openfun=None, driver="pysqlcipher"):
"""
Return a connection pool.
@@ -72,7 +71,7 @@ def getConnectionPool(opts, openfun=None, driver="pysqlcipher",
if openfun is None and driver == "pysqlcipher":
openfun = partial(set_init_pragmas, opts=opts)
return U1DBConnectionPool(
- opts, sync_enc_pool,
+ opts,
# the following params are relayed "as is" to twisted's
# ConnectionPool.
"%s.dbapi2" % driver, opts.path, timeout=SQLCIPHER_CONNECTION_TIMEOUT,
@@ -89,7 +88,7 @@ class U1DBConnection(adbapi.Connection):
The U1DB wrapper to use.
"""
- def __init__(self, pool, sync_enc_pool, init_u1db=False):
+ def __init__(self, pool, init_u1db=False):
"""
:param pool: The pool of connections to that owns this connection.
:type pool: adbapi.ConnectionPool
@@ -97,7 +96,6 @@ class U1DBConnection(adbapi.Connection):
:type init_u1db: bool
"""
self.init_u1db = init_u1db
- self._sync_enc_pool = sync_enc_pool
try:
adbapi.Connection.__init__(self, pool)
except dbapi2.DatabaseError as e:
@@ -116,8 +114,7 @@ class U1DBConnection(adbapi.Connection):
if self.init_u1db:
self._u1db = self.u1db_wrapper(
self._connection,
- self._pool.opts,
- self._sync_enc_pool)
+ self._pool.opts)
def __getattr__(self, name):
"""
@@ -162,12 +159,11 @@ class U1DBConnectionPool(adbapi.ConnectionPool):
connectionFactory = U1DBConnection
transactionFactory = U1DBTransaction
- def __init__(self, opts, sync_enc_pool, *args, **kwargs):
+ def __init__(self, opts, *args, **kwargs):
"""
Initialize the connection pool.
"""
self.opts = opts
- self._sync_enc_pool = sync_enc_pool
try:
adbapi.ConnectionPool.__init__(self, *args, **kwargs)
except dbapi2.DatabaseError as e:
@@ -182,7 +178,7 @@ class U1DBConnectionPool(adbapi.ConnectionPool):
try:
conn = self.connectionFactory(
- self, self._sync_enc_pool, init_u1db=True)
+ self, init_u1db=True)
replica_uid = conn._u1db._real_replica_uid
setProxiedObject(self.replica_uid, replica_uid)
except DatabaseAccessError as e:
@@ -257,7 +253,7 @@ class U1DBConnectionPool(adbapi.ConnectionPool):
tid = self.threadID()
u1db = self._u1dbconnections.get(tid)
conn = self.connectionFactory(
- self, self._sync_enc_pool, init_u1db=not bool(u1db))
+ self, init_u1db=not bool(u1db))
if self.replica_uid is None:
replica_uid = conn._u1db._real_replica_uid
diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py
index 6870d5ba..da6eec66 100644
--- a/client/src/leap/soledad/client/api.py
+++ b/client/src/leap/soledad/client/api.py
@@ -56,11 +56,10 @@ from leap.soledad.common.errors import DatabaseAccessError
from leap.soledad.client import adbapi
from leap.soledad.client import events as soledad_events
from leap.soledad.client import interfaces as soledad_interfaces
-from leap.soledad.client.crypto import SoledadCrypto
+from leap.soledad.client import sqlcipher
from leap.soledad.client.secrets import SoledadSecrets
from leap.soledad.client.shared_db import SoledadSharedDatabase
-from leap.soledad.client import sqlcipher
-from leap.soledad.client import encdecpool
+from leap.soledad.client._crypto import SoledadCrypto
logger = getLogger(__name__)
@@ -131,7 +130,7 @@ class Soledad(object):
def __init__(self, uuid, passphrase, secrets_path, local_db_path,
server_url, cert_file, shared_db=None,
- auth_token=None, defer_encryption=False, syncable=True):
+ auth_token=None, syncable=True):
"""
Initialize configuration, cryptographic keys and dbs.
@@ -168,11 +167,6 @@ class Soledad(object):
Authorization token for accessing remote databases.
:type auth_token: str
- :param defer_encryption:
- Whether to defer encryption/decryption of documents, or do it
- inline while syncing.
- :type defer_encryption: bool
-
:param syncable:
If set to ``False``, this database will not attempt to synchronize
with remote replicas (default is ``True``)
@@ -188,9 +182,7 @@ class Soledad(object):
self._passphrase = passphrase
self._local_db_path = local_db_path
self._server_url = server_url
- self._defer_encryption = defer_encryption
self._secrets_path = None
- self._sync_enc_pool = None
self._dbsyncer = None
self.shared_db = shared_db
@@ -226,7 +218,6 @@ class Soledad(object):
# have to close any thread-related stuff we have already opened
# here, otherwise there might be zombie threads that may clog the
# reactor.
- self._sync_db.close()
if hasattr(self, '_dbpool'):
self._dbpool.close()
raise
@@ -289,22 +280,12 @@ class Soledad(object):
tohex = binascii.b2a_hex
# sqlcipher only accepts the hex version
key = tohex(self._secrets.get_local_storage_key())
- sync_db_key = tohex(self._secrets.get_sync_db_key())
opts = sqlcipher.SQLCipherOptions(
self._local_db_path, key,
- is_raw_key=True, create=True,
- defer_encryption=self._defer_encryption,
- sync_db_key=sync_db_key,
- )
+ is_raw_key=True, create=True)
self._sqlcipher_opts = opts
-
- # the sync_db is used both for deferred encryption and decryption, so
- # we want to initialize it anyway to allow for all combinations of
- # deferred encryption and decryption configurations.
- self._initialize_sync_db(opts)
- self._dbpool = adbapi.getConnectionPool(
- opts, sync_enc_pool=self._sync_enc_pool)
+ self._dbpool = adbapi.getConnectionPool(opts)
def _init_u1db_syncer(self):
"""
@@ -313,10 +294,7 @@ class Soledad(object):
replica_uid = self._dbpool.replica_uid
self._dbsyncer = sqlcipher.SQLCipherU1DBSync(
self._sqlcipher_opts, self._crypto, replica_uid,
- SOLEDAD_CERT,
- defer_encryption=self._defer_encryption,
- sync_db=self._sync_db,
- sync_enc_pool=self._sync_enc_pool)
+ SOLEDAD_CERT)
def sync_stats(self):
sync_phase = 0
@@ -341,12 +319,6 @@ class Soledad(object):
self._dbpool.close()
if getattr(self, '_dbsyncer', None):
self._dbsyncer.close()
- # close the sync database
- if self._sync_db:
- self._sync_db.close()
- self._sync_db = None
- if self._defer_encryption:
- self._sync_enc_pool.stop()
#
# ILocalStorage
@@ -385,7 +357,8 @@ class Soledad(object):
also be updated.
:rtype: twisted.internet.defer.Deferred
"""
- return self._defer("put_doc", doc)
+ d = self._defer("put_doc", doc)
+ return d
def delete_doc(self, doc):
"""
@@ -479,7 +452,8 @@ class Soledad(object):
# create_doc (and probably to put_doc too). There are cases (mail
# payloads for example) in which we already have the encoding in the
# headers, so we don't need to guess it.
- return self._defer("create_doc", content, doc_id=doc_id)
+ d = self._defer("create_doc", content, doc_id=doc_id)
+ return d
def create_doc_from_json(self, json, doc_id=None):
"""
@@ -700,37 +674,26 @@ class Soledad(object):
if syncable and not self._dbsyncer:
self._init_u1db_syncer()
- def sync(self, defer_decryption=True):
+ def sync(self):
"""
Synchronize documents with the server replica.
This method uses a lock to prevent multiple concurrent sync processes
over the same local db file.
- :param defer_decryption:
- Whether to defer decryption of documents, or do it inline while
- syncing.
- :type defer_decryption: bool
-
:return: A deferred lock that will run the actual sync process when
the lock is acquired, and which will fire with with the local
generation before the synchronization was performed.
:rtype: twisted.internet.defer.Deferred
"""
d = self.sync_lock.run(
- self._sync,
- defer_decryption)
+ self._sync)
return d
- def _sync(self, defer_decryption):
+ def _sync(self):
"""
Synchronize documents with the server replica.
- :param defer_decryption:
- Whether to defer decryption of documents, or do it inline while
- syncing.
- :type defer_decryption: bool
-
:return: A deferred whose callback will be invoked with the local
generation before the synchronization was performed.
:rtype: twisted.internet.defer.Deferred
@@ -740,8 +703,7 @@ class Soledad(object):
return
d = self._dbsyncer.sync(
sync_url,
- creds=self._creds,
- defer_decryption=defer_decryption)
+ creds=self._creds)
def _sync_callback(local_gen):
self._last_received_docs = docs = self._dbsyncer.received_docs
@@ -837,50 +799,6 @@ class Soledad(object):
token = property(_get_token, _set_token, doc='The authentication Token.')
- def _initialize_sync_db(self, opts):
- """
- Initialize the Symmetrically-Encrypted document to be synced database,
- and the queue to communicate with subprocess workers.
-
- :param opts:
- :type opts: SQLCipherOptions
- """
- soledad_assert(opts.sync_db_key is not None)
- sync_db_path = None
- if opts.path != ":memory:":
- sync_db_path = "%s-sync" % opts.path
- else:
- sync_db_path = ":memory:"
-
- # we copy incoming options because the opts object might be used
- # somewhere else
- sync_opts = sqlcipher.SQLCipherOptions.copy(
- opts, path=sync_db_path, create=True)
- self._sync_db = sqlcipher.getConnectionPool(
- sync_opts, extra_queries=self._sync_db_extra_init)
- if self._defer_encryption:
- # initialize syncing queue encryption pool
- self._sync_enc_pool = encdecpool.SyncEncrypterPool(
- self._crypto, self._sync_db)
- self._sync_enc_pool.start()
-
- @property
- def _sync_db_extra_init(self):
- """
- Queries for creating tables for the local sync documents db if needed.
- They are passed as extra initialization to initialize_sqlciphjer_db
-
- :rtype: tuple of strings
- """
- maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)"
- encr = encdecpool.SyncEncrypterPool
- decr = encdecpool.SyncDecrypterPool
- sql_encr_table_query = (maybe_create % (
- encr.TABLE_NAME, encr.FIELD_NAMES))
- sql_decr_table_query = (maybe_create % (
- decr.TABLE_NAME, decr.FIELD_NAMES))
- return (sql_encr_table_query, sql_decr_table_query)
-
#
# ISecretsStorage
#
@@ -1017,6 +935,7 @@ def create_path_if_not_exists(path):
# Monkey patching u1db to be able to provide a custom SSL cert
# ----------------------------------------------------------------------------
+
# We need a more reasonable timeout (in seconds)
SOLEDAD_TIMEOUT = 120
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index d81c883b..09e90171 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -32,9 +32,12 @@ from leap.soledad.common import soledad_assert
from leap.soledad.common import soledad_assert_type
from leap.soledad.common import crypto
from leap.soledad.common.log import getLogger
+import warnings
logger = getLogger(__name__)
+warnings.warn("'soledad.client.crypto' MODULE DEPRECATED",
+ DeprecationWarning, stacklevel=2)
MAC_KEY_LENGTH = 64
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
deleted file mode 100644
index 056b012f..00000000
--- a/client/src/leap/soledad/client/encdecpool.py
+++ /dev/null
@@ -1,657 +0,0 @@
-# -*- coding: utf-8 -*-
-# encdecpool.py
-# Copyright (C) 2015 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
-"""
-A pool of encryption/decryption concurrent and parallel workers for using
-during synchronization.
-"""
-
-
-import json
-from uuid import uuid4
-
-from twisted.internet.task import LoopingCall
-from twisted.internet import threads
-from twisted.internet import defer
-
-from leap.soledad.common.document import SoledadDocument
-from leap.soledad.common import soledad_assert
-from leap.soledad.common.log import getLogger
-
-from leap.soledad.client.crypto import encrypt_docstr
-from leap.soledad.client.crypto import decrypt_doc_dict
-
-
-logger = getLogger(__name__)
-
-
-#
-# Encrypt/decrypt pools of workers
-#
-
-class SyncEncryptDecryptPool(object):
- """
- Base class for encrypter/decrypter pools.
- """
-
- def __init__(self, crypto, sync_db):
- """
- Initialize the pool of encryption-workers.
-
- :param crypto: A SoledadCryto instance to perform the encryption.
- :type crypto: leap.soledad.crypto.SoledadCrypto
-
- :param sync_db: A database connection handle
- :type sync_db: pysqlcipher.dbapi2.Connection
- """
- self._crypto = crypto
- self._sync_db = sync_db
- self._delayed_call = None
- self._started = False
-
- def start(self):
- self._started = True
-
- def stop(self):
- self._started = False
- # maybe cancel the next delayed call
- if self._delayed_call \
- and not self._delayed_call.called:
- self._delayed_call.cancel()
-
- @property
- def running(self):
- return self._started
-
- def _runOperation(self, query, *args):
- """
- Run an operation on the sync db.
-
- :param query: The query to be executed.
- :type query: str
- :param args: A list of query arguments.
- :type args: list
-
- :return: A deferred that will fire when the operation in the database
- has finished.
- :rtype: twisted.internet.defer.Deferred
- """
- return self._sync_db.runOperation(query, *args)
-
- def _runQuery(self, query, *args):
- """
- Run a query on the sync db.
-
- :param query: The query to be executed.
- :type query: str
- :param args: A list of query arguments.
- :type args: list
-
- :return: A deferred that will fire with the results of the database
- query.
- :rtype: twisted.internet.defer.Deferred
- """
- return self._sync_db.runQuery(query, *args)
-
-
-def encrypt_doc_task(doc_id, doc_rev, content, key, secret):
- """
- Encrypt the content of the given document.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The serialized content of the document.
- :type content: str
- :param key: The encryption key.
- :type key: str
- :param secret: The Soledad storage secret (used for MAC auth).
- :type secret: str
-
- :return: A tuple containing the doc id, revision and encrypted content.
- :rtype: tuple(str, str, str)
- """
- encrypted_content = encrypt_docstr(
- content, doc_id, doc_rev, key, secret)
- return doc_id, doc_rev, encrypted_content
-
-
-class SyncEncrypterPool(SyncEncryptDecryptPool):
- """
- Pool of workers that spawn subprocesses to execute the symmetric encryption
- of documents to be synced.
- """
- TABLE_NAME = "docs_tosync"
- FIELD_NAMES = "doc_id PRIMARY KEY, rev, content"
-
- ENCRYPT_LOOP_PERIOD = 2
-
- def __init__(self, *args, **kwargs):
- """
- Initialize the sync encrypter pool.
- """
- SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
- # TODO delete already synced files from database
-
- def start(self):
- """
- Start the encrypter pool.
- """
- SyncEncryptDecryptPool.start(self)
- logger.debug("starting the encryption loop...")
-
- def stop(self):
- """
- Stop the encrypter pool.
- """
-
- SyncEncryptDecryptPool.stop(self)
-
- def encrypt_doc(self, doc):
- """
- Encrypt document asynchronously then insert it on
- local staging database.
-
- :param doc: The document to be encrypted.
- :type doc: SoledadDocument
- """
- soledad_assert(self._crypto is not None, "need a crypto object")
- docstr = doc.get_json()
- key = self._crypto.doc_passphrase(doc.doc_id)
- secret = self._crypto.secret
- args = doc.doc_id, doc.rev, docstr, key, secret
- # encrypt asynchronously
- # TODO use dedicated threadpool / move to ampoule
- d = threads.deferToThread(
- encrypt_doc_task, *args)
- d.addCallback(self._encrypt_doc_cb)
- return d
-
- def _encrypt_doc_cb(self, result):
- """
- Insert results of encryption routine into the local sync database.
-
- :param result: A tuple containing the doc id, revision and encrypted
- content.
- :type result: tuple(str, str, str)
- """
- doc_id, doc_rev, content = result
- return self._insert_encrypted_local_doc(doc_id, doc_rev, content)
-
- def _insert_encrypted_local_doc(self, doc_id, doc_rev, content):
- """
- Insert the contents of the encrypted doc into the local sync
- database.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The serialized content of the document.
- :type content: str
- """
- query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \
- % (self.TABLE_NAME,)
- return self._runOperation(query, (doc_id, doc_rev, content))
-
- @defer.inlineCallbacks
- def get_encrypted_doc(self, doc_id, doc_rev):
- """
- Get an encrypted document from the sync db.
-
- :param doc_id: The id of the document.
- :type doc_id: str
- :param doc_rev: The revision of the document.
- :type doc_rev: str
-
- :return: A deferred that will fire with the encrypted content of the
- document or None if the document was not found in the sync
- db.
- :rtype: twisted.internet.defer.Deferred
- """
- query = "SELECT content FROM %s WHERE doc_id=? and rev=?" \
- % self.TABLE_NAME
- result = yield self._runQuery(query, (doc_id, doc_rev))
- if result:
- logger.debug("found doc on sync db: %s" % doc_id)
- val = result.pop()
- defer.returnValue(val[0])
- logger.debug("did not find doc on sync db: %s" % doc_id)
- defer.returnValue(None)
-
- def delete_encrypted_doc(self, doc_id, doc_rev):
- """
- Delete an encrypted document from the sync db.
-
- :param doc_id: The id of the document.
- :type doc_id: str
- :param doc_rev: The revision of the document.
- :type doc_rev: str
-
- :return: A deferred that will fire when the operation in the database
- has finished.
- :rtype: twisted.internet.defer.Deferred
- """
- query = "DELETE FROM %s WHERE doc_id=? and rev=?" \
- % self.TABLE_NAME
- self._runOperation(query, (doc_id, doc_rev))
-
-
-def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret,
- idx):
- """
- Decrypt the content of the given document.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The encrypted content of the document as JSON dict.
- :type content: dict
- :param gen: The generation corresponding to the modification of that
- document.
- :type gen: int
- :param trans_id: The transaction id corresponding to the modification of
- that document.
- :type trans_id: str
- :param key: The encryption key.
- :type key: str
- :param secret: The Soledad storage secret (used for MAC auth).
- :type secret: str
- :param idx: The index of this document in the current sync process.
- :type idx: int
-
- :return: A tuple containing the doc id, revision and encrypted content.
- :rtype: tuple(str, str, str)
- """
- decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret)
- return doc_id, doc_rev, decrypted_content, gen, trans_id, idx
-
-
-class SyncDecrypterPool(SyncEncryptDecryptPool):
- """
- Pool of workers that spawn subprocesses to execute the symmetric decryption
- of documents that were received.
-
- The decryption of the received documents is done in two steps:
-
- 1. Encrypted documents are stored in the sync db by the actual soledad
- sync loop.
- 2. The soledad sync loop tells us how many documents we should expect
- to process.
- 3. We start a decrypt-and-process loop:
-
- a. Encrypted documents are fetched.
- b. Encrypted documents are decrypted.
- c. The longest possible list of decrypted documents are inserted
- in the soledad db (this depends on which documents have already
- arrived and which documents have already been decrypte, because
- the order of insertion in the local soledad db matters).
- d. Processed documents are deleted from the database.
-
- 4. When we have processed as many documents as we should, the loop
- finishes.
- """
- TABLE_NAME = "docs_received"
- FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \
- "trans_id, encrypted, idx, sync_id"
-
- """
- Period of recurrence of the periodic decrypting task, in seconds.
- """
- DECRYPT_LOOP_PERIOD = 0.5
-
- def __init__(self, *args, **kwargs):
- """
- Initialize the decrypter pool, and setup a dict for putting the
- results of the decrypted docs until they are picked by the insert
- routine that gets them in order.
-
- :param insert_doc_cb: A callback for inserting received documents from
- target. If not overriden, this will call u1db
- insert_doc_from_target in synchronizer, which
- implements the TAKE OTHER semantics.
- :type insert_doc_cb: function
- :param source_replica_uid: The source replica uid, used to find the
- correct callback for inserting documents.
- :type source_replica_uid: str
- """
- self._insert_doc_cb = kwargs.pop("insert_doc_cb")
- self.source_replica_uid = kwargs.pop("source_replica_uid")
-
- SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
-
- self._docs_to_process = None
- self._processed_docs = 0
- self._last_inserted_idx = 0
-
- self._loop = LoopingCall(self._decrypt_and_recurse)
-
- def _start_pool(self, period):
- self._loop.start(period)
-
- def start(self, docs_to_process):
- """
- Set the number of documents we expect to process.
-
- This should be called by the during the sync exchange process as soon
- as we know how many documents are arriving from the server.
-
- :param docs_to_process: The number of documents to process.
- :type docs_to_process: int
- """
- SyncEncryptDecryptPool.start(self)
- self._decrypted_docs_indexes = set()
- self._sync_id = uuid4().hex
- self._docs_to_process = docs_to_process
- self._deferred = defer.Deferred()
- d = self._init_db()
- d.addCallback(lambda _: self._start_pool(self.DECRYPT_LOOP_PERIOD))
- return d
-
- def stop(self):
- if self._loop.running:
- self._loop.stop()
- self._finish()
- SyncEncryptDecryptPool.stop(self)
-
- def _init_db(self):
- """
- Ensure sync_id column is present then
- Empty the received docs table of the sync database.
-
- :return: A deferred that will fire when the operation in the database
- has finished.
- :rtype: twisted.internet.defer.Deferred
- """
- ensure_sync_id_column = ("ALTER TABLE %s ADD COLUMN sync_id" %
- self.TABLE_NAME)
- d = self._runQuery(ensure_sync_id_column)
-
- def empty_received_docs(_):
- query = "DELETE FROM %s WHERE sync_id <> ?" % (self.TABLE_NAME,)
- return self._runOperation(query, (self._sync_id,))
-
- d.addCallbacks(empty_received_docs, empty_received_docs)
- return d
-
- def _errback(self, failure):
- logger.error(failure)
- self._deferred.errback(failure)
- self._processed_docs = 0
- self._last_inserted_idx = 0
-
- @property
- def deferred(self):
- """
- Deferred that will be fired when the decryption loop has finished
- processing all the documents.
- """
- return self._deferred
-
- def insert_encrypted_received_doc(
- self, doc_id, doc_rev, content, gen, trans_id, idx):
- """
- Decrypt and insert a received document into local staging area to be
- processed later on.
-
- :param doc_id: The document ID.
- :type doc_id: str
- :param doc_rev: The document Revision
- :param doc_rev: str
- :param content: The content of the document
- :type content: dict
- :param gen: The document Generation
- :type gen: int
- :param trans_id: Transaction ID
- :type trans_id: str
- :param idx: The index of this document in the current sync process.
- :type idx: int
-
- :return: A deferred that will fire after the decrypted document has
- been inserted in the sync db.
- :rtype: twisted.internet.defer.Deferred
- """
- soledad_assert(self._crypto is not None, "need a crypto object")
-
- key = self._crypto.doc_passphrase(doc_id)
- secret = self._crypto.secret
- args = doc_id, doc_rev, content, gen, trans_id, key, secret, idx
- # decrypt asynchronously
- # TODO use dedicated threadpool / move to ampoule
- d = threads.deferToThread(
- decrypt_doc_task, *args)
- # callback will insert it for later processing
- d.addCallback(self._decrypt_doc_cb)
- return d
-
- def insert_received_doc(
- self, doc_id, doc_rev, content, gen, trans_id, idx):
- """
- Insert a document that is not symmetrically encrypted.
- We store it in the staging area (the decrypted_docs dictionary) to be
- picked up in order as the preceding documents are decrypted.
-
- :param doc_id: The document id
- :type doc_id: str
- :param doc_rev: The document revision
- :param doc_rev: str or dict
- :param content: The content of the document
- :type content: dict
- :param gen: The document generation
- :type gen: int
- :param trans_id: The transaction id
- :type trans_id: str
- :param idx: The index of this document in the current sync process.
- :type idx: int
-
- :return: A deferred that will fire when the operation in the database
- has finished.
- :rtype: twisted.internet.defer.Deferred
- """
- if not isinstance(content, str):
- content = json.dumps(content)
- query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?, ?)" \
- % self.TABLE_NAME
- d = self._runOperation(
- query, (doc_id, doc_rev, content, gen, trans_id, 0,
- idx, self._sync_id))
- d.addCallback(lambda _: self._decrypted_docs_indexes.add(idx))
- return d
-
- def _delete_received_docs(self, doc_ids):
- """
- Delete a list of received docs after get them inserted into the db.
-
- :param doc_id: Document ID list.
- :type doc_id: list
-
- :return: A deferred that will fire when the operation in the database
- has finished.
- :rtype: twisted.internet.defer.Deferred
- """
- placeholders = ', '.join('?' for _ in doc_ids)
- query = "DELETE FROM '%s' WHERE doc_id in (%s)" \
- % (self.TABLE_NAME, placeholders)
- return self._runOperation(query, (doc_ids))
-
- def _decrypt_doc_cb(self, result):
- """
- Store the decryption result in the sync db from where it will later be
- picked by _process_decrypted_docs.
-
- :param result: A tuple containing the document's id, revision,
- content, generation, transaction id and sync index.
- :type result: tuple(str, str, str, int, str, int)
-
- :return: A deferred that will fire after the document has been
- inserted in the sync db.
- :rtype: twisted.internet.defer.Deferred
- """
- doc_id, rev, content, gen, trans_id, idx = result
- logger.debug("sync decrypter pool: decrypted doc %s: %s %s %s"
- % (doc_id, rev, gen, trans_id))
- return self.insert_received_doc(
- doc_id, rev, content, gen, trans_id, idx)
-
- def _get_docs(self, encrypted=None, sequence=None):
- """
- Get documents from the received docs table in the sync db.
-
- :param encrypted: If not None, only return documents with encrypted
- field equal to given parameter.
- :type encrypted: bool or None
- :param order_by: The name of the field to order results.
-
- :return: A deferred that will fire with the results of the database
- query.
- :rtype: twisted.internet.defer.Deferred
- """
- query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \
- "idx FROM %s" % self.TABLE_NAME
- parameters = []
- if encrypted or sequence:
- query += " WHERE sync_id = ? and"
- parameters += [self._sync_id]
- if encrypted:
- query += " encrypted = ?"
- parameters += [int(encrypted)]
- if sequence:
- query += " idx in (" + ', '.join('?' * len(sequence)) + ")"
- parameters += [int(i) for i in sequence]
- query += " ORDER BY idx ASC"
- return self._runQuery(query, parameters)
-
- @defer.inlineCallbacks
- def _get_insertable_docs(self):
- """
- Return a list of non-encrypted documents ready to be inserted.
-
- :return: A deferred that will fire with the list of insertable
- documents.
- :rtype: twisted.internet.defer.Deferred
- """
- # Here, check in memory what are the insertable indexes that can
- # form a sequence starting from the last inserted index
- sequence = []
- insertable_docs = []
- next_index = self._last_inserted_idx + 1
- while next_index in self._decrypted_docs_indexes:
- sequence.append(str(next_index))
- next_index += 1
- if len(sequence) > 900:
- # 999 is the default value of SQLITE_MAX_VARIABLE_NUMBER
- # if we try to query more, SQLite will refuse
- # we need to find a way to improve this
- # being researched in #7669
- break
- # Then fetch all the ones ready for insertion.
- if sequence:
- insertable_docs = yield self._get_docs(encrypted=False,
- sequence=sequence)
- defer.returnValue(insertable_docs)
-
- @defer.inlineCallbacks
- def _process_decrypted_docs(self):
- """
- Fetch as many decrypted documents as can be taken from the expected
- order and insert them in the local replica.
-
- :return: A deferred that will fire with the list of inserted
- documents.
- :rtype: twisted.internet.defer.Deferred
- """
- insertable = yield self._get_insertable_docs()
- processed_docs_ids = []
- for doc_fields in insertable:
- method = self._insert_decrypted_local_doc
- # FIXME: This is used only because SQLCipherU1DBSync is synchronous
- # When adbapi is used there is no need for an external thread
- # Without this the reactor can freeze and fail docs download
- yield threads.deferToThread(method, *doc_fields)
- processed_docs_ids.append(doc_fields[0])
- yield self._delete_received_docs(processed_docs_ids)
-
- def _insert_decrypted_local_doc(self, doc_id, doc_rev, content,
- gen, trans_id, encrypted, idx):
- """
- Insert the decrypted document into the local replica.
-
- Make use of the passed callback `insert_doc_cb` passed to the caller
- by u1db sync.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The serialized content of the document.
- :type content: str
- :param gen: The generation corresponding to the modification of that
- document.
- :type gen: int
- :param trans_id: The transaction id corresponding to the modification
- of that document.
- :type trans_id: str
- """
- # could pass source_replica in params for callback chain
- logger.debug("sync decrypter pool: inserting doc in local db: "
- "%s:%s %s" % (doc_id, doc_rev, gen))
-
- # convert deleted documents to avoid error on document creation
- if content == 'null':
- content = None
- doc = SoledadDocument(doc_id, doc_rev, content)
- gen = int(gen)
- self._insert_doc_cb(doc, gen, trans_id)
-
- # store info about processed docs
- self._last_inserted_idx = idx
- self._processed_docs += 1
-
- @defer.inlineCallbacks
- def _decrypt_and_recurse(self):
- """
- Decrypt the documents received from remote replica and insert them
- into the local one.
-
- This method implicitelly returns a defferred (see the decorator
- above). It should only be called by _launch_decrypt_and_process().
- because this way any exceptions raised here will be stored by the
- errback attached to the deferred returned.
-
- :return: A deferred which will fire after all decrypt, process and
- delete operations have been executed.
- :rtype: twisted.internet.defer.Deferred
- """
- if not self.running:
- defer.returnValue(None)
- processed = self._processed_docs
- pending = self._docs_to_process
-
- if processed < pending:
- yield self._process_decrypted_docs()
- else:
- self._finish()
-
- def _finish(self):
- self._processed_docs = 0
- self._last_inserted_idx = 0
- self._decrypted_docs_indexes = set()
- if not self._deferred.called:
- self._deferred.callback(None)
diff --git a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py
index 4fc91d9d..92bc85d6 100644
--- a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py
+++ b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py
@@ -58,6 +58,7 @@ def debug(*args):
if not silent:
print(*args)
+
debug("[+] db path:", tmpdb)
debug("[+] num docs", numdocs)
@@ -74,6 +75,7 @@ dbpool = adbapi.getConnectionPool(opts)
def createDoc(doc):
return dbpool.runU1DBQuery("create_doc", doc)
+
db_indexes = {
'by-chash': ['chash'],
'by-number': ['number']}
@@ -168,6 +170,7 @@ def insert_docs(_):
deferreds.append(d)
return defer.gatherResults(deferreds, consumeErrors=True)
+
d = create_indexes(None)
d.addCallback(insert_docs)
d.addCallback(get_from_index)
diff --git a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py
index 38ea18a3..429566c7 100644
--- a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py
+++ b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py
@@ -58,6 +58,7 @@ def debug(*args):
if not silent:
print(*args)
+
debug("[+] db path:", tmpdb)
debug("[+] num docs", numdocs)
@@ -74,6 +75,7 @@ dbpool = adbapi.getConnectionPool(opts)
def createDoc(doc, doc_id):
return dbpool.runU1DBQuery("create_doc", doc, doc_id=doc_id)
+
db_indexes = {
'by-chash': ['chash'],
'by-number': ['number']}
@@ -168,6 +170,7 @@ def insert_docs(_):
deferreds.append(d)
return defer.gatherResults(deferreds, consumeErrors=True)
+
d = create_indexes(None)
d.addCallback(insert_docs)
d.addCallback(get_from_index)
diff --git a/client/src/leap/soledad/client/examples/run_benchmark.py b/client/src/leap/soledad/client/examples/run_benchmark.py
index 61621e89..ddedf433 100644
--- a/client/src/leap/soledad/client/examples/run_benchmark.py
+++ b/client/src/leap/soledad/client/examples/run_benchmark.py
@@ -14,6 +14,7 @@ cmd = "SILENT=1 TIMES={times} TMPDIR={tmpdir} python ./use_{version}api.py"
def parse_time(r):
return r.split('\n')[-1]
+
with open(CSVFILE, 'w') as log:
for times in range(0, 10000, 500):
diff --git a/client/src/leap/soledad/client/examples/soledad_sync.py b/client/src/leap/soledad/client/examples/soledad_sync.py
index 63077ee3..3aed10eb 100644
--- a/client/src/leap/soledad/client/examples/soledad_sync.py
+++ b/client/src/leap/soledad/client/examples/soledad_sync.py
@@ -40,7 +40,7 @@ def init_soledad(_):
global soledad
soledad = Soledad(uuid, _pass, secrets_path, local_db_path,
server_url, cert_file,
- auth_token=token, defer_encryption=False)
+ auth_token=token)
def getall(_):
d = soledad.get_all_docs()
diff --git a/client/src/leap/soledad/client/examples/use_adbapi.py b/client/src/leap/soledad/client/examples/use_adbapi.py
index a2683836..39301b41 100644
--- a/client/src/leap/soledad/client/examples/use_adbapi.py
+++ b/client/src/leap/soledad/client/examples/use_adbapi.py
@@ -39,6 +39,7 @@ def debug(*args):
if not silent:
print(*args)
+
debug("[+] db path:", tmpdb)
debug("[+] times", times)
@@ -87,6 +88,7 @@ def allDone(_):
print((end_time - start_time).total_seconds())
reactor.stop()
+
deferreds = []
payload = open('manifest.phk').read()
diff --git a/client/src/leap/soledad/client/examples/use_api.py b/client/src/leap/soledad/client/examples/use_api.py
index e2501c98..db77c4b3 100644
--- a/client/src/leap/soledad/client/examples/use_api.py
+++ b/client/src/leap/soledad/client/examples/use_api.py
@@ -36,6 +36,7 @@ def debug(*args):
if not silent:
print(*args)
+
debug("[+] db path:", tmpdb)
debug("[+] times", times)
@@ -52,6 +53,7 @@ db = sqlcipher.SQLCipherDatabase(opts)
def allDone():
debug("ALL DONE!")
+
payload = open('manifest.phk').read()
for i in range(times):
diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py
index 62e8bcf0..0e250bf1 100644
--- a/client/src/leap/soledad/client/http_target/__init__.py
+++ b/client/src/leap/soledad/client/http_target/__init__.py
@@ -25,10 +25,13 @@ after receiving.
import os
from leap.soledad.common.log import getLogger
-from leap.common.http import HTTPClient
+from leap.common.certs import get_compatible_ssl_context_factory
+from twisted.web.client import Agent
+from twisted.internet import reactor
from leap.soledad.client.http_target.send import HTTPDocSender
from leap.soledad.client.http_target.api import SyncTargetAPI
from leap.soledad.client.http_target.fetch import HTTPDocFetcher
+from leap.soledad.client import crypto as old_crypto
logger = getLogger(__name__)
@@ -51,8 +54,7 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher):
the parsed documents that the remote send us, before being decrypted and
written to the main database.
"""
- def __init__(self, url, source_replica_uid, creds, crypto, cert_file,
- sync_db=None, sync_enc_pool=None):
+ def __init__(self, url, source_replica_uid, creds, crypto, cert_file):
"""
Initialize the sync target.
@@ -65,21 +67,11 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher):
:type creds: creds
:param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
document contents when syncing.
- :type crypto: soledad.crypto.SoledadCrypto
+ :type crypto: soledad._crypto.SoledadCrypto
:param cert_file: Path to the certificate of the ca used to validate
the SSL certificate used by the remote soledad
server.
:type cert_file: str
- :param sync_db: Optional. handler for the db with the symmetric
- encryption of the syncing documents. If
- None, encryption will be done in-place,
- instead of retreiving it from the dedicated
- database.
- :type sync_db: Sqlite handler
- :param sync_enc_pool: The encryption pool to use to defer encryption.
- If None is passed the encryption will not be
- deferred.
- :type sync_enc_pool: leap.soledad.client.encdecpool.SyncEncrypterPool
"""
if url.endswith("/"):
url = url[:-1]
@@ -89,17 +81,13 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher):
self._uuid = None
self.set_creds(creds)
self._crypto = crypto
- self._sync_db = sync_db
- self._sync_enc_pool = sync_enc_pool
+ # TODO: DEPRECATED CRYPTO
+ self._deprecated_crypto = old_crypto.SoledadCrypto(crypto.secret)
self._insert_doc_cb = None
- # asynchronous encryption/decryption attributes
- self._decryption_callback = None
- self._sync_decr_pool = None
-
- # XXX Increasing timeout of simple requests to avoid chances of hitting
- # the duplicated syncing bug. This could be reduced to the 30s default
- # after implementing Cancellable Sync. See #7382
- self._http = HTTPClient(cert_file, timeout=90)
+
+ # Twisted default Agent with our own ssl context factory
+ self._http = Agent(reactor,
+ get_compatible_ssl_context_factory(cert_file))
if DO_STATS:
self.sync_exchange_phase = [0]
diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py
index 3c8e3764..1b086a00 100644
--- a/client/src/leap/soledad/client/http_target/api.py
+++ b/client/src/leap/soledad/client/http_target/api.py
@@ -18,10 +18,13 @@ import os
import json
import base64
+from StringIO import StringIO
from uuid import uuid4
from twisted.web.error import Error
from twisted.internet import defer
+from twisted.web.http_headers import Headers
+from twisted.web.client import FileBodyProducer
from leap.soledad.client.http_target.support import readBody
from leap.soledad.common.errors import InvalidAuthTokenError
@@ -39,14 +42,6 @@ class SyncTargetAPI(SyncTarget):
Declares public methods and implements u1db.SyncTarget.
"""
- @defer.inlineCallbacks
- def close(self):
- if self._sync_enc_pool:
- self._sync_enc_pool.stop()
- if self._sync_decr_pool:
- self._sync_decr_pool.stop()
- yield self._http.close()
-
@property
def uuid(self):
return self._uuid
@@ -69,16 +64,20 @@ class SyncTargetAPI(SyncTarget):
def _base_header(self):
return self._auth_header.copy() if self._auth_header else {}
- @property
- def _defer_encryption(self):
- return self._sync_enc_pool is not None
-
def _http_request(self, url, method='GET', body=None, headers=None,
- content_type=None):
+ content_type=None, body_reader=readBody,
+ body_producer=None):
headers = headers or self._base_header
if content_type:
headers.update({'content-type': [content_type]})
- d = self._http.request(url, method, body, headers, readBody)
+ if not body_producer and body:
+ body = FileBodyProducer(StringIO(body))
+ elif body_producer:
+ # Upload case, check send.py
+ body = body_producer(body)
+ d = self._http.request(
+ method, url, headers=Headers(headers), bodyProducer=body)
+ d.addCallback(body_reader)
d.addErrback(_unauth_to_invalid_token_error)
return d
@@ -153,7 +152,7 @@ class SyncTargetAPI(SyncTarget):
def sync_exchange(self, docs_by_generation, source_replica_uid,
last_known_generation, last_known_trans_id,
insert_doc_cb, ensure_callback=None,
- defer_decryption=True, sync_id=None):
+ sync_id=None):
"""
Find out which documents the remote database does not know about,
encrypt and send them. After that, receive documents from the remote
@@ -185,11 +184,6 @@ class SyncTargetAPI(SyncTarget):
created.
:type ensure_callback: function
- :param defer_decryption: Whether to defer the decryption process using
- the intermediate database. If False,
- decryption will be done inline.
- :type defer_decryption: bool
-
:return: A deferred which fires with the new generation and
transaction id of the target replica.
:rtype: twisted.internet.defer.Deferred
@@ -221,8 +215,7 @@ class SyncTargetAPI(SyncTarget):
cur_target_gen, cur_target_trans_id = yield self._receive_docs(
last_known_generation, last_known_trans_id,
- ensure_callback, sync_id,
- defer_decryption=defer_decryption)
+ ensure_callback, sync_id)
# update gen and trans id info in case we just sent and did not
# receive docs.
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py
index 184c5883..8676ceed 100644
--- a/client/src/leap/soledad/client/http_target/fetch.py
+++ b/client/src/leap/soledad/client/http_target/fetch.py
@@ -15,18 +15,19 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
-
from twisted.internet import defer
+from twisted.internet import threads
from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
from leap.soledad.client.events import emit_async
-from leap.soledad.client.crypto import is_symmetrically_encrypted
-from leap.soledad.client.encdecpool import SyncDecrypterPool
from leap.soledad.client.http_target.support import RequestBody
from leap.soledad.common.log import getLogger
+from leap.soledad.client._crypto import is_symmetrically_encrypted
from leap.soledad.common.document import SoledadDocument
from leap.soledad.common.l2db import errors
-from leap.soledad.common.l2db.remote import utils
+from leap.soledad.client import crypto as old_crypto
+
+from . import fetch_protocol
logger = getLogger(__name__)
@@ -50,208 +51,105 @@ class HTTPDocFetcher(object):
@defer.inlineCallbacks
def _receive_docs(self, last_known_generation, last_known_trans_id,
- ensure_callback, sync_id, defer_decryption):
-
- self._queue_for_decrypt = defer_decryption \
- and self._sync_db is not None
-
+ ensure_callback, sync_id):
new_generation = last_known_generation
new_transaction_id = last_known_trans_id
-
- if self._queue_for_decrypt:
- logger.debug(
- "Soledad sync: will queue received docs for decrypting.")
-
- if defer_decryption:
- self._setup_sync_decr_pool()
-
- # ---------------------------------------------------------------------
- # maybe receive the first document
- # ---------------------------------------------------------------------
-
- # we fetch the first document before fetching the rest because we need
- # to know the total number of documents to be received, and this
- # information comes as metadata to each request.
-
- doc = yield self._receive_one_doc(
+ # Acts as a queue, ensuring line order on async processing
+ # as `self._insert_doc_cb` cant be run concurrently or out of order.
+ # DeferredSemaphore solves the concurrency and its implementation uses
+ # a queue, solving the ordering.
+ # FIXME: Find a proper solution to avoid surprises on Twisted changes
+ self.semaphore = defer.DeferredSemaphore(1)
+
+ metadata = yield self._fetch_all(
last_known_generation, last_known_trans_id,
- sync_id, 0)
- self._received_docs = 0
- number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1)
+ sync_id)
+ number_of_changes, ngen, ntrans = self._parse_metadata(metadata)
+
+ # wait for pending inserts
+ yield self.semaphore.acquire()
if ngen:
new_generation = ngen
new_transaction_id = ntrans
- # ---------------------------------------------------------------------
- # maybe receive the rest of the documents
- # ---------------------------------------------------------------------
-
- # launch many asynchronous fetches and inserts of received documents
- # in the temporary sync db. Will wait for all results before
- # continuing.
-
- received = 1
- deferreds = []
- while received < number_of_changes:
- d = self._receive_one_doc(
- last_known_generation,
- last_known_trans_id, sync_id, received)
- d.addCallback(
- self._insert_received_doc,
- received + 1, # the index of the current received doc
- number_of_changes)
- deferreds.append(d)
- received += 1
- results = yield defer.gatherResults(deferreds)
-
- # get generation and transaction id of target after insertions
- if deferreds:
- _, new_generation, new_transaction_id = results.pop()
-
- # ---------------------------------------------------------------------
- # wait for async decryption to finish
- # ---------------------------------------------------------------------
-
- if defer_decryption:
- yield self._sync_decr_pool.deferred
- self._sync_decr_pool.stop()
-
defer.returnValue([new_generation, new_transaction_id])
- def _receive_one_doc(self, last_known_generation,
- last_known_trans_id, sync_id, received):
+ def _fetch_all(self, last_known_generation,
+ last_known_trans_id, sync_id):
# add remote replica metadata to the request
body = RequestBody(
last_known_generation=last_known_generation,
last_known_trans_id=last_known_trans_id,
sync_id=sync_id,
ensure=self._ensure_callback is not None)
- # inform server of how many documents have already been received
- body.insert_info(received=received)
- # send headers
+ self._received_docs = 0
+ # build a stream reader with _doc_parser as a callback
+ body_reader = fetch_protocol.build_body_reader(self._doc_parser)
+ # start download stream
return self._http_request(
self._url,
method='POST',
body=str(body),
- content_type='application/x-soledad-sync-get')
+ content_type='application/x-soledad-sync-get',
+ body_reader=body_reader)
- def _insert_received_doc(self, response, idx, total):
+ @defer.inlineCallbacks
+ def _doc_parser(self, doc_info, content, total):
"""
- Insert a received document into the local replica.
-
- :param response: The body and headers of the response.
- :type response: tuple(str, dict)
- :param idx: The index count of the current operation.
- :type idx: int
+ Insert a received document into the local replica, decrypting
+ if necessary. The case where it's not decrypted is when a doc gets
+ inserted from Server side with a GPG encrypted content.
+
+ :param doc_info: Dictionary representing Document information.
+ :type doc_info: dict
+ :param content: The Document's content.
+ :type idx: str
:param total: The total number of operations.
:type total: int
"""
- new_generation, new_transaction_id, number_of_changes, doc_id, \
- rev, content, gen, trans_id = \
- self._parse_received_doc_response(response)
+ yield self.semaphore.run(self.__atomic_doc_parse, doc_info, content,
+ total)
- if self._sync_decr_pool and not self._sync_decr_pool.running:
- self._sync_decr_pool.start(number_of_changes)
-
- if doc_id is not None:
- # decrypt incoming document and insert into local database
- # -------------------------------------------------------------
- # symmetric decryption of document's contents
- # -------------------------------------------------------------
- # If arriving content was symmetrically encrypted, we decrypt it.
- # We do it inline if defer_decryption flag is False or no sync_db
- # was defined, otherwise we defer it writing it to the received
- # docs table.
- doc = SoledadDocument(doc_id, rev, content)
- if is_symmetrically_encrypted(doc):
- if self._queue_for_decrypt:
- self._sync_decr_pool.insert_encrypted_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id,
- idx)
- else:
- # defer_decryption is False or no-sync-db fallback
- doc.set_json(self._crypto.decrypt_doc(doc))
- self._insert_doc_cb(doc, gen, trans_id)
- else:
- # not symmetrically encrypted doc, insert it directly
- # or save it in the decrypted stage.
- if self._queue_for_decrypt:
- self._sync_decr_pool.insert_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id,
- idx)
- else:
- self._insert_doc_cb(doc, gen, trans_id)
- # -------------------------------------------------------------
- # end of symmetric decryption
- # -------------------------------------------------------------
+ @defer.inlineCallbacks
+ def __atomic_doc_parse(self, doc_info, content, total):
+ doc = SoledadDocument(doc_info['id'], doc_info['rev'], content)
+ if is_symmetrically_encrypted(content):
+ content = yield self._crypto.decrypt_doc(doc)
+ elif old_crypto.is_symmetrically_encrypted(doc):
+ content = self._deprecated_crypto.decrypt_doc(doc)
+ doc.set_json(content)
+
+ # TODO insert blobs here on the blob backend
+ # FIXME: This is wrong. Using the very same SQLite connection object
+ # from multiple threads is dangerous. We should bring the dbpool here
+ # or find an alternative. Deferring to a thread only helps releasing
+ # the reactor for other tasks as this is an IO intensive call.
+ yield threads.deferToThread(self._insert_doc_cb,
+ doc, doc_info['gen'], doc_info['trans_id'])
self._received_docs += 1
user_data = {'uuid': self.uuid, 'userid': self.userid}
- _emit_receive_status(user_data, self._received_docs, total)
- return number_of_changes, new_generation, new_transaction_id
+ _emit_receive_status(user_data, self._received_docs, total=total)
- def _parse_received_doc_response(self, response):
+ def _parse_metadata(self, metadata):
"""
- Parse the response from the server containing the received document.
+ Parse the response from the server containing the sync metadata.
- :param response: The body and headers of the response.
- :type response: tuple(str, dict)
+ :param response: Metadata as string
+ :type response: str
- :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev,
- content, gen, trans_id)
+ :return: (number_of_changes, new_gen, new_trans_id)
:rtype: tuple
"""
- # decode incoming stream
- parts = response.splitlines()
- if not parts or parts[0] != '[' or parts[-1] != ']':
- raise errors.BrokenSyncStream
- data = parts[1:-1]
- # decode metadata
- try:
- line, comma = utils.check_and_strip_comma(data[0])
- metadata = None
- except (IndexError):
- raise errors.BrokenSyncStream
try:
- metadata = json.loads(line)
- new_generation = metadata['new_generation']
- new_transaction_id = metadata['new_transaction_id']
- number_of_changes = metadata['number_of_changes']
+ metadata = json.loads(metadata)
+ # make sure we have replica_uid from fresh new dbs
+ if self._ensure_callback and 'replica_uid' in metadata:
+ self._ensure_callback(metadata['replica_uid'])
+ return (metadata['number_of_changes'], metadata['new_generation'],
+ metadata['new_transaction_id'])
except (ValueError, KeyError):
- raise errors.BrokenSyncStream
- # make sure we have replica_uid from fresh new dbs
- if self._ensure_callback and 'replica_uid' in metadata:
- self._ensure_callback(metadata['replica_uid'])
- # parse incoming document info
- doc_id = None
- rev = None
- content = None
- gen = None
- trans_id = None
- if number_of_changes > 0:
- try:
- entry = json.loads(data[1])
- doc_id = entry['id']
- rev = entry['rev']
- content = entry['content']
- gen = entry['gen']
- trans_id = entry['trans_id']
- except (IndexError, KeyError):
- raise errors.BrokenSyncStream
- return new_generation, new_transaction_id, number_of_changes, \
- doc_id, rev, content, gen, trans_id
-
- def _setup_sync_decr_pool(self):
- """
- Set up the SyncDecrypterPool for deferred decryption.
- """
- if self._sync_decr_pool is None and self._sync_db is not None:
- # initialize syncing queue decryption pool
- self._sync_decr_pool = SyncDecrypterPool(
- self._crypto,
- self._sync_db,
- insert_doc_cb=self._insert_doc_cb,
- source_replica_uid=self.source_replica_uid)
+ raise errors.BrokenSyncStream('Metadata parsing failed')
def _emit_receive_status(user_data, received_docs, total):
diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py
new file mode 100644
index 00000000..fa6b1969
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py
@@ -0,0 +1,159 @@
+# -*- coding: utf-8 -*-
+# fetch_protocol.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+from functools import partial
+from cStringIO import StringIO
+from twisted.web._newclient import ResponseDone
+from leap.soledad.common.l2db import errors
+from leap.soledad.common.l2db.remote import utils
+from leap.soledad.common.log import getLogger
+from .support import ReadBodyProtocol
+from .support import readBody
+
+logger = getLogger(__name__)
+
+
+class DocStreamReceiver(ReadBodyProtocol):
+ """
+ A protocol implementation that can parse incoming data from server based
+ on a line format specified on u1db implementation. Except that we split doc
+ attributes from content to ease parsing and increment throughput for larger
+ documents.
+ [\r\n
+ {metadata},\r\n
+ {doc_info},\r\n
+ {content},\r\n
+ ...
+ {doc_info},\r\n
+ {content},\r\n
+ ]
+ """
+
+ def __init__(self, response, deferred, doc_reader):
+ self.deferred = deferred
+ self.status = response.code if response else None
+ self.message = response.phrase if response else None
+ self.headers = response.headers if response else {}
+ self.delimiter = '\r\n'
+ self.metadata = ''
+ self._doc_reader = doc_reader
+ self.reset()
+
+ def reset(self):
+ self._line = 0
+ self._buffer = StringIO()
+ self._properly_finished = False
+
+ def connectionLost(self, reason):
+ """
+ Deliver the accumulated response bytes to the waiting L{Deferred}, if
+ the response body has been completely received without error.
+ """
+ try:
+ if reason.check(ResponseDone):
+ self.dataBuffer = self.metadata
+ else:
+ self.dataBuffer = self.finish()
+ except errors.BrokenSyncStream, e:
+ return self.deferred.errback(e)
+ return ReadBodyProtocol.connectionLost(self, reason)
+
+ def consumeBufferLines(self):
+ """
+ Consumes lines from buffer and rewind it, writing remaining data
+ that didn't formed a line back into buffer.
+ """
+ content = self._buffer.getvalue()[0:self._buffer.tell()]
+ self._buffer.seek(0)
+ lines = content.split(self.delimiter)
+ self._buffer.write(lines.pop(-1))
+ return lines
+
+ def dataReceived(self, data):
+ """
+ Buffer incoming data until a line breaks comes in. We check only
+ the incoming data for efficiency.
+ """
+ self._buffer.write(data)
+ if '\n' not in data:
+ return
+ lines = self.consumeBufferLines()
+ while lines:
+ line, _ = utils.check_and_strip_comma(lines.pop(0))
+ self.lineReceived(line)
+ self._line += 1
+
+ def lineReceived(self, line):
+ """
+ Protocol implementation.
+ 0: [\r\n
+ 1: {metadata},\r\n
+ (even): {doc_info},\r\n
+ (odd): {data},\r\n
+ (last): ]
+ """
+ if self._properly_finished:
+ raise errors.BrokenSyncStream("Reading a finished stream")
+ if ']' == line:
+ self._properly_finished = True
+ elif self._line == 0:
+ if line is not '[':
+ raise errors.BrokenSyncStream("Invalid start")
+ elif self._line == 1:
+ self.metadata = line
+ if 'error' in self.metadata:
+ raise errors.BrokenSyncStream("Error from server: %s" % line)
+ self.total = json.loads(line).get('number_of_changes', -1)
+ elif (self._line % 2) == 0:
+ self.current_doc = json.loads(line)
+ if 'error' in self.current_doc:
+ raise errors.BrokenSyncStream("Error from server: %s" % line)
+ else:
+ d = self._doc_reader(
+ self.current_doc, line.strip() or None, self.total)
+ d.addErrback(self._error)
+
+ def _error(self, reason):
+ logger.error(reason)
+ self.transport.loseConnection()
+
+ def finish(self):
+ """
+ Checks that ']' came and stream was properly closed.
+ """
+ if not self._properly_finished:
+ raise errors.BrokenSyncStream('Stream not properly closed')
+ content = self._buffer.getvalue()[0:self._buffer.tell()]
+ self._buffer.close()
+ return content
+
+
+def build_body_reader(doc_reader):
+ """
+ Get the documents from a sync stream and call doc_reader on each
+ doc received.
+
+ @param doc_reader: Function to be called for processing an incoming doc.
+ Will be called with doc metadata (dict parsed from 1st line) and doc
+ content (string)
+ @type doc_reader: function
+
+ @return: A function that can be called by the http Agent to create and
+ configure the proper protocol.
+ """
+ protocolClass = partial(DocStreamReceiver, doc_reader=doc_reader)
+ return partial(readBody, protocolClass=protocolClass)
diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py
index c7bd057e..2b286ec5 100644
--- a/client/src/leap/soledad/client/http_target/send.py
+++ b/client/src/leap/soledad/client/http_target/send.py
@@ -22,6 +22,7 @@ from leap.soledad.common.log import getLogger
from leap.soledad.client.events import emit_async
from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
from leap.soledad.client.http_target.support import RequestBody
+from .send_protocol import DocStreamProducer
logger = getLogger(__name__)
@@ -32,8 +33,6 @@ class HTTPDocSender(object):
They need to be encrypted and metadata prepared before sending.
"""
- MAX_BATCH_SIZE = 0 # disabled by now, this is being tested yet
-
# The uuid of the local replica.
# Any class inheriting from this one should provide a meaningful attribute
# if the sync status event is meant to be used somewhere else.
@@ -54,73 +53,50 @@ class HTTPDocSender(object):
last_known_trans_id=last_known_trans_id,
sync_id=sync_id,
ensure=self._ensure_callback is not None)
- total = len(docs_by_generation)
- while body.consumed < total:
- result = yield self._send_batch(total, body, docs_by_generation)
+ result = yield self._send_batch(body, docs_by_generation)
response_dict = json.loads(result)[0]
gen_after_send = response_dict['new_generation']
trans_id_after_send = response_dict['new_transaction_id']
defer.returnValue([gen_after_send, trans_id_after_send])
- def _delete_sent(self, docs):
- for doc, gen, trans_id in docs:
- self._sync_enc_pool.delete_encrypted_doc(
- doc.doc_id, doc.rev)
-
@defer.inlineCallbacks
- def _send_batch(self, total, body, docs):
- sent = []
- missing = total - body.consumed
- for i in xrange(1, missing + 1):
- if body.pending_size > self.MAX_BATCH_SIZE:
- break
- idx = body.consumed + i
- entry = docs[idx - 1]
- sent.append(entry)
- yield self._prepare_one_doc(entry, body, idx, total)
- result = yield self._send_request(body.pop())
- if self._defer_encryption:
- self._delete_sent(sent)
-
+ def _send_batch(self, body, docs):
+ total, calls = len(docs), []
+ for i, entry in enumerate(docs):
+ calls.append((self._prepare_one_doc,
+ entry, body, i + 1, total))
+ result = yield self._send_request(body, calls)
_emit_send_status(self.uuid, body.consumed, total)
+
defer.returnValue(result)
- def _send_request(self, body):
+ def _send_request(self, body, calls):
return self._http_request(
self._url,
method='POST',
- body=body,
- content_type='application/x-soledad-sync-put')
+ body=(body, calls),
+ content_type='application/x-soledad-sync-put',
+ body_producer=DocStreamProducer)
@defer.inlineCallbacks
def _prepare_one_doc(self, entry, body, idx, total):
- doc, gen, trans_id = entry
- content = yield self._encrypt_doc(doc)
+ get_doc_call, gen, trans_id = entry
+ doc, content = yield self._encrypt_doc(get_doc_call)
body.insert_info(
id=doc.doc_id, rev=doc.rev, content=content, gen=gen,
trans_id=trans_id, number_of_docs=total,
doc_idx=idx)
+ _emit_send_status(self.uuid, body.consumed, total)
- def _encrypt_doc(self, doc):
- d = None
+ @defer.inlineCallbacks
+ def _encrypt_doc(self, get_doc_call):
+ f, args, kwargs = get_doc_call
+ doc = yield f(*args, **kwargs)
if doc.is_tombstone():
- d = defer.succeed(None)
- elif not self._defer_encryption:
- # fallback case, for tests
- d = defer.succeed(self._crypto.encrypt_doc(doc))
+ defer.returnValue((doc, None))
else:
-
- def _maybe_encrypt_doc_inline(doc_json):
- if doc_json is None:
- # the document is not marked as tombstone, but we got
- # nothing from the sync db. As it is not encrypted
- # yet, we force inline encryption.
- return self._crypto.encrypt_doc(doc)
- return doc_json
-
- d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev)
- d.addCallback(_maybe_encrypt_doc_inline)
- return d
+ content = yield self._crypto.encrypt_doc(doc)
+ defer.returnValue((doc, content))
def _emit_send_status(user_data, idx, total):
diff --git a/client/src/leap/soledad/client/http_target/send_protocol.py b/client/src/leap/soledad/client/http_target/send_protocol.py
new file mode 100644
index 00000000..0cb6d039
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target/send_protocol.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+# send_protocol.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from zope.interface import implements
+from twisted.internet import defer
+from twisted.internet import reactor
+from twisted.web.iweb import IBodyProducer
+from twisted.web.iweb import UNKNOWN_LENGTH
+
+
+class DocStreamProducer(object):
+ """
+ A producer that writes the body of a request to a consumer.
+ """
+
+ implements(IBodyProducer)
+
+ def __init__(self, producer):
+ """
+ Initialize the string produer.
+
+ :param producer: A RequestBody instance and a list of producer calls
+ :type producer: (.support.RequestBody, [(function, *args)])
+ """
+ self.body, self.producer = producer
+ self.length = UNKNOWN_LENGTH
+ self.pause = False
+ self.stop = False
+
+ @defer.inlineCallbacks
+ def startProducing(self, consumer):
+ """
+ Write the body to the consumer.
+
+ :param consumer: Any IConsumer provider.
+ :type consumer: twisted.internet.interfaces.IConsumer
+
+ :return: A Deferred that fires when production ends.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ while self.producer and not self.stop:
+ if self.pause:
+ yield self.sleep(0.001)
+ continue
+ call = self.producer.pop(0)
+ fun, args = call[0], call[1:]
+ yield fun(*args)
+ consumer.write(self.body.pop(1, leave_open=True))
+ consumer.write(self.body.pop(0)) # close stream
+
+ def sleep(self, secs):
+ d = defer.Deferred()
+ reactor.callLater(secs, d.callback, None)
+ return d
+
+ def pauseProducing(self):
+ self.pause = True
+
+ def stopProducing(self):
+ self.stop = True
+
+ def resumeProducing(self):
+ self.pause = False
diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py
index 6ec98ed4..d8d8e420 100644
--- a/client/src/leap/soledad/client/http_target/support.py
+++ b/client/src/leap/soledad/client/http_target/support.py
@@ -53,6 +53,9 @@ class ReadBodyProtocol(_ReadBodyProtocol):
if exc_cls is not None:
message = respdic.get("message")
self.deferred.errback(exc_cls(message))
+ else:
+ self.deferred.errback(
+ errors.HTTPError(self.status, respdic, self.headers))
# ---8<--- end of snippet from u1db.remote.http_client
def connectionLost(self, reason):
@@ -91,7 +94,7 @@ class ReadBodyProtocol(_ReadBodyProtocol):
self.deferred.errback(reason)
-def readBody(response):
+def readBody(response, protocolClass=ReadBodyProtocol):
"""
Get the body of an L{IResponse} and return it as a byte string.
@@ -116,7 +119,7 @@ def readBody(response):
abort()
d = defer.Deferred(cancel)
- protocol = ReadBodyProtocol(response, d)
+ protocol = protocolClass(response, d)
def getAbort():
return getattr(protocol.transport, 'abortConnection', None)
@@ -155,46 +158,49 @@ class RequestBody(object):
self.headers = header_dict
self.entries = []
self.consumed = 0
- self.pending_size = 0
def insert_info(self, **entry_dict):
"""
Dumps an entry into JSON format and add it to entries list.
+ Adds 'content' key on a new line if it's present.
:param entry_dict: Entry as a dictionary
:type entry_dict: dict
-
- :return: length of the entry after JSON dumps
- :rtype: int
"""
- entry = json.dumps(entry_dict)
+ content = ''
+ if 'content' in entry_dict:
+ content = ',\r\n' + (entry_dict['content'] or '')
+ entry = json.dumps(entry_dict) + content
self.entries.append(entry)
- self.pending_size += len(entry)
- def pop(self):
+ def pop(self, amount=10, leave_open=False):
"""
- Removes all entries and returns it formatted and ready
+ Removes entries and returns it formatted and ready
to be sent.
- :param number: number of entries to pop and format
- :type number: int
+ :param amount: number of entries to pop and format
+ :type amount: int
+
+ :param leave_open: flag to skip stream closing
+ :type amount: bool
:return: formatted body ready to be sent
:rtype: str
"""
- entries = self.entries[:]
- self.entries = []
- self.pending_size = 0
- self.consumed += len(entries)
- return self.entries_to_str(entries)
+ start = self.consumed == 0
+ amount = min([len(self.entries), amount])
+ entries = [self.entries.pop(0) for i in xrange(amount)]
+ self.consumed += amount
+ end = len(self.entries) == 0 if not leave_open else False
+ return self.entries_to_str(entries, start, end)
def __str__(self):
- return self.entries_to_str(self.entries)
+ return self.pop(len(self.entries))
def __len__(self):
return len(self.entries)
- def entries_to_str(self, entries=None):
+ def entries_to_str(self, entries=None, start=True, end=True):
"""
Format a list of entries into the body format expected
by the server.
@@ -205,6 +211,10 @@ class RequestBody(object):
:return: formatted body ready to be sent
:rtype: str
"""
- data = '[\r\n' + json.dumps(self.headers)
+ data = ''
+ if start:
+ data = '[\r\n' + json.dumps(self.headers)
data += ''.join(',\r\n' + entry for entry in entries)
- return data + '\r\n]'
+ if end:
+ data += '\r\n]'
+ return data
diff --git a/client/src/leap/soledad/client/interfaces.py b/client/src/leap/soledad/client/interfaces.py
index 14b34d24..82927ff4 100644
--- a/client/src/leap/soledad/client/interfaces.py
+++ b/client/src/leap/soledad/client/interfaces.py
@@ -321,7 +321,7 @@ class ISyncableStorage(Interface):
"Property, True if the syncer is syncing.")
token = Attribute("The authentication Token.")
- def sync(self, defer_decryption=True):
+ def sync(self):
"""
Synchronize the local encrypted replica with a remote replica.
@@ -331,11 +331,6 @@ class ISyncableStorage(Interface):
:param url: the url of the target replica to sync with
:type url: str
- :param defer_decryption:
- Whether to defer the decryption process using the intermediate
- database. If False, decryption will be done inline.
- :type defer_decryption: bool
-
:return:
A deferred that will fire with the local generation before the
synchronisation was performed.
diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py
index 1eb6f31d..3fe98c64 100644
--- a/client/src/leap/soledad/client/secrets.py
+++ b/client/src/leap/soledad/client/secrets.py
@@ -34,7 +34,7 @@ from leap.soledad.common import soledad_assert_type
from leap.soledad.common import document
from leap.soledad.common.log import getLogger
from leap.soledad.client import events
-from leap.soledad.client.crypto import encrypt_sym, decrypt_sym
+from leap.soledad.client import _crypto
logger = getLogger(__name__)
@@ -126,7 +126,7 @@ class SoledadSecrets(object):
instantiates Soledad.
"""
- IV_SEPARATOR = ":"
+ SEPARATOR = ":"
"""
A separator used for storing the encryption initial value prepended to the
ciphertext.
@@ -142,7 +142,8 @@ class SoledadSecrets(object):
KDF_SALT_KEY = 'kdf_salt'
KDF_LENGTH_KEY = 'kdf_length'
KDF_SCRYPT = 'scrypt'
- CIPHER_AES256 = 'aes256'
+ CIPHER_AES256 = 'aes256' # deprecated, AES-CTR
+ CIPHER_AES256_GCM = _crypto.ENC_METHOD.aes_256_gcm
RECOVERY_DOC_VERSION_KEY = 'version'
RECOVERY_DOC_VERSION = 1
"""
@@ -343,7 +344,7 @@ class SoledadSecrets(object):
'%s%s' %
(self._passphrase_as_string(), self._uuid)).hexdigest()
- def _export_recovery_document(self):
+ def _export_recovery_document(self, cipher=None):
"""
Export the storage secrets.
@@ -364,6 +365,9 @@ class SoledadSecrets(object):
Note that multiple storage secrets might be stored in one recovery
document.
+ :param cipher: (Optional) The ciper to use. Defaults to AES256
+ :type cipher: str
+
:return: The recovery document.
:rtype: dict
"""
@@ -371,7 +375,7 @@ class SoledadSecrets(object):
encrypted_secrets = {}
for secret_id in self._secrets:
encrypted_secrets[secret_id] = self._encrypt_storage_secret(
- self._secrets[secret_id])
+ self._secrets[secret_id], doc_cipher=cipher)
# create the recovery document
data = {
self.STORAGE_SECRETS_KEY: encrypted_secrets,
@@ -447,6 +451,7 @@ class SoledadSecrets(object):
except SecretsException as e:
logger.error("failed to decrypt storage secret: %s"
% str(e))
+ raise e
return secret_count, active_secret
def _get_secrets_from_shared_db(self):
@@ -537,18 +542,25 @@ class SoledadSecrets(object):
)
if encrypted_secret_dict[self.KDF_LENGTH_KEY] != len(key):
raise SecretsException("Wrong length of decryption key.")
- if encrypted_secret_dict[self.CIPHER_KEY] != self.CIPHER_AES256:
+ supported_ciphers = [self.CIPHER_AES256, self.CIPHER_AES256_GCM]
+ doc_cipher = encrypted_secret_dict[self.CIPHER_KEY]
+ if doc_cipher not in supported_ciphers:
raise SecretsException("Unknown cipher in stored secret.")
# recover the initial value and ciphertext
iv, ciphertext = encrypted_secret_dict[self.SECRET_KEY].split(
- self.IV_SEPARATOR, 1)
+ self.SEPARATOR, 1)
ciphertext = binascii.a2b_base64(ciphertext)
- decrypted_secret = decrypt_sym(ciphertext, key, iv)
+ try:
+ decrypted_secret = _crypto.decrypt_sym(
+ ciphertext, key, iv, doc_cipher)
+ except Exception as e:
+ logger.error(e)
+ raise SecretsException("Unable to decrypt secret.")
if encrypted_secret_dict[self.LENGTH_KEY] != len(decrypted_secret):
raise SecretsException("Wrong length of decrypted secret.")
return decrypted_secret
- def _encrypt_storage_secret(self, decrypted_secret):
+ def _encrypt_storage_secret(self, decrypted_secret, doc_cipher=None):
"""
Encrypt the storage secret.
@@ -567,6 +579,8 @@ class SoledadSecrets(object):
:param decrypted_secret: The decrypted storage secret.
:type decrypted_secret: str
+ :param cipher: (Optional) The ciper to use. Defaults to AES256
+ :type cipher: str
:return: The encrypted storage secret.
:rtype: dict
@@ -575,17 +589,18 @@ class SoledadSecrets(object):
salt = os.urandom(self.SALT_LENGTH)
# get a 256-bit key
key = scrypt.hash(self._passphrase_as_string(), salt, buflen=32)
- iv, ciphertext = encrypt_sym(decrypted_secret, key)
+ doc_cipher = doc_cipher or self.CIPHER_AES256_GCM
+ iv, ciphertext = _crypto.encrypt_sym(decrypted_secret, key, doc_cipher)
+ ciphertext = binascii.b2a_base64(ciphertext)
encrypted_secret_dict = {
# leap.soledad.crypto submodule uses AES256 for symmetric
# encryption.
self.KDF_KEY: self.KDF_SCRYPT,
self.KDF_SALT_KEY: binascii.b2a_base64(salt),
self.KDF_LENGTH_KEY: len(key),
- self.CIPHER_KEY: self.CIPHER_AES256,
+ self.CIPHER_KEY: doc_cipher,
self.LENGTH_KEY: len(decrypted_secret),
- self.SECRET_KEY: '%s%s%s' % (
- str(iv), self.IV_SEPARATOR, binascii.b2a_base64(ciphertext)),
+ self.SECRET_KEY: self.SEPARATOR.join([str(iv), ciphertext])
}
return encrypted_secret_dict
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 3921c323..c9a9444e 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -42,9 +42,7 @@ SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases
handled by Soledad should be created by SQLCipher >= 2.0.
"""
import os
-import json
-from hashlib import sha256
from functools import partial
from pysqlcipher import dbapi2 as sqlcipher_dbapi2
@@ -117,7 +115,7 @@ class SQLCipherOptions(object):
@classmethod
def copy(cls, source, path=None, key=None, create=None,
is_raw_key=None, cipher=None, kdf_iter=None,
- cipher_page_size=None, defer_encryption=None, sync_db_key=None):
+ cipher_page_size=None, sync_db_key=None):
"""
Return a copy of C{source} with parameters different than None
replaced by new values.
@@ -134,7 +132,7 @@ class SQLCipherOptions(object):
args.append(getattr(source, name))
for name in ["create", "is_raw_key", "cipher", "kdf_iter",
- "cipher_page_size", "defer_encryption", "sync_db_key"]:
+ "cipher_page_size", "sync_db_key"]:
val = local_vars[name]
if val is not None:
kwargs[name] = val
@@ -145,7 +143,7 @@ class SQLCipherOptions(object):
def __init__(self, path, key, create=True, is_raw_key=False,
cipher='aes-256-cbc', kdf_iter=4000, cipher_page_size=1024,
- defer_encryption=False, sync_db_key=None):
+ sync_db_key=None):
"""
:param path: The filesystem path for the database to open.
:type path: str
@@ -163,10 +161,6 @@ class SQLCipherOptions(object):
:type kdf_iter: int
:param cipher_page_size: The page size.
:type cipher_page_size: int
- :param defer_encryption:
- Whether to defer encryption/decryption of documents, or do it
- inline while syncing.
- :type defer_encryption: bool
"""
self.path = path
self.key = key
@@ -175,7 +169,6 @@ class SQLCipherOptions(object):
self.cipher = cipher
self.kdf_iter = kdf_iter
self.cipher_page_size = cipher_page_size
- self.defer_encryption = defer_encryption
self.sync_db_key = sync_db_key
def __str__(self):
@@ -201,7 +194,6 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
"""
A U1DB implementation that uses SQLCipher as its persistence layer.
"""
- defer_encryption = False
# The attribute _index_storage_value will be used as the lookup key for the
# implementation of the SQLCipher storage backend.
@@ -225,10 +217,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
"""
# ensure the db is encrypted if the file already exists
if os.path.isfile(opts.path):
- _assert_db_is_encrypted(opts)
-
- # connect to the sqlcipher database
- self._db_handle = initialize_sqlcipher_db(opts)
+ self._db_handle = _assert_db_is_encrypted(opts)
+ else:
+ # connect to the sqlcipher database
+ self._db_handle = initialize_sqlcipher_db(opts)
# TODO ---------------------------------------------------
# Everything else in this initialization has to be factored
@@ -267,27 +259,6 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE')
#
- # Document operations
- #
-
- def put_doc(self, doc):
- """
- Overwrite the put_doc method, to enqueue the modified document for
- encryption before sync.
-
- :param doc: The document to be put.
- :type doc: u1db.Document
-
- :return: The new document revision.
- :rtype: str
- """
- doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc)
- if self.defer_encryption:
- # TODO move to api?
- self._sync_enc_pool.encrypt_doc(doc)
- return doc_rev
-
- #
# SQLCipher API methods
#
@@ -425,25 +396,14 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
"""
ENCRYPT_LOOP_PERIOD = 1
- def __init__(self, opts, soledad_crypto, replica_uid, cert_file,
- defer_encryption=False, sync_db=None, sync_enc_pool=None):
+ def __init__(self, opts, soledad_crypto, replica_uid, cert_file):
self._opts = opts
self._path = opts.path
self._crypto = soledad_crypto
self.__replica_uid = replica_uid
self._cert_file = cert_file
- self._sync_enc_pool = sync_enc_pool
-
- self._sync_db = sync_db
- # we store syncers in a dictionary indexed by the target URL. We also
- # store a hash of the auth info in case auth info expires and we need
- # to rebuild the syncer for that target. The final self._syncers
- # format is the following:
- #
- # self._syncers = {'<url>': ('<auth_hash>', syncer), ...}
- self._syncers = {}
# storage for the documents received during a sync
self.received_docs = []
@@ -458,6 +418,9 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
if DO_STATS:
self.sync_phase = None
+ def commit(self):
+ self._db_handle.commit()
+
@property
def _replica_uid(self):
return str(self.__replica_uid)
@@ -477,7 +440,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
raise DatabaseAccessError(str(e))
@defer.inlineCallbacks
- def sync(self, url, creds=None, defer_decryption=True):
+ def sync(self, url, creds=None):
"""
Synchronize documents with remote replica exposed at url.
@@ -492,10 +455,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
:param creds: optional dictionary giving credentials to authorize the
operation with the server.
:type creds: dict
- :param defer_decryption:
- Whether to defer the decryption process using the intermediate
- database. If False, decryption will be done inline.
- :type defer_decryption: bool
:return:
A Deferred, that will fire with the local generation (type `int`)
@@ -507,8 +466,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
self.sync_phase = syncer.sync_phase
self.syncer = syncer
self.sync_exchange_phase = syncer.sync_exchange_phase
- local_gen_before_sync = yield syncer.sync(
- defer_decryption=defer_decryption)
+ local_gen_before_sync = yield syncer.sync()
self.received_docs = syncer.received_docs
defer.returnValue(local_gen_before_sync)
@@ -525,29 +483,15 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
:return: A synchronizer.
:rtype: Synchronizer
"""
- # we want to store at most one syncer for each url, so we also store a
- # hash of the connection credentials and replace the stored syncer for
- # a certain url if credentials have changed.
- h = sha256(json.dumps([url, creds])).hexdigest()
- cur_h, syncer = self._syncers.get(url, (None, None))
- if syncer is None or h != cur_h:
- syncer = SoledadSynchronizer(
- self,
- SoledadHTTPSyncTarget(
- url,
- # XXX is the replica_uid ready?
- self._replica_uid,
- creds=creds,
- crypto=self._crypto,
- cert_file=self._cert_file,
- sync_db=self._sync_db,
- sync_enc_pool=self._sync_enc_pool))
- self._syncers[url] = (h, syncer)
- # in order to reuse the same synchronizer multiple times we have to
- # reset its state (i.e. the number of documents received from target
- # and inserted in the local replica).
- syncer.num_inserted = 0
- return syncer
+ return SoledadSynchronizer(
+ self,
+ SoledadHTTPSyncTarget(
+ url,
+ # XXX is the replica_uid ready?
+ self._replica_uid,
+ creds=creds,
+ crypto=self._crypto,
+ cert_file=self._cert_file))
#
# Symmetric encryption of syncing docs
@@ -558,18 +502,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
# XXX this SHOULD BE a callback
return self._get_generation()
- def close(self):
- """
- Close the syncer and syncdb orderly
- """
- super(SQLCipherU1DBSync, self).close()
- # close all open syncers
- for url in self._syncers.keys():
- _, syncer = self._syncers[url]
- syncer.close()
- del self._syncers[url]
- self.running = False
-
class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase):
"""
@@ -599,14 +531,12 @@ class SoledadSQLCipherWrapper(SQLCipherDatabase):
It can be used from adbapi to initialize a soledad database after
getting a regular connection to a sqlcipher database.
"""
- def __init__(self, conn, opts, sync_enc_pool):
+ def __init__(self, conn, opts):
self._db_handle = conn
self._real_replica_uid = None
self._ensure_schema()
self.set_document_factory(soledad_doc_factory)
self._prime_replica_uid()
- self.defer_encryption = opts.defer_encryption
- self._sync_enc_pool = sync_enc_pool
def _assert_db_is_encrypted(opts):
@@ -635,7 +565,7 @@ def _assert_db_is_encrypted(opts):
# assert that we can access it using SQLCipher with the given
# key
dummy_query = ('SELECT count(*) FROM sqlite_master',)
- initialize_sqlcipher_db(opts, on_init=dummy_query)
+ return initialize_sqlcipher_db(opts, on_init=dummy_query)
else:
raise DatabaseIsNotEncrypted()
@@ -660,6 +590,7 @@ def soledad_doc_factory(doc_id=None, rev=None, json='{}', has_conflicts=False,
return SoledadDocument(doc_id=doc_id, rev=rev, json=json,
has_conflicts=has_conflicts, syncable=syncable)
+
sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase)
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index 7ed5f693..70c841d6 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -56,23 +56,10 @@ class SoledadSynchronizer(Synchronizer):
self.sync_exchange_phase = None
@defer.inlineCallbacks
- def sync(self, defer_decryption=True):
+ def sync(self):
"""
Synchronize documents between source and target.
- Differently from u1db `Synchronizer.sync` method, this one allows to
- pass a `defer_decryption` flag that will postpone the last
- step in the synchronization dance, namely, the setting of the last
- known generation and transaction id for a given remote replica.
-
- This is done to allow the ongoing parallel decryption of the incoming
- docs to proceed without `InvalidGeneration` conflicts.
-
- :param defer_decryption: Whether to defer the decryption process using
- the intermediate database. If False,
- decryption will be done inline.
- :type defer_decryption: bool
-
:return: A deferred which will fire after the sync has finished with
the local generation before the synchronization was performed.
:rtype: twisted.internet.defer.Deferred
@@ -154,28 +141,19 @@ class SoledadSynchronizer(Synchronizer):
self.sync_phase[0] += 1
# --------------------------------------------------------------------
- # prepare to send all the changed docs
- changed_doc_ids = [doc_id for doc_id, _, _ in changes]
- docs_to_send = self.source.get_docs(
- changed_doc_ids, check_for_conflicts=False, include_deleted=True)
- ids_sent = []
- docs_by_generation = []
- idx = 0
- for doc in docs_to_send:
- _, gen, trans = changes[idx]
- docs_by_generation.append((doc, gen, trans))
- idx += 1
- ids_sent.append(doc.doc_id)
+ docs_by_generation = self._docs_by_gen_from_changes(changes)
# exchange documents and try to insert the returned ones with
# the target, return target synced-up-to gen.
new_gen, new_trans_id = yield sync_target.sync_exchange(
docs_by_generation, self.source._replica_uid,
target_last_known_gen, target_last_known_trans_id,
- self._insert_doc_from_target, ensure_callback=ensure_callback,
- defer_decryption=defer_decryption)
+ self._insert_doc_from_target, ensure_callback=ensure_callback)
+ ids_sent = [doc_id for doc_id, _, _ in changes]
logger.debug("target gen after sync: %d" % new_gen)
logger.debug("target trans_id after sync: %s" % new_trans_id)
+ if hasattr(self.source, 'commit'): # sqlcipher backend speed up
+ self.source.commit() # insert it all in a single transaction
info = {
"target_replica_uid": self.target_replica_uid,
"new_gen": new_gen,
@@ -204,6 +182,14 @@ class SoledadSynchronizer(Synchronizer):
defer.returnValue(my_gen)
+ def _docs_by_gen_from_changes(self, changes):
+ docs_by_generation = []
+ kwargs = {'include_deleted': True}
+ for doc_id, gen, trans in changes:
+ get_doc = (self.source.get_doc, (doc_id,), kwargs)
+ docs_by_generation.append((get_doc, gen, trans))
+ return docs_by_generation
+
def complete_sync(self):
"""
Last stage of the synchronization:
@@ -225,12 +211,6 @@ class SoledadSynchronizer(Synchronizer):
# if gapless record current reached generation with target
return self._record_sync_info_with_the_target(info["my_gen"])
- def close(self):
- """
- Close the synchronizer.
- """
- self.sync_target.close()
-
def _record_sync_info_with_the_target(self, start_generation):
"""
Store local replica metadata in server.
diff --git a/common/src/leap/soledad/common/backend.py b/common/src/leap/soledad/common/backend.py
index f4f48f86..4a29ca87 100644
--- a/common/src/leap/soledad/common/backend.py
+++ b/common/src/leap/soledad/common/backend.py
@@ -73,8 +73,8 @@ class SoledadBackend(CommonBackend):
def batch_end(self):
if not self.BATCH_SUPPORT:
return
- self.batching = False
self._database.batch_end()
+ self.batching = False
for name in self.after_batch_callbacks:
self.after_batch_callbacks[name]()
self.after_batch_callbacks = None
@@ -570,7 +570,7 @@ class SoledadBackend(CommonBackend):
self._put_doc(cur_doc, doc)
def get_docs(self, doc_ids, check_for_conflicts=True,
- include_deleted=False):
+ include_deleted=False, read_content=True):
"""
Get the JSON content for many documents.
@@ -588,7 +588,7 @@ class SoledadBackend(CommonBackend):
:rtype: iterable
"""
return self._database.get_docs(doc_ids, check_for_conflicts,
- include_deleted)
+ include_deleted, read_content)
def _prune_conflicts(self, doc, doc_vcr):
"""
diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py
index 0f4102db..2e6f734e 100644
--- a/common/src/leap/soledad/common/couch/__init__.py
+++ b/common/src/leap/soledad/common/couch/__init__.py
@@ -20,6 +20,7 @@
import json
+import copy
import re
import uuid
import binascii
@@ -295,31 +296,14 @@ class CouchDatabase(object):
generation, _ = self.get_generation_info()
results = list(
- self._get_docs(None, True, include_deleted))
+ self.get_docs(None, True, include_deleted))
return (generation, results)
def get_docs(self, doc_ids, check_for_conflicts=True,
- include_deleted=False):
+ include_deleted=False, read_content=True):
"""
Get the JSON content for many documents.
- :param doc_ids: A list of document identifiers or None for all.
- :type doc_ids: list
- :param check_for_conflicts: If set to False, then the conflict check
- will be skipped, and 'None' will be
- returned instead of True/False.
- :type check_for_conflicts: bool
- :param include_deleted: If set to True, deleted documents will be
- returned with empty content. Otherwise deleted
- documents will not be included in the results.
- :return: iterable giving the Document object for each document id
- in matching doc_ids order.
- :rtype: iterable
- """
- return self._get_docs(doc_ids, check_for_conflicts, include_deleted)
-
- def _get_docs(self, doc_ids, check_for_conflicts, include_deleted):
- """
Use couch's `_all_docs` view to get the documents indicated in
`doc_ids`,
@@ -337,14 +321,21 @@ class CouchDatabase(object):
in matching doc_ids order.
:rtype: iterable
"""
- params = {'include_docs': 'true', 'attachments': 'true'}
+ params = {'include_docs': 'true', 'attachments': 'false'}
if doc_ids is not None:
params['keys'] = doc_ids
view = self._database.view("_all_docs", **params)
for row in view.rows:
- result = row['doc']
+ result = copy.deepcopy(row['doc'])
+ for file_name in result.get('_attachments', {}).keys():
+ data = self._database.get_attachment(result, file_name)
+ if data:
+ if read_content:
+ data = data.read()
+ result['_attachments'][file_name] = {'data': data}
doc = self.__parse_doc_from_couch(
- result, result['_id'], check_for_conflicts=check_for_conflicts)
+ result, result['_id'],
+ check_for_conflicts=check_for_conflicts, decode=False)
# filter out non-u1db or deleted documents
if not doc or (not include_deleted and doc.is_tombstone()):
continue
@@ -409,7 +400,7 @@ class CouchDatabase(object):
return rev
def __parse_doc_from_couch(self, result, doc_id,
- check_for_conflicts=False):
+ check_for_conflicts=False, decode=True):
# restrict to u1db documents
if 'u1db_rev' not in result:
return None
@@ -418,19 +409,23 @@ class CouchDatabase(object):
if '_attachments' not in result \
or 'u1db_content' not in result['_attachments']:
doc.make_tombstone()
- else:
+ elif decode:
doc.content = json.loads(
binascii.a2b_base64(
result['_attachments']['u1db_content']['data']))
+ else:
+ doc._json = result['_attachments']['u1db_content']['data']
# determine if there are conflicts
if check_for_conflicts \
and '_attachments' in result \
and 'u1db_conflicts' in result['_attachments']:
- doc.set_conflicts(
- self._build_conflicts(
- doc.doc_id,
- json.loads(binascii.a2b_base64(
- result['_attachments']['u1db_conflicts']['data']))))
+ if decode:
+ conflicts = binascii.a2b_base64(
+ result['_attachments']['u1db_conflicts']['data'])
+ else:
+ conflicts = result['_attachments']['u1db_conflicts']['data']
+ conflicts = json.loads(conflicts)
+ doc.set_conflicts(self._build_conflicts(doc.doc_id, conflicts))
# store couch revision
doc.couch_rev = result['_rev']
return doc
@@ -663,7 +658,7 @@ class CouchDatabase(object):
_, _, data = resource.get_json(**kwargs)
return data
- def _allocate_new_generation(self, doc_id, transaction_id):
+ def _allocate_new_generation(self, doc_id, transaction_id, save=True):
"""
Allocate a new generation number for a document modification.
@@ -703,10 +698,12 @@ class CouchDatabase(object):
DOC_ID_KEY: doc_id,
TRANSACTION_ID_KEY: transaction_id,
}
- self._database.save(gen_doc)
+ if save:
+ self._database.save(gen_doc)
break # succeeded allocating a new generation, proceed
except ResourceConflict:
pass # try again!
+ return gen_doc
def save_document(self, old_doc, doc, transaction_id):
"""
@@ -785,6 +782,7 @@ class CouchDatabase(object):
headers=envelope.headers)
except ResourceConflict:
raise RevisionConflict()
+ self._allocate_new_generation(doc.doc_id, transaction_id)
else:
for name, attachment in attachments.items():
del attachment['follows']
@@ -793,12 +791,13 @@ class CouchDatabase(object):
attachment['data'] = binascii.b2a_base64(
parts[index]).strip()
couch_doc['_attachments'] = attachments
+ gen_doc = self._allocate_new_generation(
+ doc.doc_id, transaction_id, save=False)
self.batch_docs[doc.doc_id] = couch_doc
+ self.batch_docs[gen_doc['_id']] = gen_doc
last_gen, last_trans_id = self.batch_generation
self.batch_generation = (last_gen + 1, transaction_id)
- self._allocate_new_generation(doc.doc_id, transaction_id)
-
def _new_resource(self, *path):
"""
Return a new resource for accessing a couch database.
diff --git a/common/src/leap/soledad/common/l2db/backends/sqlite_backend.py b/common/src/leap/soledad/common/l2db/backends/sqlite_backend.py
index d73c0d16..27db65af 100644
--- a/common/src/leap/soledad/common/l2db/backends/sqlite_backend.py
+++ b/common/src/leap/soledad/common/l2db/backends/sqlite_backend.py
@@ -505,12 +505,11 @@ class SQLiteDatabase(CommonBackend):
def _put_doc_if_newer(self, doc, save_conflict, replica_uid=None,
replica_gen=None, replica_trans_id=None):
- with self._db_handle:
- return super(SQLiteDatabase, self)._put_doc_if_newer(
- doc,
- save_conflict=save_conflict,
- replica_uid=replica_uid, replica_gen=replica_gen,
- replica_trans_id=replica_trans_id)
+ return super(SQLiteDatabase, self)._put_doc_if_newer(
+ doc,
+ save_conflict=save_conflict,
+ replica_uid=replica_uid, replica_gen=replica_gen,
+ replica_trans_id=replica_trans_id)
def _add_conflict(self, c, doc_id, my_doc_rev, my_content):
c.execute("INSERT INTO conflicts VALUES (?, ?, ?)",
@@ -924,4 +923,5 @@ class SQLitePartialExpandDatabase(SQLiteDatabase):
raw_doc = json.loads(doc)
self._update_indexes(doc_id, raw_doc, getters, c)
+
SQLiteDatabase.register_implementation(SQLitePartialExpandDatabase)
diff --git a/common/src/leap/soledad/common/l2db/remote/http_app.py b/common/src/leap/soledad/common/l2db/remote/http_app.py
index 5cf6645e..496274b2 100644
--- a/common/src/leap/soledad/common/l2db/remote/http_app.py
+++ b/common/src/leap/soledad/common/l2db/remote/http_app.py
@@ -194,6 +194,7 @@ class URLToResource(object):
resource_cls = params.pop('resource_cls')
return resource_cls, params
+
url_to_resource = URLToResource()
@@ -501,7 +502,9 @@ class HTTPResponder(object):
self._write('\r\n')
else:
self._write(',\r\n')
- self._write(json.dumps(entry))
+ if type(entry) == dict:
+ entry = json.dumps(entry)
+ self._write(entry)
def end_stream(self):
"end stream (array)."
diff --git a/scripts/db_access/client_side_db.py b/scripts/db_access/client_side_db.py
index 11d72791..48eec0f7 100644
--- a/scripts/db_access/client_side_db.py
+++ b/scripts/db_access/client_side_db.py
@@ -1,13 +1,13 @@
#!/usr/bin/python
import os
+import sys
import argparse
import tempfile
import getpass
import requests
import srp._pysrp as srp
import binascii
-import logging
import json
import time
@@ -15,6 +15,7 @@ from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from leap.soledad.client import Soledad
+from leap.soledad.common.log import getLogger
from leap.keymanager import KeyManager
from leap.keymanager.openpgp import OpenPGPKey
@@ -39,9 +40,9 @@ Use the --help option to see available options.
# create a logger
-logger = logging.getLogger(__name__)
-LOG_FORMAT = '%(asctime)s %(message)s'
-logging.basicConfig(format=LOG_FORMAT, level=logging.DEBUG)
+logger = getLogger(__name__)
+from twisted.python import log
+log.startLogging(sys.stdout)
safe_unhexlify = lambda x: binascii.unhexlify(x) if (
@@ -133,8 +134,7 @@ def _get_soledad_instance(uuid, passphrase, basedir, server_url, cert_file,
local_db_path=local_db_path,
server_url=server_url,
cert_file=cert_file,
- auth_token=token,
- defer_encryption=True)
+ auth_token=token)
def _get_keymanager_instance(username, provider, soledad, token,
diff --git a/scripts/docker/files/bin/client_side_db.py b/scripts/docker/files/bin/client_side_db.py
index 4be33d13..80da7392 100644
--- a/scripts/docker/files/bin/client_side_db.py
+++ b/scripts/docker/files/bin/client_side_db.py
@@ -136,8 +136,7 @@ def _get_soledad_instance(uuid, passphrase, basedir, server_url, cert_file,
local_db_path=local_db_path,
server_url=server_url,
cert_file=cert_file,
- auth_token=token,
- defer_encryption=True)
+ auth_token=token)
def _get_keymanager_instance(username, provider, soledad, token,
diff --git a/scripts/profiling/mail/soledad_client.py b/scripts/profiling/mail/soledad_client.py
index 5ac8ce39..dcd605aa 100644
--- a/scripts/profiling/mail/soledad_client.py
+++ b/scripts/profiling/mail/soledad_client.py
@@ -30,8 +30,7 @@ class SoledadClient(object):
server_url=self._server_url,
cert_file=None,
auth_token=self._auth_token,
- secret_id=None,
- defer_encryption=True)
+ secret_id=None)
def close(self):
if self._soledad is not None:
diff --git a/scripts/profiling/sync/profile-sync.py b/scripts/profiling/sync/profile-sync.py
index 34e66f03..1d59217a 100755
--- a/scripts/profiling/sync/profile-sync.py
+++ b/scripts/profiling/sync/profile-sync.py
@@ -91,7 +91,6 @@ def _get_soledad_instance_from_uuid(uuid, passphrase, basedir, server_url,
server_url=server_url,
cert_file=cert_file,
auth_token=token,
- defer_encryption=True,
syncable=True)
diff --git a/server/pkg/requirements.pip b/server/pkg/requirements.pip
index e92dfde6..e4a87e74 100644
--- a/server/pkg/requirements.pip
+++ b/server/pkg/requirements.pip
@@ -3,3 +3,4 @@ PyOpenSSL
twisted>=12.3.0
Beaker
couchdb
+python-cjson
diff --git a/server/pkg/soledad-server b/server/pkg/soledad-server
index d9dab040..753a260b 100644
--- a/server/pkg/soledad-server
+++ b/server/pkg/soledad-server
@@ -11,7 +11,7 @@
PATH=/sbin:/bin:/usr/sbin:/usr/bin
PIDFILE=/var/run/soledad.pid
-OBJ=leap.soledad.server.application.wsgi_application
+RESOURCE_CLASS=leap.soledad.server.resource.SoledadResource
HTTPS_PORT=2424
CONFDIR=/etc/soledad
CERT_PATH="${CONFDIR}/soledad-server.pem"
@@ -39,7 +39,7 @@ case "${1}" in
--syslog \
--prefix=soledad-server \
web \
- --wsgi=${OBJ} \
+ --class=${RESOURCE_CLASS} \
--port=ssl:${HTTPS_PORT}:privateKey=${PRIVKEY_PATH}:certKey=${CERT_PATH}:sslmethod=${SSL_METHOD}
echo "."
;;
diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py
index d8243c19..039bef75 100644
--- a/server/src/leap/soledad/server/__init__.py
+++ b/server/src/leap/soledad/server/__init__.py
@@ -193,7 +193,8 @@ class HTTPInvocationByMethodWithBody(
try:
content_length = int(self.environ['CONTENT_LENGTH'])
except (ValueError, KeyError):
- raise http_app.BadRequest
+ # raise http_app.BadRequest
+ content_length = self.max_request_size
if content_length <= 0:
raise http_app.BadRequest
if content_length > self.max_request_size:
@@ -219,27 +220,23 @@ class HTTPInvocationByMethodWithBody(
if content_type == 'application/x-soledad-sync-put':
meth_put = self._lookup('%s_put' % method)
meth_end = self._lookup('%s_end' % method)
- entries = []
while True:
- line = body_getline()
- entry = line.strip()
+ entry = body_getline().strip()
if entry == ']': # end of incoming document stream
break
if not entry or not comma: # empty or no prec comma
raise http_app.BadRequest
entry, comma = utils.check_and_strip_comma(entry)
- entries.append(entry)
+ content = body_getline().strip()
+ content, comma = utils.check_and_strip_comma(content)
+ meth_put({'content': content or None}, entry)
if comma or body_getline(): # extra comma or data
raise http_app.BadRequest
- for entry in entries:
- meth_put({}, entry)
return meth_end()
# handle outgoing documents
elif content_type == 'application/x-soledad-sync-get':
- line = body_getline()
- entry = line.strip()
meth_get = self._lookup('%s_get' % method)
- return meth_get({}, line)
+ return meth_get()
else:
raise http_app.BadRequest()
else:
diff --git a/server/src/leap/soledad/server/config.py b/server/src/leap/soledad/server/config.py
index 4a791cbe..3c17ec19 100644
--- a/server/src/leap/soledad/server/config.py
+++ b/server/src/leap/soledad/server/config.py
@@ -24,7 +24,7 @@ CONFIG_DEFAULTS = {
'couch_url': 'http://localhost:5984',
'create_cmd': None,
'admin_netrc': '/etc/couchdb/couchdb-admin.netrc',
- 'batching': False
+ 'batching': True
},
'database-security': {
'members': ['soledad'],
diff --git a/server/src/leap/soledad/server/resource.py b/server/src/leap/soledad/server/resource.py
new file mode 100644
index 00000000..dbb91b0a
--- /dev/null
+++ b/server/src/leap/soledad/server/resource.py
@@ -0,0 +1,53 @@
+# -*- coding: utf-8 -*-
+# resource.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+A twisted resource that serves the Soledad Server.
+"""
+
+from twisted.web.resource import Resource
+from twisted.web.wsgi import WSGIResource
+from twisted.internet import reactor
+from twisted.python import threadpool
+
+from leap.soledad.server.application import wsgi_application
+
+
+__all__ = ['SoledadResource']
+
+
+# setup a wsgi resource with its own threadpool
+pool = threadpool.ThreadPool()
+reactor.callWhenRunning(pool.start)
+reactor.addSystemEventTrigger('after', 'shutdown', pool.stop)
+wsgi_resource = WSGIResource(reactor, pool, wsgi_application)
+
+
+class SoledadResource(Resource):
+ """
+ This is a dummy twisted resource, used only to allow different entry points
+ for the Soledad Server.
+ """
+
+ def __init__(self):
+ self.children = {'': wsgi_resource}
+
+ def getChild(self, path, request):
+ # for now, just "rewind" the path and serve the wsgi resource for all
+ # requests. In the future, we might look into the request path to
+ # decide which child resources should serve each request.
+ request.postpath.insert(0, request.prepath.pop())
+ return self.children['']
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py
index 3f5c4aba..b553a056 100644
--- a/server/src/leap/soledad/server/sync.py
+++ b/server/src/leap/soledad/server/sync.py
@@ -17,14 +17,19 @@
"""
Server side synchronization infrastructure.
"""
-from leap.soledad.common.l2db import sync, Document
+import time
+from itertools import izip
+
+from leap.soledad.common.l2db import sync
from leap.soledad.common.l2db.remote import http_app
from leap.soledad.server.caching import get_cache_for
from leap.soledad.server.state import ServerSyncState
+from leap.soledad.common.document import ServerDocument
-MAX_REQUEST_SIZE = 200 # in Mb
+MAX_REQUEST_SIZE = float('inf') # It's a stream.
MAX_ENTRY_SIZE = 200 # in Mb
+ENTRY_CACHE_SIZE = 8192 * 1024
class SyncExchange(sync.SyncExchange):
@@ -51,7 +56,7 @@ class SyncExchange(sync.SyncExchange):
# recover sync state
self._sync_state = ServerSyncState(self.source_replica_uid, sync_id)
- def find_changes_to_return(self, received):
+ def find_changes_to_return(self):
"""
Find changes to return.
@@ -59,10 +64,6 @@ class SyncExchange(sync.SyncExchange):
order using whats_changed. It excludes documents ids that have
already been considered (superseded by the sender, etc).
- :param received: How many documents the source replica has already
- received during the current sync process.
- :type received: int
-
:return: the generation of this database, which the caller can
consider themselves to be synchronized after processing
allreturned documents, and the amount of documents to be sent
@@ -78,41 +79,45 @@ class SyncExchange(sync.SyncExchange):
self._trace('after whats_changed')
seen_ids = self._sync_state.seen_ids()
# changed docs that weren't superseded by or converged with
- changes_to_return = [
+ self.changes_to_return = [
(doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes
# there was a subsequent update
if doc_id not in seen_ids or seen_ids.get(doc_id) < gen]
self._sync_state.put_changes_to_return(
- new_gen, new_trans_id, changes_to_return)
- number_of_changes = len(changes_to_return)
- # query server for stored changes
- _, _, next_change_to_return = \
- self._sync_state.next_change_to_return(received)
+ new_gen, new_trans_id, self.changes_to_return)
+ number_of_changes = len(self.changes_to_return)
self.new_gen = new_gen
self.new_trans_id = new_trans_id
- # and append one change
- self.change_to_return = next_change_to_return
return self.new_gen, number_of_changes
- def return_one_doc(self, return_doc_cb):
- """
- Return one changed document and its last change generation to the
- source syncing replica by invoking the callback return_doc_cb.
+ def return_docs(self, return_doc_cb):
+ """Return the changed documents and their last change generation
+ repeatedly invoking the callback return_doc_cb.
- This is called once for each document to be transferred from target to
- source.
+ The final step of a sync exchange.
- :param return_doc_cb: is a callback used to return the documents with
- their last change generation to the target
- replica.
- :type return_doc_cb: callable(doc, gen, trans_id)
+ :param: return_doc_cb(doc, gen, trans_id): is a callback
+ used to return the documents with their last change generation
+ to the target replica.
+ :return: None
"""
- if self.change_to_return is not None:
- changed_doc_id, gen, trans_id = self.change_to_return
- doc = self._db.get_doc(changed_doc_id, include_deleted=True)
+ changes_to_return = self.changes_to_return
+ # return docs, including conflicts.
+ # content as a file-object (will be read when writing)
+ changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return]
+ docs = self._db.get_docs(
+ changed_doc_ids, check_for_conflicts=False,
+ include_deleted=True, read_content=False)
+
+ docs_by_gen = izip(
+ docs, (gen for _, gen, _ in changes_to_return),
+ (trans_id for _, _, trans_id in changes_to_return))
+ for doc, gen, trans_id in docs_by_gen:
return_doc_cb(doc, gen, trans_id)
def batched_insert_from_source(self, entries, sync_id):
+ if not entries:
+ return
self._db.batch_start()
for entry in entries:
doc, gen, trans_id, number_of_docs, doc_idx = entry
@@ -207,6 +212,7 @@ class SyncResource(http_app.SyncResource):
db, self.source_replica_uid, last_known_generation, sync_id)
self._sync_id = sync_id
self._staging = []
+ self._staging_size = 0
@http_app.http_method(content_as_args=True)
def post_put(
@@ -233,26 +239,37 @@ class SyncResource(http_app.SyncResource):
:param doc_idx: The index of the current document.
:type doc_idx: int
"""
- doc = Document(id, rev, content)
+ doc = ServerDocument(id, rev, json=content)
+ self._staging_size += len(content or '')
self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx))
+ if self._staging_size > ENTRY_CACHE_SIZE or doc_idx == number_of_docs:
+ self.sync_exch.batched_insert_from_source(self._staging,
+ self._sync_id)
+ self._staging = []
+ self._staging_size = 0
- @http_app.http_method(received=int, content_as_args=True)
- def post_get(self, received):
+ def post_get(self):
"""
- Return one syncing document to the client.
-
- :param received: How many documents have already been received by the
- client on the current sync session.
- :type received: int
+ Return syncing documents to the client.
"""
-
def send_doc(doc, gen, trans_id):
- entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.get_json(),
+ entry = dict(id=doc.doc_id, rev=doc.rev,
gen=gen, trans_id=trans_id)
self.responder.stream_entry(entry)
+ content_reader = doc.get_json()
+ if content_reader:
+ content = content_reader.read()
+ self.responder.stream_entry(content)
+ content_reader.close()
+ # throttle at 5mb/s
+ # FIXME: twistd cant control througput
+ # we need to either use gunicorn or go async
+ time.sleep(len(content) / (5.0 * 1024 * 1024))
+ else:
+ self.responder.stream_entry('')
new_gen, number_of_changes = \
- self.sync_exch.find_changes_to_return(received)
+ self.sync_exch.find_changes_to_return()
self.responder.content_type = 'application/x-u1db-sync-response'
self.responder.start_response(200)
self.responder.start_stream(),
@@ -264,7 +281,7 @@ class SyncResource(http_app.SyncResource):
if self.replica_uid is not None:
header['replica_uid'] = self.replica_uid
self.responder.stream_entry(header)
- self.sync_exch.return_one_doc(send_doc)
+ self.sync_exch.return_docs(send_doc)
self.responder.end_stream()
self.responder.finish_response()
@@ -273,7 +290,6 @@ class SyncResource(http_app.SyncResource):
Return the current generation and transaction_id after inserting one
incoming document.
"""
- self.sync_exch.batched_insert_from_source(self._staging, self._sync_id)
self.responder.content_type = 'application/x-soledad-sync-response'
self.responder.start_response(200)
self.responder.start_stream(),
diff --git a/setup.cfg b/setup.cfg
index 187616d5..f62466ea 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,7 +1,7 @@
[pep8]
-exclude = versioneer.py,_version.py,*.egg,build,docs,scripts,ddocs.py
-ignore = E731
+exclude = versioneer.py,_version.py,*.egg,build,docs,scripts,ddocs.py,.tox
+ignore = F812,E731
[flake8]
-exclude = versioneer.py,_version.py,*.egg,build,docs,scripts,ddocs.py
-ignore = E731
+exclude = versioneer.py,_version.py,*.egg,build,docs,scripts,ddocs.py,.tox
+ignore = F812,E731
diff --git a/testing/pytest.ini b/testing/pytest.ini
index 2d34c607..eb70b67c 100644
--- a/testing/pytest.ini
+++ b/testing/pytest.ini
@@ -1,3 +1,3 @@
[pytest]
testpaths = tests
-norecursedirs = tests/perf
+twisted = yes
diff --git a/testing/test_soledad/util.py b/testing/test_soledad/util.py
index d53f6cda..57f8199b 100644
--- a/testing/test_soledad/util.py
+++ b/testing/test_soledad/util.py
@@ -15,12 +15,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
"""
Utilities used by multiple test suites.
"""
-
import os
import random
import string
@@ -45,21 +43,20 @@ from leap.soledad.common.document import SoledadDocument
from leap.soledad.common.couch import CouchDatabase
from leap.soledad.common.couch.state import CouchServerState
-from leap.soledad.common.crypto import ENC_SCHEME_KEY
from leap.soledad.client import Soledad
from leap.soledad.client import http_target
from leap.soledad.client import auth
-from leap.soledad.client.crypto import decrypt_doc_dict
from leap.soledad.client.sqlcipher import SQLCipherDatabase
from leap.soledad.client.sqlcipher import SQLCipherOptions
+from leap.soledad.client._crypto import is_symmetrically_encrypted
from leap.soledad.server import SoledadApp
from leap.soledad.server.auth import SoledadTokenAuthMiddleware
PASSWORD = '123456'
-ADDRESS = 'leap@leap.se'
+ADDRESS = 'user-1234'
def make_local_db_and_target(test):
@@ -193,8 +190,7 @@ class MockedSharedDBTest(object):
def soledad_sync_target(
- test, path, source_replica_uid=uuid4().hex,
- sync_db=None, sync_enc_pool=None):
+ test, path, source_replica_uid=uuid4().hex):
creds = {'token': {
'uuid': 'user-uuid',
'token': 'auth-token',
@@ -204,14 +200,13 @@ def soledad_sync_target(
source_replica_uid,
creds,
test._soledad._crypto,
- None, # cert_file
- sync_db=sync_db,
- sync_enc_pool=sync_enc_pool)
+ None) # cert_file
# redefine the base leap test class so it inherits from twisted trial's
# TestCase. This is needed so trial knows that it has to manage a reactor and
# wait for deferreds returned by tests to be fired.
+
BaseLeapTest = type(
'BaseLeapTest', (unittest.TestCase,), dict(BaseLeapTest.__dict__))
@@ -221,7 +216,6 @@ class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest):
"""
Instantiates Soledad for usage in tests.
"""
- defer_sync_encryption = False
@pytest.mark.usefixtures("method_tmpdir")
def setUp(self):
@@ -229,14 +223,7 @@ class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest):
# repeat it here because twisted.trial does not work with
# setUpClass/tearDownClass.
- self.old_path = os.environ['PATH']
- self.old_home = os.environ['HOME']
self.home = self.tempdir
- bin_tdir = os.path.join(
- self.tempdir,
- 'bin')
- os.environ["PATH"] = bin_tdir
- os.environ["HOME"] = self.tempdir
# config info
self.db1_file = os.path.join(self.tempdir, "db1.u1db")
@@ -263,10 +250,6 @@ class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest):
self._db2.close()
self._soledad.close()
- # restore paths
- os.environ["PATH"] = self.old_path
- os.environ["HOME"] = self.old_home
-
def _delete_temporary_dirs():
# XXX should not access "private" attrs
for f in [self._soledad.local_db_path,
@@ -305,12 +288,12 @@ class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest):
self.tempdir, prefix, local_db_path),
server_url=server_url, # Soledad will fail if not given an url
cert_file=cert_file,
- defer_encryption=self.defer_sync_encryption,
shared_db=MockSharedDB(),
auth_token=auth_token)
self.addCleanup(soledad.close)
return soledad
+ @pytest.inlineCallbacks
def assertGetEncryptedDoc(
self, db, doc_id, doc_rev, content, has_conflicts):
"""
@@ -320,13 +303,9 @@ class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest):
has_conflicts=has_conflicts)
doc = db.get_doc(doc_id)
- if ENC_SCHEME_KEY in doc.content:
- # XXX check for SYM_KEY too
- key = self._soledad._crypto.doc_passphrase(doc.doc_id)
- secret = self._soledad._crypto.secret
- decrypted = decrypt_doc_dict(
- doc.content, doc.doc_id, doc.rev,
- key, secret)
+ if is_symmetrically_encrypted(doc.content['raw']):
+ crypt = self._soledad._crypto
+ decrypted = yield crypt.decrypt_doc(doc)
doc.set_json(decrypted)
self.assertEqual(exp_doc.doc_id, doc.doc_id)
self.assertEqual(exp_doc.rev, doc.rev)
diff --git a/testing/tests/perf/assets/cert_default.conf b/testing/tests/benchmarks/assets/cert_default.conf
index 8043cea3..8043cea3 100644
--- a/testing/tests/perf/assets/cert_default.conf
+++ b/testing/tests/benchmarks/assets/cert_default.conf
diff --git a/testing/tests/benchmarks/conftest.py b/testing/tests/benchmarks/conftest.py
new file mode 100644
index 00000000..a9cc3464
--- /dev/null
+++ b/testing/tests/benchmarks/conftest.py
@@ -0,0 +1,57 @@
+import pytest
+import random
+import base64
+
+from twisted.internet import threads, reactor
+
+
+# we have to manually setup the events server in order to be able to signal
+# events. This is usually done by the enclosing application using soledad
+# client (i.e. bitmask client).
+from leap.common.events import server
+server.ensure_server()
+
+
+def pytest_addoption(parser):
+ parser.addoption(
+ "--num-docs", type="int", default=100,
+ help="the number of documents to use in performance tests")
+
+
+@pytest.fixture()
+def payload():
+ def generate(size):
+ random.seed(1337) # same seed to avoid different bench results
+ payload_bytes = bytearray(random.getrandbits(8) for _ in xrange(size))
+ # encode as base64 to avoid ascii encode/decode errors
+ return base64.b64encode(payload_bytes)[:size] # remove b64 overhead
+ return generate
+
+
+@pytest.fixture()
+def txbenchmark(benchmark):
+ def blockOnThread(*args, **kwargs):
+ return threads.deferToThread(
+ benchmark, threads.blockingCallFromThread,
+ reactor, *args, **kwargs)
+ return blockOnThread
+
+
+@pytest.fixture()
+def txbenchmark_with_setup(benchmark):
+ def blockOnThreadWithSetup(setup, f):
+ def blocking_runner(*args, **kwargs):
+ return threads.blockingCallFromThread(reactor, f, *args, **kwargs)
+
+ def blocking_setup():
+ args = threads.blockingCallFromThread(reactor, setup)
+ try:
+ return tuple(arg for arg in args), {}
+ except TypeError:
+ return ((args,), {}) if args else None
+
+ def bench():
+ return benchmark.pedantic(blocking_runner, setup=blocking_setup,
+ rounds=4, warmup_rounds=1)
+ return threads.deferToThread(bench)
+ return blockOnThreadWithSetup
diff --git a/testing/tests/perf/pytest.ini b/testing/tests/benchmarks/pytest.ini
index 7a0508ce..7a0508ce 100644
--- a/testing/tests/perf/pytest.ini
+++ b/testing/tests/benchmarks/pytest.ini
diff --git a/testing/tests/benchmarks/test_crypto.py b/testing/tests/benchmarks/test_crypto.py
new file mode 100644
index 00000000..8ee9b899
--- /dev/null
+++ b/testing/tests/benchmarks/test_crypto.py
@@ -0,0 +1,97 @@
+"""
+Benchmarks for crypto operations.
+If you don't want to stress your local machine too much, you can pass the
+SIZE_LIMT environment variable.
+
+For instance, to keep the maximum payload at 1MB:
+
+SIZE_LIMIT=1E6 py.test -s tests/perf/test_crypto.py
+"""
+import pytest
+import os
+import json
+from uuid import uuid4
+
+from leap.soledad.common.document import SoledadDocument
+from leap.soledad.client import _crypto
+
+LIMIT = int(float(os.environ.get('SIZE_LIMIT', 50 * 1000 * 1000)))
+
+
+def create_doc_encryption(size):
+ @pytest.mark.benchmark(group="test_crypto_encrypt_doc")
+ @pytest.inlineCallbacks
+ def test_doc_encryption(soledad_client, txbenchmark, payload):
+ crypto = soledad_client()._crypto
+
+ DOC_CONTENT = {'payload': payload(size)}
+ doc = SoledadDocument(
+ doc_id=uuid4().hex, rev='rev',
+ json=json.dumps(DOC_CONTENT))
+
+ yield txbenchmark(crypto.encrypt_doc, doc)
+ return test_doc_encryption
+
+
+# TODO this test is really bullshit, because it's still including
+# the json serialization.
+
+def create_doc_decryption(size):
+ @pytest.inlineCallbacks
+ @pytest.mark.benchmark(group="test_crypto_decrypt_doc")
+ def test_doc_decryption(soledad_client, txbenchmark, payload):
+ crypto = soledad_client()._crypto
+
+ DOC_CONTENT = {'payload': payload(size)}
+ doc = SoledadDocument(
+ doc_id=uuid4().hex, rev='rev',
+ json=json.dumps(DOC_CONTENT))
+
+ encrypted_doc = yield crypto.encrypt_doc(doc)
+ doc.set_json(encrypted_doc)
+
+ yield txbenchmark(crypto.decrypt_doc, doc)
+ return test_doc_decryption
+
+
+def create_raw_encryption(size):
+ @pytest.mark.benchmark(group="test_crypto_raw_encrypt")
+ def test_raw_encrypt(benchmark, payload):
+ key = payload(32)
+ benchmark(_crypto.encrypt_sym, payload(size), key)
+ return test_raw_encrypt
+
+
+def create_raw_decryption(size):
+ @pytest.mark.benchmark(group="test_crypto_raw_decrypt")
+ def test_raw_decrypt(benchmark, payload):
+ key = payload(32)
+ iv, ciphertext = _crypto.encrypt_sym(payload(size), key)
+ benchmark(_crypto.decrypt_sym, ciphertext, key, iv)
+ return test_raw_decrypt
+
+
+# Create the TESTS in the global namespace, they'll be picked by the benchmark
+# plugin.
+
+encryption_tests = [
+ ('10k', 1E4),
+ ('100k', 1E5),
+ ('500k', 5E5),
+ ('1M', 1E6),
+ ('10M', 1E7),
+ ('50M', 5E7),
+]
+
+for name, size in encryption_tests:
+ if size < LIMIT:
+ sz = int(size)
+ globals()['test_encrypt_doc_' + name] = create_doc_encryption(sz)
+ globals()['test_decrypt_doc_' + name] = create_doc_decryption(sz)
+
+
+for name, size in encryption_tests:
+ if size < LIMIT:
+ sz = int(size)
+ globals()['test_encrypt_raw_' + name] = create_raw_encryption(sz)
+ globals()['test_decrypt_raw_' + name] = create_raw_decryption(sz)
diff --git a/testing/tests/perf/test_misc.py b/testing/tests/benchmarks/test_misc.py
index ead48adf..ead48adf 100644
--- a/testing/tests/perf/test_misc.py
+++ b/testing/tests/benchmarks/test_misc.py
diff --git a/testing/tests/perf/test_sqlcipher.py b/testing/tests/benchmarks/test_sqlcipher.py
index e7a54228..39c9e3ad 100644
--- a/testing/tests/perf/test_sqlcipher.py
+++ b/testing/tests/benchmarks/test_sqlcipher.py
@@ -29,10 +29,10 @@ def build_test_sqlcipher_create(amount, size):
return test
-test_async_create_20_500k = build_test_sqlcipher_async_create(20, 500*1000)
-test_async_create_100_100k = build_test_sqlcipher_async_create(100, 100*1000)
-test_async_create_1000_10k = build_test_sqlcipher_async_create(1000, 10*1000)
+test_async_create_20_500k = build_test_sqlcipher_async_create(20, 500 * 1000)
+test_async_create_100_100k = build_test_sqlcipher_async_create(100, 100 * 1000)
+test_async_create_1000_10k = build_test_sqlcipher_async_create(1000, 10 * 1000)
# synchronous
-test_create_20_500k = build_test_sqlcipher_create(20, 500*1000)
-test_create_100_100k = build_test_sqlcipher_create(100, 100*1000)
-test_create_1000_10k = build_test_sqlcipher_create(1000, 10*1000)
+test_create_20_500k = build_test_sqlcipher_create(20, 500 * 1000)
+test_create_100_100k = build_test_sqlcipher_create(100, 100 * 1000)
+test_create_1000_10k = build_test_sqlcipher_create(1000, 10 * 1000)
diff --git a/testing/tests/perf/test_sync.py b/testing/tests/benchmarks/test_sync.py
index 0b48a0b9..1501d74b 100644
--- a/testing/tests/perf/test_sync.py
+++ b/testing/tests/benchmarks/test_sync.py
@@ -1,17 +1,14 @@
import pytest
-
from twisted.internet.defer import gatherResults
+@pytest.inlineCallbacks
def load_up(client, amount, payload):
- deferreds = []
# create a bunch of local documents
+ deferreds = []
for i in xrange(amount):
- d = client.create_doc({'content': payload})
- deferreds.append(d)
- d = gatherResults(deferreds)
- d.addCallback(lambda _: None)
- return d
+ deferreds.append(client.create_doc({'content': payload}))
+ yield gatherResults(deferreds)
def create_upload(uploads, size):
@@ -27,9 +24,9 @@ def create_upload(uploads, size):
return test
-test_upload_20_500k = create_upload(20, 500*1000)
-test_upload_100_100k = create_upload(100, 100*1000)
-test_upload_1000_10k = create_upload(1000, 10*1000)
+test_upload_20_500k = create_upload(20, 500 * 1000)
+test_upload_100_100k = create_upload(100, 100 * 1000)
+test_upload_1000_10k = create_upload(1000, 10 * 1000)
def create_download(downloads, size):
@@ -52,9 +49,9 @@ def create_download(downloads, size):
return test
-test_download_20_500k = create_download(20, 500*1000)
-test_download_100_100k = create_download(100, 100*1000)
-test_download_1000_10k = create_download(1000, 10*1000)
+test_download_20_500k = create_download(20, 500 * 1000)
+test_download_100_100k = create_download(100, 100 * 1000)
+test_download_1000_10k = create_download(1000, 10 * 1000)
@pytest.inlineCallbacks
diff --git a/testing/tests/client/test_aux_methods.py b/testing/tests/client/test_aux_methods.py
index c25ff8ca..9b4a175f 100644
--- a/testing/tests/client/test_aux_methods.py
+++ b/testing/tests/client/test_aux_methods.py
@@ -21,10 +21,10 @@ import os
from twisted.internet import defer
-from leap.soledad.common.errors import DatabaseAccessError
from leap.soledad.client import Soledad
from leap.soledad.client.adbapi import U1DBConnectionPool
from leap.soledad.client.secrets import PassphraseTooShort
+from leap.soledad.client.secrets import SecretsException
from test_soledad.util import BaseSoledadTest
@@ -108,7 +108,7 @@ class AuxMethodsTestCase(BaseSoledadTest):
sol.change_passphrase(u'654321')
sol.close()
- with self.assertRaises(DatabaseAccessError):
+ with self.assertRaises(SecretsException):
self._soledad_instance(
'leap@leap.se',
passphrase=u'123',
diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py
index 77252b46..49a61438 100644
--- a/testing/tests/client/test_crypto.py
+++ b/testing/tests/client/test_crypto.py
@@ -17,47 +17,173 @@
"""
Tests for cryptographic related stuff.
"""
-import os
-import hashlib
import binascii
+import base64
+import hashlib
+import json
+import os
+
+from io import BytesIO
+
+import pytest
+
+from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
+from cryptography.hazmat.backends import default_backend
+from cryptography.exceptions import InvalidTag
-from leap.soledad.client import crypto
from leap.soledad.common.document import SoledadDocument
from test_soledad.util import BaseSoledadTest
-from leap.soledad.common.crypto import WrongMacError
-from leap.soledad.common.crypto import UnknownMacMethodError
-from leap.soledad.common.crypto import ENC_JSON_KEY
-from leap.soledad.common.crypto import ENC_SCHEME_KEY
-from leap.soledad.common.crypto import MAC_KEY
-from leap.soledad.common.crypto import MAC_METHOD_KEY
+from leap.soledad.client import _crypto
+
+from twisted.trial import unittest
+from twisted.internet import defer
+
+
+snowden1 = (
+ "You can't come up against "
+ "the world's most powerful intelligence "
+ "agencies and not accept the risk. "
+ "If they want to get you, over time "
+ "they will.")
+
+
+class AESTest(unittest.TestCase):
+
+ def test_chunked_encryption(self):
+ key = 'A' * 32
+
+ fd = BytesIO()
+ aes = _crypto.AESWriter(key, _buffer=fd)
+ iv = aes.iv
+
+ data = snowden1
+ block = 16
+
+ for i in range(len(data) / block):
+ chunk = data[i * block:(i + 1) * block]
+ aes.write(chunk)
+ aes.end()
+
+ ciphertext_chunked = fd.getvalue()
+ ciphertext, tag = _aes_encrypt(key, iv, data)
+
+ assert ciphertext_chunked == ciphertext
+
+ def test_decrypt(self):
+ key = 'A' * 32
+ iv = 'A' * 16
+
+ data = snowden1
+ block = 16
+
+ ciphertext, tag = _aes_encrypt(key, iv, data)
+
+ fd = BytesIO()
+ aes = _crypto.AESWriter(key, iv, fd, tag=tag)
+
+ for i in range(len(ciphertext) / block):
+ chunk = ciphertext[i * block:(i + 1) * block]
+ aes.write(chunk)
+ aes.end()
+
+ cleartext_chunked = fd.getvalue()
+ assert cleartext_chunked == data
+
+
+class BlobTestCase(unittest.TestCase):
+
+ class doc_info:
+ doc_id = 'D-deadbeef'
+ rev = '397932e0c77f45fcb7c3732930e7e9b2:1'
+
+ @defer.inlineCallbacks
+ def test_blob_encryptor(self):
+
+ inf = BytesIO(snowden1)
+
+ blob = _crypto.BlobEncryptor(
+ self.doc_info, inf,
+ secret='A' * 96)
+ encrypted = yield blob.encrypt()
+ preamble, ciphertext = _crypto._split(encrypted.getvalue())
+ ciphertext = ciphertext[:-16]
-class EncryptedSyncTestCase(BaseSoledadTest):
+ assert len(preamble) == _crypto.PACMAN.size
+ unpacked_data = _crypto.PACMAN.unpack(preamble)
+ magic, sch, meth, ts, iv, doc_id, rev = unpacked_data
+ assert magic == _crypto.BLOB_SIGNATURE_MAGIC
+ assert sch == 1
+ assert meth == _crypto.ENC_METHOD.aes_256_gcm
+ assert iv == blob.iv
+ assert doc_id == 'D-deadbeef'
+ assert rev == self.doc_info.rev
- """
- Tests that guarantee that data will always be encrypted when syncing.
- """
+ aes_key = _crypto._get_sym_key_for_doc(
+ self.doc_info.doc_id, 'A' * 96)
+ assert ciphertext == _aes_encrypt(aes_key, blob.iv, snowden1)[0]
- def test_encrypt_decrypt_json(self):
+ decrypted = _aes_decrypt(aes_key, blob.iv, blob.tag, ciphertext,
+ preamble)
+ assert str(decrypted) == snowden1
+
+ @defer.inlineCallbacks
+ def test_blob_decryptor(self):
+
+ inf = BytesIO(snowden1)
+
+ blob = _crypto.BlobEncryptor(
+ self.doc_info, inf,
+ secret='A' * 96)
+ ciphertext = yield blob.encrypt()
+
+ decryptor = _crypto.BlobDecryptor(
+ self.doc_info, ciphertext,
+ secret='A' * 96)
+ decrypted = yield decryptor.decrypt()
+ assert decrypted == snowden1
+
+ @defer.inlineCallbacks
+ def test_encrypt_and_decrypt(self):
"""
- Test encrypting and decrypting documents.
+ Check that encrypting and decrypting gives same doc.
"""
- simpledoc = {'key': 'val'}
- doc1 = SoledadDocument(doc_id='id')
- doc1.content = simpledoc
-
- # encrypt doc
- doc1.set_json(self._soledad._crypto.encrypt_doc(doc1))
- # assert content is different and includes keys
- self.assertNotEqual(
- simpledoc, doc1.content,
- 'incorrect document encryption')
- self.assertTrue(ENC_JSON_KEY in doc1.content)
- self.assertTrue(ENC_SCHEME_KEY in doc1.content)
- # decrypt doc
- doc1.set_json(self._soledad._crypto.decrypt_doc(doc1))
- self.assertEqual(
- simpledoc, doc1.content, 'incorrect document encryption')
+ crypto = _crypto.SoledadCrypto('A' * 96)
+ payload = {'key': 'someval'}
+ doc1 = SoledadDocument('id1', '1', json.dumps(payload))
+
+ encrypted = yield crypto.encrypt_doc(doc1)
+ assert encrypted != payload
+ assert 'raw' in encrypted
+ doc2 = SoledadDocument('id1', '1')
+ doc2.set_json(encrypted)
+ assert _crypto.is_symmetrically_encrypted(encrypted)
+ decrypted = yield crypto.decrypt_doc(doc2)
+ assert len(decrypted) != 0
+ assert json.loads(decrypted) == payload
+
+ @defer.inlineCallbacks
+ def test_decrypt_with_wrong_tag_raises(self):
+ """
+ Trying to decrypt a document with wrong MAC should raise.
+ """
+ crypto = _crypto.SoledadCrypto('A' * 96)
+ payload = {'key': 'someval'}
+ doc1 = SoledadDocument('id1', '1', json.dumps(payload))
+
+ encrypted = yield crypto.encrypt_doc(doc1)
+ encdict = json.loads(encrypted)
+ preamble, raw = _crypto._split(str(encdict['raw']))
+ # mess with tag
+ messed = raw[:-16] + '0' * 16
+
+ preamble = base64.urlsafe_b64encode(preamble)
+ newraw = preamble + ' ' + base64.urlsafe_b64encode(str(messed))
+ doc2 = SoledadDocument('id1', '1')
+ doc2.set_json(json.dumps({"raw": str(newraw)}))
+
+ with pytest.raises(_crypto.InvalidBlob):
+ yield crypto.decrypt_doc(doc2)
class RecoveryDocumentTestCase(BaseSoledadTest):
@@ -74,13 +200,14 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
encrypted_secret = rd[
self._soledad.secrets.STORAGE_SECRETS_KEY][secret_id]
self.assertTrue(self._soledad.secrets.CIPHER_KEY in encrypted_secret)
- self.assertTrue(
- encrypted_secret[self._soledad.secrets.CIPHER_KEY] == 'aes256')
+ self.assertEquals(
+ _crypto.ENC_METHOD.aes_256_gcm,
+ encrypted_secret[self._soledad.secrets.CIPHER_KEY])
self.assertTrue(self._soledad.secrets.LENGTH_KEY in encrypted_secret)
self.assertTrue(self._soledad.secrets.SECRET_KEY in encrypted_secret)
- def test_import_recovery_document(self):
- rd = self._soledad.secrets._export_recovery_document()
+ def test_import_recovery_document(self, cipher='aes256'):
+ rd = self._soledad.secrets._export_recovery_document(cipher)
s = self._soledad_instance()
s.secrets._import_recovery_document(rd)
s.secrets.set_secret_id(self._soledad.secrets._secret_id)
@@ -89,6 +216,14 @@ class RecoveryDocumentTestCase(BaseSoledadTest):
'Failed settinng secret for symmetric encryption.')
s.close()
+ def test_import_GCM_recovery_document(self):
+ cipher = self._soledad.secrets.CIPHER_AES256_GCM
+ self.test_import_recovery_document(cipher)
+
+ def test_import_legacy_CTR_recovery_document(self):
+ cipher = self._soledad.secrets.CIPHER_AES256
+ self.test_import_recovery_document(cipher)
+
class SoledadSecretsTestCase(BaseSoledadTest):
@@ -146,60 +281,21 @@ class SoledadSecretsTestCase(BaseSoledadTest):
"Should have a secret at this point")
-class MacAuthTestCase(BaseSoledadTest):
-
- def test_decrypt_with_wrong_mac_raises(self):
- """
- Trying to decrypt a document with wrong MAC should raise.
- """
- simpledoc = {'key': 'val'}
- doc = SoledadDocument(doc_id='id')
- doc.content = simpledoc
- # encrypt doc
- doc.set_json(self._soledad._crypto.encrypt_doc(doc))
- self.assertTrue(MAC_KEY in doc.content)
- self.assertTrue(MAC_METHOD_KEY in doc.content)
- # mess with MAC
- doc.content[MAC_KEY] = '1234567890ABCDEF'
- # try to decrypt doc
- self.assertRaises(
- WrongMacError,
- self._soledad._crypto.decrypt_doc, doc)
-
- def test_decrypt_with_unknown_mac_method_raises(self):
- """
- Trying to decrypt a document with unknown MAC method should raise.
- """
- simpledoc = {'key': 'val'}
- doc = SoledadDocument(doc_id='id')
- doc.content = simpledoc
- # encrypt doc
- doc.set_json(self._soledad._crypto.encrypt_doc(doc))
- self.assertTrue(MAC_KEY in doc.content)
- self.assertTrue(MAC_METHOD_KEY in doc.content)
- # mess with MAC method
- doc.content[MAC_METHOD_KEY] = 'mymac'
- # try to decrypt doc
- self.assertRaises(
- UnknownMacMethodError,
- self._soledad._crypto.decrypt_doc, doc)
-
-
class SoledadCryptoAESTestCase(BaseSoledadTest):
def test_encrypt_decrypt_sym(self):
# generate 256-bit key
key = os.urandom(32)
- iv, cyphertext = crypto.encrypt_sym('data', key)
+ iv, cyphertext = _crypto.encrypt_sym('data', key)
self.assertTrue(cyphertext is not None)
self.assertTrue(cyphertext != '')
self.assertTrue(cyphertext != 'data')
- plaintext = crypto.decrypt_sym(cyphertext, key, iv)
+ plaintext = _crypto.decrypt_sym(cyphertext, key, iv)
self.assertEqual('data', plaintext)
- def test_decrypt_with_wrong_iv_fails(self):
+ def test_decrypt_with_wrong_iv_raises(self):
key = os.urandom(32)
- iv, cyphertext = crypto.encrypt_sym('data', key)
+ iv, cyphertext = _crypto.encrypt_sym('data', key)
self.assertTrue(cyphertext is not None)
self.assertTrue(cyphertext != '')
self.assertTrue(cyphertext != 'data')
@@ -208,13 +304,13 @@ class SoledadCryptoAESTestCase(BaseSoledadTest):
wrongiv = rawiv
while wrongiv == rawiv:
wrongiv = os.urandom(1) + rawiv[1:]
- plaintext = crypto.decrypt_sym(
- cyphertext, key, iv=binascii.b2a_base64(wrongiv))
- self.assertNotEqual('data', plaintext)
+ with pytest.raises(InvalidTag):
+ _crypto.decrypt_sym(
+ cyphertext, key, iv=binascii.b2a_base64(wrongiv))
- def test_decrypt_with_wrong_key_fails(self):
+ def test_decrypt_with_wrong_key_raises(self):
key = os.urandom(32)
- iv, cyphertext = crypto.encrypt_sym('data', key)
+ iv, cyphertext = _crypto.encrypt_sym('data', key)
self.assertTrue(cyphertext is not None)
self.assertTrue(cyphertext != '')
self.assertTrue(cyphertext != 'data')
@@ -222,5 +318,21 @@ class SoledadCryptoAESTestCase(BaseSoledadTest):
# ensure keys are different in case we are extremely lucky
while wrongkey == key:
wrongkey = os.urandom(32)
- plaintext = crypto.decrypt_sym(cyphertext, wrongkey, iv)
- self.assertNotEqual('data', plaintext)
+ with pytest.raises(InvalidTag):
+ _crypto.decrypt_sym(cyphertext, wrongkey, iv)
+
+
+def _aes_encrypt(key, iv, data):
+ backend = default_backend()
+ cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=backend)
+ encryptor = cipher.encryptor()
+ return encryptor.update(data) + encryptor.finalize(), encryptor.tag
+
+
+def _aes_decrypt(key, iv, tag, data, aead=''):
+ backend = default_backend()
+ cipher = Cipher(algorithms.AES(key), modes.GCM(iv, tag), backend=backend)
+ decryptor = cipher.decryptor()
+ if aead:
+ decryptor.authenticate_additional_data(aead)
+ return decryptor.update(data) + decryptor.finalize()
diff --git a/testing/tests/client/test_deprecated_crypto.py b/testing/tests/client/test_deprecated_crypto.py
new file mode 100644
index 00000000..8ee3735c
--- /dev/null
+++ b/testing/tests/client/test_deprecated_crypto.py
@@ -0,0 +1,91 @@
+import json
+from twisted.internet import defer
+from uuid import uuid4
+from urlparse import urljoin
+
+from leap.soledad.client import crypto as old_crypto
+from leap.soledad.common.couch import CouchDatabase
+from leap.soledad.common import crypto as common_crypto
+
+from test_soledad.u1db_tests import simple_doc
+from test_soledad.util import SoledadWithCouchServerMixin
+from test_soledad.util import make_token_soledad_app
+from test_soledad.u1db_tests import TestCaseWithServer
+
+
+def deprecate_client_crypto(client):
+ secret = client._crypto.secret
+ _crypto = old_crypto.SoledadCrypto(secret)
+ setattr(client._dbsyncer, '_crypto', _crypto)
+ return client
+
+
+def couch_database(couch_url, uuid):
+ db = CouchDatabase(couch_url, "user-%s" % (uuid,))
+ return db
+
+
+class DeprecatedCryptoTest(SoledadWithCouchServerMixin, TestCaseWithServer):
+
+ def setUp(self):
+ SoledadWithCouchServerMixin.setUp(self)
+ TestCaseWithServer.setUp(self)
+
+ def tearDown(self):
+ SoledadWithCouchServerMixin.tearDown(self)
+ TestCaseWithServer.tearDown(self)
+
+ @staticmethod
+ def make_app_with_state(state):
+ return make_token_soledad_app(state)
+
+ @defer.inlineCallbacks
+ def test_touch_updates_remote_representation(self):
+ self.startTwistedServer()
+ user = 'user-' + uuid4().hex
+ server_url = 'http://%s:%d' % (self.server_address)
+ client = self._soledad_instance(user=user, server_url=server_url)
+ deprecated_client = deprecate_client_crypto(
+ self._soledad_instance(user=user, server_url=server_url))
+
+ self.make_app()
+ remote = self.request_state._create_database(replica_uid=client._uuid)
+ remote = CouchDatabase.open_database(
+ urljoin(self.couch_url, 'user-' + user),
+ create=True)
+
+ # ensure remote db is empty
+ gen, docs = remote.get_all_docs()
+ assert gen == 0
+ assert len(docs) == 0
+
+ # create a doc with deprecated client and sync
+ yield deprecated_client.create_doc(json.loads(simple_doc))
+ yield deprecated_client.sync()
+
+ # check for doc in remote db
+ gen, docs = remote.get_all_docs()
+ assert gen == 1
+ assert len(docs) == 1
+ doc = docs.pop()
+ content = doc.content
+ assert common_crypto.ENC_JSON_KEY in content
+ assert common_crypto.ENC_SCHEME_KEY in content
+ assert common_crypto.ENC_METHOD_KEY in content
+ assert common_crypto.ENC_IV_KEY in content
+ assert common_crypto.MAC_KEY in content
+ assert common_crypto.MAC_METHOD_KEY in content
+
+ # "touch" the document with a newer client and synx
+ _, docs = yield client.get_all_docs()
+ yield client.put_doc(doc)
+ yield client.sync()
+
+ # check for newer representation of doc in remote db
+ gen, docs = remote.get_all_docs()
+ assert gen == 2
+ assert len(docs) == 1
+ doc = docs.pop()
+ content = doc.content
+ assert len(content) == 1
+ assert 'raw' in content
diff --git a/testing/tests/conftest.py b/testing/tests/conftest.py
index 9e4319ac..1ff1cbb7 100644
--- a/testing/tests/conftest.py
+++ b/testing/tests/conftest.py
@@ -1,4 +1,29 @@
+import json
+import os
import pytest
+import requests
+import signal
+import time
+
+from hashlib import sha512
+from subprocess import call
+from urlparse import urljoin
+from uuid import uuid4
+
+from leap.soledad.common.couch import CouchDatabase
+from leap.soledad.client import Soledad
+
+
+#
+# default options for all tests
+#
+
+DEFAULT_PASSPHRASE = '123'
+
+DEFAULT_URL = 'http://127.0.0.1:2424'
+DEFAULT_PRIVKEY = 'soledad_privkey.pem'
+DEFAULT_CERTKEY = 'soledad_certkey.pem'
+DEFAULT_TOKEN = 'an-auth-token'
def pytest_addoption(parser):
@@ -16,3 +41,167 @@ def couch_url(request):
@pytest.fixture
def method_tmpdir(request, tmpdir):
request.instance.tempdir = tmpdir.strpath
+
+
+#
+# remote_db fixture: provides an empty database for a given user in a per
+# function scope.
+#
+
+class UserDatabase(object):
+
+ def __init__(self, url, uuid):
+ self._remote_db_url = urljoin(url, 'user-%s' % uuid)
+
+ def setup(self):
+ return CouchDatabase.open_database(
+ url=self._remote_db_url, create=True, replica_uid=None)
+
+ def teardown(self):
+ requests.delete(self._remote_db_url)
+
+
+@pytest.fixture()
+def remote_db(request):
+ couch_url = request.config.option.couch_url
+
+ def create(uuid):
+ db = UserDatabase(couch_url, uuid)
+ request.addfinalizer(db.teardown)
+ return db.setup()
+ return create
+
+
+def get_pid(pidfile):
+ if not os.path.isfile(pidfile):
+ return 0
+ try:
+ with open(pidfile) as f:
+ return int(f.read())
+ except IOError:
+ return 0
+
+
+#
+# soledad_server fixture: provides a running soledad server in a per module
+# context (same soledad server for all tests in this module).
+#
+
+class SoledadServer(object):
+
+ def __init__(self, tmpdir_factory, couch_url):
+ tmpdir = tmpdir_factory.mktemp('soledad-server')
+ self._pidfile = os.path.join(tmpdir.strpath, 'soledad-server.pid')
+ self._logfile = os.path.join(tmpdir.strpath, 'soledad-server.log')
+ self._couch_url = couch_url
+
+ def start(self):
+ self._create_conf_file()
+ # start the server
+ call([
+ 'twistd',
+ '--logfile=%s' % self._logfile,
+ '--pidfile=%s' % self._pidfile,
+ 'web',
+ '--wsgi=leap.soledad.server.application.wsgi_application',
+ '--port=2424'
+ ])
+
+ def _create_conf_file(self):
+ if not os.access('/etc', os.W_OK):
+ return
+ if not os.path.isdir('/etc/soledad'):
+ os.mkdir('/etc/soledad')
+ with open('/etc/soledad/soledad-server.conf', 'w') as f:
+ content = '[soledad-server]\ncouch_url = %s' % self._couch_url
+ f.write(content)
+
+ def stop(self):
+ pid = get_pid(self._pidfile)
+ os.kill(pid, signal.SIGKILL)
+
+
+@pytest.fixture(scope='module')
+def soledad_server(tmpdir_factory, request):
+ couch_url = request.config.option.couch_url
+ server = SoledadServer(tmpdir_factory, couch_url)
+ server.start()
+ request.addfinalizer(server.stop)
+ return server
+
+
+#
+# soledad_dbs fixture: provides all databases needed by soledad server in a per
+# module scope (same databases for all tests in this module).
+#
+
+def _token_dbname():
+ dbname = 'tokens_' + \
+ str(int(time.time() / (30 * 24 * 3600)))
+ return dbname
+
+
+class SoledadDatabases(object):
+
+ def __init__(self, url):
+ self._token_db_url = urljoin(url, _token_dbname())
+ self._shared_db_url = urljoin(url, 'shared')
+
+ def setup(self, uuid):
+ self._create_dbs()
+ self._add_token(uuid)
+
+ def _create_dbs(self):
+ requests.put(self._token_db_url)
+ requests.put(self._shared_db_url)
+
+ def _add_token(self, uuid):
+ token = sha512(DEFAULT_TOKEN).hexdigest()
+ content = {'type': 'Token', 'user_id': uuid}
+ requests.put(
+ self._token_db_url + '/' + token, data=json.dumps(content))
+
+ def teardown(self):
+ requests.delete(self._token_db_url)
+ requests.delete(self._shared_db_url)
+
+
+@pytest.fixture()
+def soledad_dbs(request):
+ couch_url = request.config.option.couch_url
+
+ def create(uuid):
+ db = SoledadDatabases(couch_url)
+ request.addfinalizer(db.teardown)
+ return db.setup(uuid)
+ return create
+
+
+#
+# soledad_client fixture: provides a clean soledad client for a test function.
+#
+
+@pytest.fixture()
+def soledad_client(tmpdir, soledad_server, remote_db, soledad_dbs, request):
+ passphrase = DEFAULT_PASSPHRASE
+ server_url = DEFAULT_URL
+ token = DEFAULT_TOKEN
+ default_uuid = uuid4().hex
+ remote_db(default_uuid)
+ soledad_dbs(default_uuid)
+
+ # get a soledad instance
+ def create():
+ secrets_path = os.path.join(tmpdir.strpath, '%s.secret' % default_uuid)
+ local_db_path = os.path.join(tmpdir.strpath, '%s.db' % default_uuid)
+ soledad_client = Soledad(
+ default_uuid,
+ unicode(passphrase),
+ secrets_path=secrets_path,
+ local_db_path=local_db_path,
+ server_url=server_url,
+ cert_file=None,
+ auth_token=token)
+ request.addfinalizer(soledad_client.close)
+ return soledad_client
+ return create
diff --git a/testing/tests/couch/conftest.py b/testing/tests/couch/conftest.py
deleted file mode 100644
index 1074f091..00000000
--- a/testing/tests/couch/conftest.py
+++ /dev/null
@@ -1,31 +0,0 @@
-import couchdb
-import pytest
-import random
-import string
-
-
-@pytest.fixture
-def random_name():
- return 'user-' + ''.join(
- random.choice(
- string.ascii_lowercase) for _ in range(10))
-
-
-class RandomDatabase(object):
-
- def __init__(self, couch_url, name):
- self.couch_url = couch_url
- self.name = name
- self.server = couchdb.client.Server(couch_url)
- self.database = self.server.create(name)
-
- def teardown(self):
- self.server.delete(self.name)
-
-
-@pytest.fixture
-def db(random_name, request):
- couch_url = request.config.getoption('--couch-url')
- db = RandomDatabase(couch_url, random_name)
- request.addfinalizer(db.teardown)
- return db
diff --git a/testing/tests/couch/test_command.py b/testing/tests/couch/test_command.py
index 68097fb1..9fb2c153 100644
--- a/testing/tests/couch/test_command.py
+++ b/testing/tests/couch/test_command.py
@@ -25,6 +25,7 @@ class CommandBasedDBCreationTest(unittest.TestCase):
state.ensure_database, "user-1337")
def test_raises_unauthorized_by_default(self):
- state = couch_state.CouchServerState("url", check_schema_versions=False)
+ state = couch_state.CouchServerState("url",
+ check_schema_versions=False)
self.assertRaises(u1db_errors.Unauthorized,
state.ensure_database, "user-1337")
diff --git a/testing/tests/couch/test_state.py b/testing/tests/couch/test_state.py
index e293b5b8..e5ac3704 100644
--- a/testing/tests/couch/test_state.py
+++ b/testing/tests/couch/test_state.py
@@ -1,25 +1,32 @@
import pytest
-
from leap.soledad.common.couch import CONFIG_DOC_ID
from leap.soledad.common.couch import SCHEMA_VERSION
from leap.soledad.common.couch import SCHEMA_VERSION_KEY
from leap.soledad.common.couch.state import CouchServerState
+from uuid import uuid4
from leap.soledad.common.errors import WrongCouchSchemaVersionError
from leap.soledad.common.errors import MissingCouchConfigDocumentError
+from test_soledad.util import CouchDBTestCase
+
+class CouchDesignDocsTests(CouchDBTestCase):
-def test_wrong_couch_version_raises(db):
- wrong_schema_version = SCHEMA_VERSION + 1
- db.database.create(
- {'_id': CONFIG_DOC_ID, SCHEMA_VERSION_KEY: wrong_schema_version})
- with pytest.raises(WrongCouchSchemaVersionError):
- CouchServerState(db.couch_url, create_cmd='/bin/echo',
- check_schema_versions=True)
+ def setUp(self):
+ CouchDBTestCase.setUp(self)
+ self.db = self.couch_server.create('user-' + uuid4().hex)
+ self.addCleanup(self.delete_db, self.db.name)
+ def test_wrong_couch_version_raises(self):
+ wrong_schema_version = SCHEMA_VERSION + 1
+ self.db.create(
+ {'_id': CONFIG_DOC_ID, SCHEMA_VERSION_KEY: wrong_schema_version})
+ with pytest.raises(WrongCouchSchemaVersionError):
+ CouchServerState(self.couch_url, create_cmd='/bin/echo',
+ check_schema_versions=True)
-def test_missing_config_doc_raises(db):
- db.database.create({})
- with pytest.raises(MissingCouchConfigDocumentError):
- CouchServerState(db.couch_url, create_cmd='/bin/echo',
- check_schema_versions=True)
+ def test_missing_config_doc_raises(self):
+ self.db.create({})
+ with pytest.raises(MissingCouchConfigDocumentError):
+ CouchServerState(self.couch_url, create_cmd='/bin/echo',
+ check_schema_versions=True)
diff --git a/testing/tests/perf/conftest.py b/testing/tests/perf/conftest.py
deleted file mode 100644
index 6fa6b2c0..00000000
--- a/testing/tests/perf/conftest.py
+++ /dev/null
@@ -1,249 +0,0 @@
-import json
-import os
-import pytest
-import requests
-import random
-import base64
-import signal
-import time
-
-from hashlib import sha512
-from uuid import uuid4
-from subprocess import call
-from urlparse import urljoin
-from twisted.internet import threads, reactor
-
-from leap.soledad.client import Soledad
-from leap.soledad.common.couch import CouchDatabase
-
-
-# we have to manually setup the events server in order to be able to signal
-# events. This is usually done by the enclosing application using soledad
-# client (i.e. bitmask client).
-from leap.common.events import server
-server.ensure_server()
-
-
-def pytest_addoption(parser):
- parser.addoption(
- "--couch-url", type="string", default="http://127.0.0.1:5984",
- help="the url for the couch server to be used during tests")
- parser.addoption(
- "--num-docs", type="int", default=100,
- help="the number of documents to use in performance tests")
-
-
-#
-# default options for all tests
-#
-
-DEFAULT_PASSPHRASE = '123'
-
-DEFAULT_URL = 'http://127.0.0.1:2424'
-DEFAULT_PRIVKEY = 'soledad_privkey.pem'
-DEFAULT_CERTKEY = 'soledad_certkey.pem'
-DEFAULT_TOKEN = 'an-auth-token'
-
-
-@pytest.fixture()
-def payload():
- def generate(size):
- random.seed(1337) # same seed to avoid different bench results
- payload_bytes = bytearray(random.getrandbits(8) for _ in xrange(size))
- # encode as base64 to avoid ascii encode/decode errors
- return base64.b64encode(payload_bytes)[:size] # remove b64 overhead
- return generate
-
-
-#
-# soledad_dbs fixture: provides all databases needed by soledad server in a per
-# module scope (same databases for all tests in this module).
-#
-
-def _token_dbname():
- dbname = 'tokens_' + \
- str(int(time.time() / (30 * 24 * 3600)))
- return dbname
-
-
-class SoledadDatabases(object):
-
- def __init__(self, url):
- self._token_db_url = urljoin(url, _token_dbname())
- self._shared_db_url = urljoin(url, 'shared')
-
- def setup(self, uuid):
- self._create_dbs()
- self._add_token(uuid)
-
- def _create_dbs(self):
- requests.put(self._token_db_url)
- requests.put(self._shared_db_url)
-
- def _add_token(self, uuid):
- token = sha512(DEFAULT_TOKEN).hexdigest()
- content = {'type': 'Token', 'user_id': uuid}
- requests.put(
- self._token_db_url + '/' + token, data=json.dumps(content))
-
- def teardown(self):
- requests.delete(self._token_db_url)
- requests.delete(self._shared_db_url)
-
-
-@pytest.fixture()
-def soledad_dbs(request):
- couch_url = request.config.option.couch_url
-
- def create(uuid):
- db = SoledadDatabases(couch_url)
- request.addfinalizer(db.teardown)
- return db.setup(uuid)
- return create
-
-
-#
-# remote_db fixture: provides an empty database for a given user in a per
-# function scope.
-#
-
-class UserDatabase(object):
-
- def __init__(self, url, uuid):
- self._remote_db_url = urljoin(url, 'user-%s' % uuid)
-
- def setup(self):
- return CouchDatabase.open_database(
- url=self._remote_db_url, create=True, replica_uid=None)
-
- def teardown(self):
- requests.delete(self._remote_db_url)
-
-
-@pytest.fixture()
-def remote_db(request):
- couch_url = request.config.option.couch_url
-
- def create(uuid):
- db = UserDatabase(couch_url, uuid)
- request.addfinalizer(db.teardown)
- return db.setup()
- return create
-
-
-def get_pid(pidfile):
- if not os.path.isfile(pidfile):
- return 0
- try:
- with open(pidfile) as f:
- return int(f.read())
- except IOError:
- return 0
-
-
-#
-# soledad_server fixture: provides a running soledad server in a per module
-# context (same soledad server for all tests in this module).
-#
-
-class SoledadServer(object):
-
- def __init__(self, tmpdir_factory, couch_url):
- tmpdir = tmpdir_factory.mktemp('soledad-server')
- self._pidfile = os.path.join(tmpdir.strpath, 'soledad-server.pid')
- self._logfile = os.path.join(tmpdir.strpath, 'soledad-server.log')
- self._couch_url = couch_url
-
- def start(self):
- self._create_conf_file()
- # start the server
- call([
- 'twistd',
- '--logfile=%s' % self._logfile,
- '--pidfile=%s' % self._pidfile,
- 'web',
- '--wsgi=leap.soledad.server.application.wsgi_application',
- '--port=2424'
- ])
-
- def _create_conf_file(self):
- if not os.access('/etc', os.W_OK):
- return
- if not os.path.isdir('/etc/soledad'):
- os.mkdir('/etc/soledad')
- with open('/etc/soledad/soledad-server.conf', 'w') as f:
- content = '[soledad-server]\ncouch_url = %s' % self._couch_url
- f.write(content)
-
- def stop(self):
- pid = get_pid(self._pidfile)
- os.kill(pid, signal.SIGKILL)
-
-
-@pytest.fixture(scope='module')
-def soledad_server(tmpdir_factory, request):
- couch_url = request.config.option.couch_url
- server = SoledadServer(tmpdir_factory, couch_url)
- server.start()
- request.addfinalizer(server.stop)
- return server
-
-
-@pytest.fixture()
-def txbenchmark(benchmark):
- def blockOnThread(*args, **kwargs):
- return threads.deferToThread(
- benchmark, threads.blockingCallFromThread,
- reactor, *args, **kwargs)
- return blockOnThread
-
-
-@pytest.fixture()
-def txbenchmark_with_setup(benchmark):
- def blockOnThreadWithSetup(setup, f):
- def blocking_runner(*args, **kwargs):
- return threads.blockingCallFromThread(reactor, f, *args, **kwargs)
-
- def blocking_setup():
- args = threads.blockingCallFromThread(reactor, setup)
- try:
- return tuple(arg for arg in args), {}
- except TypeError:
- return ((args,), {}) if args else None
-
- def bench():
- return benchmark.pedantic(blocking_runner, setup=blocking_setup,
- rounds=4, warmup_rounds=1)
- return threads.deferToThread(bench)
- return blockOnThreadWithSetup
-
-
-#
-# soledad_client fixture: provides a clean soledad client for a test function.
-#
-
-@pytest.fixture()
-def soledad_client(tmpdir, soledad_server, remote_db, soledad_dbs, request):
- passphrase = DEFAULT_PASSPHRASE
- server_url = DEFAULT_URL
- token = DEFAULT_TOKEN
- default_uuid = uuid4().hex
- remote_db(default_uuid)
- soledad_dbs(default_uuid)
-
- # get a soledad instance
- def create():
- secrets_path = os.path.join(tmpdir.strpath, '%s.secret' % uuid4().hex)
- local_db_path = os.path.join(tmpdir.strpath, '%s.db' % uuid4().hex)
- soledad_client = Soledad(
- default_uuid,
- unicode(passphrase),
- secrets_path=secrets_path,
- local_db_path=local_db_path,
- server_url=server_url,
- cert_file=None,
- auth_token=token,
- defer_encryption=True)
- request.addfinalizer(soledad_client.close)
- return soledad_client
- return create
diff --git a/testing/tests/perf/test_crypto.py b/testing/tests/perf/test_crypto.py
deleted file mode 100644
index be00560b..00000000
--- a/testing/tests/perf/test_crypto.py
+++ /dev/null
@@ -1,81 +0,0 @@
-import pytest
-import json
-from uuid import uuid4
-from leap.soledad.common.document import SoledadDocument
-from leap.soledad.client.crypto import encrypt_sym
-from leap.soledad.client.crypto import decrypt_sym
-
-
-def create_doc_encryption(size):
- @pytest.mark.benchmark(group="test_crypto_encrypt_doc")
- def test_doc_encryption(soledad_client, benchmark, payload):
- crypto = soledad_client()._crypto
-
- DOC_CONTENT = {'payload': payload(size)}
- doc = SoledadDocument(
- doc_id=uuid4().hex, rev='rev',
- json=json.dumps(DOC_CONTENT))
-
- benchmark(crypto.encrypt_doc, doc)
- return test_doc_encryption
-
-
-def create_doc_decryption(size):
- @pytest.mark.benchmark(group="test_crypto_decrypt_doc")
- def test_doc_decryption(soledad_client, benchmark, payload):
- crypto = soledad_client()._crypto
-
- DOC_CONTENT = {'payload': payload(size)}
- doc = SoledadDocument(
- doc_id=uuid4().hex, rev='rev',
- json=json.dumps(DOC_CONTENT))
- encrypted_doc = crypto.encrypt_doc(doc)
- doc.set_json(encrypted_doc)
-
- benchmark(crypto.decrypt_doc, doc)
- return test_doc_decryption
-
-
-test_encrypt_doc_10k = create_doc_encryption(10*1000)
-test_encrypt_doc_100k = create_doc_encryption(100*1000)
-test_encrypt_doc_500k = create_doc_encryption(500*1000)
-test_encrypt_doc_1M = create_doc_encryption(1000*1000)
-test_encrypt_doc_10M = create_doc_encryption(10*1000*1000)
-test_encrypt_doc_50M = create_doc_encryption(50*1000*1000)
-test_decrypt_doc_10k = create_doc_decryption(10*1000)
-test_decrypt_doc_100k = create_doc_decryption(100*1000)
-test_decrypt_doc_500k = create_doc_decryption(500*1000)
-test_decrypt_doc_1M = create_doc_decryption(1000*1000)
-test_decrypt_doc_10M = create_doc_decryption(10*1000*1000)
-test_decrypt_doc_50M = create_doc_decryption(50*1000*1000)
-
-
-def create_raw_encryption(size):
- @pytest.mark.benchmark(group="test_crypto_raw_encrypt")
- def test_raw_encrypt(benchmark, payload):
- key = payload(32)
- benchmark(encrypt_sym, payload(size), key)
- return test_raw_encrypt
-
-
-def create_raw_decryption(size):
- @pytest.mark.benchmark(group="test_crypto_raw_decrypt")
- def test_raw_decrypt(benchmark, payload):
- key = payload(32)
- iv, ciphertext = encrypt_sym(payload(size), key)
- benchmark(decrypt_sym, ciphertext, key, iv)
- return test_raw_decrypt
-
-
-test_encrypt_raw_10k = create_raw_encryption(10*1000)
-test_encrypt_raw_100k = create_raw_encryption(100*1000)
-test_encrypt_raw_500k = create_raw_encryption(500*1000)
-test_encrypt_raw_1M = create_raw_encryption(1000*1000)
-test_encrypt_raw_10M = create_raw_encryption(10*1000*1000)
-test_encrypt_raw_50M = create_raw_encryption(50*1000*1000)
-test_decrypt_raw_10k = create_raw_decryption(10*1000)
-test_decrypt_raw_100k = create_raw_decryption(100*1000)
-test_decrypt_raw_500k = create_raw_decryption(500*1000)
-test_decrypt_raw_1M = create_raw_decryption(1000*1000)
-test_decrypt_raw_10M = create_raw_decryption(10*1000*1000)
-test_decrypt_raw_50M = create_raw_decryption(50*1000*1000)
diff --git a/testing/tests/perf/test_encdecpool.py b/testing/tests/perf/test_encdecpool.py
deleted file mode 100644
index 77091a41..00000000
--- a/testing/tests/perf/test_encdecpool.py
+++ /dev/null
@@ -1,78 +0,0 @@
-import pytest
-import json
-from uuid import uuid4
-from twisted.internet.defer import gatherResults
-from leap.soledad.client.encdecpool import SyncEncrypterPool
-from leap.soledad.client.encdecpool import SyncDecrypterPool
-from leap.soledad.common.document import SoledadDocument
-# FIXME: test load is low due issue #7370, higher values will get out of memory
-
-
-def create_encrypt(amount, size):
- @pytest.mark.benchmark(group="test_pool_encrypt")
- @pytest.inlineCallbacks
- def test(soledad_client, txbenchmark_with_setup, request, payload):
- DOC_CONTENT = {'payload': payload(size)}
-
- def setup():
- client = soledad_client()
- pool = SyncEncrypterPool(client._crypto, client._sync_db)
- pool.start()
- request.addfinalizer(pool.stop)
- docs = [
- SoledadDocument(doc_id=uuid4().hex, rev='rev',
- json=json.dumps(DOC_CONTENT))
- for _ in xrange(amount)
- ]
- return pool, docs
-
- @pytest.inlineCallbacks
- def put_and_wait(pool, docs):
- yield gatherResults([pool.encrypt_doc(doc) for doc in docs])
-
- yield txbenchmark_with_setup(setup, put_and_wait)
- return test
-
-test_encdecpool_encrypt_100_10k = create_encrypt(100, 10*1000)
-test_encdecpool_encrypt_100_100k = create_encrypt(100, 100*1000)
-test_encdecpool_encrypt_100_500k = create_encrypt(100, 500*1000)
-
-
-def create_decrypt(amount, size):
- @pytest.mark.benchmark(group="test_pool_decrypt")
- @pytest.inlineCallbacks
- def test(soledad_client, txbenchmark_with_setup, request, payload):
- DOC_CONTENT = {'payload': payload(size)}
- client = soledad_client()
-
- def setup():
- pool = SyncDecrypterPool(
- client._crypto,
- client._sync_db,
- source_replica_uid=client._dbpool.replica_uid,
- insert_doc_cb=lambda x, y, z: False) # ignored
- pool.start(amount)
- request.addfinalizer(pool.stop)
- crypto = client._crypto
- docs = []
- for _ in xrange(amount):
- doc = SoledadDocument(
- doc_id=uuid4().hex, rev='rev',
- json=json.dumps(DOC_CONTENT))
- encrypted_content = json.loads(crypto.encrypt_doc(doc))
- docs.append((doc.doc_id, encrypted_content))
- return pool, docs
-
- def put_and_wait(pool, docs):
- deferreds = [] # fires on completion
- for idx, (doc_id, content) in enumerate(docs, 1):
- deferreds.append(pool.insert_encrypted_received_doc(
- doc_id, 'rev', content, idx, "trans_id", idx))
- return gatherResults(deferreds)
-
- yield txbenchmark_with_setup(setup, put_and_wait)
- return test
-
-test_encdecpool_decrypt_100_10k = create_decrypt(100, 10*1000)
-test_encdecpool_decrypt_100_100k = create_decrypt(100, 100*1000)
-test_encdecpool_decrypt_100_500k = create_decrypt(100, 500*1000)
diff --git a/testing/tests/server/test_server.py b/testing/tests/server/test_server.py
index 6bbcf002..6710caaf 100644
--- a/testing/tests/server/test_server.py
+++ b/testing/tests/server/test_server.py
@@ -41,7 +41,7 @@ from test_soledad.util import (
BaseSoledadTest,
)
-from leap.soledad.common import crypto
+from leap.soledad.client import _crypto
from leap.soledad.client import Soledad
from leap.soledad.server.config import load_configuration
from leap.soledad.server.config import CONFIG_DEFAULTS
@@ -412,13 +412,9 @@ class EncryptedSyncTestCase(
self.assertEqual(soldoc.doc_id, couchdoc.doc_id)
self.assertEqual(soldoc.rev, couchdoc.rev)
couch_content = couchdoc.content.keys()
- self.assertEqual(6, len(couch_content))
- self.assertTrue(crypto.ENC_JSON_KEY in couch_content)
- self.assertTrue(crypto.ENC_SCHEME_KEY in couch_content)
- self.assertTrue(crypto.ENC_METHOD_KEY in couch_content)
- self.assertTrue(crypto.ENC_IV_KEY in couch_content)
- self.assertTrue(crypto.MAC_KEY in couch_content)
- self.assertTrue(crypto.MAC_METHOD_KEY in couch_content)
+ self.assertEqual(['raw'], couch_content)
+ content = couchdoc.get_json()
+ self.assertTrue(_crypto.is_symmetrically_encrypted(content))
d = sol1.get_all_docs()
d.addCallback(_db1AssertEmptyDocList)
@@ -473,16 +469,6 @@ class EncryptedSyncTestCase(
"""
return self._test_encrypted_sym_sync(passphrase=u'ãáàäéàëíìïóòöõúùüñç')
- def test_sync_very_large_files(self):
- """
- Test if Soledad can sync very large files.
- """
- self.skipTest(
- "Work in progress. For reference, see: "
- "https://leap.se/code/issues/7370")
- length = 100 * (10 ** 6) # 100 MB
- return self._test_encrypted_sym_sync(doc_size=length, number_of_docs=1)
-
def test_sync_many_small_files(self):
"""
Test if Soledad can sync many smallfiles.
diff --git a/testing/tests/sync/test_encdecpool.py b/testing/tests/sync/test_encdecpool.py
deleted file mode 100644
index 4a32885e..00000000
--- a/testing/tests/sync/test_encdecpool.py
+++ /dev/null
@@ -1,306 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_encdecpool.py
-# Copyright (C) 2015 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for encryption and decryption pool.
-"""
-import json
-from random import shuffle
-
-from mock import MagicMock
-from twisted.internet.defer import inlineCallbacks
-
-from leap.soledad.client.encdecpool import SyncEncrypterPool
-from leap.soledad.client.encdecpool import SyncDecrypterPool
-
-from leap.soledad.common.document import SoledadDocument
-from test_soledad.util import BaseSoledadTest
-from twisted.internet import defer
-
-DOC_ID = "mydoc"
-DOC_REV = "rev"
-DOC_CONTENT = {'simple': 'document'}
-
-
-class TestSyncEncrypterPool(BaseSoledadTest):
-
- def setUp(self):
- BaseSoledadTest.setUp(self)
- crypto = self._soledad._crypto
- sync_db = self._soledad._sync_db
- self._pool = SyncEncrypterPool(crypto, sync_db)
- self._pool.start()
-
- def tearDown(self):
- self._pool.stop()
- BaseSoledadTest.tearDown(self)
-
- @inlineCallbacks
- def test_get_encrypted_doc_returns_none(self):
- """
- Test that trying to get an encrypted doc from the pool returns None if
- the document was never added for encryption.
- """
- doc = yield self._pool.get_encrypted_doc(DOC_ID, DOC_REV)
- self.assertIsNone(doc)
-
- @inlineCallbacks
- def test_encrypt_doc_and_get_it_back(self):
- """
- Test that the pool actually encrypts a document added to the queue.
- """
- doc = SoledadDocument(
- doc_id=DOC_ID, rev=DOC_REV, json=json.dumps(DOC_CONTENT))
-
- yield self._pool.encrypt_doc(doc)
- encrypted = yield self._pool.get_encrypted_doc(DOC_ID, DOC_REV)
-
- self.assertIsNotNone(encrypted)
-
-
-class TestSyncDecrypterPool(BaseSoledadTest):
-
- def _insert_doc_cb(self, doc, gen, trans_id):
- """
- Method used to mock the sync's return_doc_cb callback.
- """
- self._inserted_docs.append((doc, gen, trans_id))
-
- def _setup_pool(self, sync_db=None):
- sync_db = sync_db or self._soledad._sync_db
- return SyncDecrypterPool(
- self._soledad._crypto,
- sync_db,
- source_replica_uid=self._soledad._dbpool.replica_uid,
- insert_doc_cb=self._insert_doc_cb)
-
- def setUp(self):
- BaseSoledadTest.setUp(self)
- # setup the pool
- self._pool = self._setup_pool()
- # reset the inserted docs mock
- self._inserted_docs = []
-
- def tearDown(self):
- if self._pool.running:
- self._pool.stop()
- BaseSoledadTest.tearDown(self)
-
- def test_insert_received_doc(self):
- """
- Test that one document added to the pool is inserted using the
- callback.
- """
- self._pool.start(1)
- self._pool.insert_received_doc(
- DOC_ID, DOC_REV, "{}", 1, "trans_id", 1)
-
- def _assert_doc_was_inserted(_):
- self.assertEqual(
- self._inserted_docs,
- [(SoledadDocument(DOC_ID, DOC_REV, "{}"), 1, u"trans_id")])
-
- self._pool.deferred.addCallback(_assert_doc_was_inserted)
- return self._pool.deferred
-
- def test_looping_control(self):
- """
- Start and stop cleanly.
- """
- self._pool.start(10)
- self.assertTrue(self._pool.running)
- self._pool.stop()
- self.assertFalse(self._pool.running)
- self.assertTrue(self._pool.deferred.called)
-
- def test_sync_id_col_is_created_if_non_existing_in_docs_recvd_table(self):
- """
- Test that docs_received table is migrated, and has the sync_id column
- """
- mock_run_query = MagicMock(return_value=defer.succeed(None))
- mock_sync_db = MagicMock()
- mock_sync_db.runQuery = mock_run_query
- pool = self._setup_pool(mock_sync_db)
- d = pool.start(10)
- pool.stop()
-
- def assert_trial_to_create_sync_id_column(_):
- mock_run_query.assert_called_once_with(
- "ALTER TABLE docs_received ADD COLUMN sync_id")
-
- d.addCallback(assert_trial_to_create_sync_id_column)
- return d
-
- def test_insert_received_doc_many(self):
- """
- Test that many documents added to the pool are inserted using the
- callback.
- """
- many = 100
- self._pool.start(many)
-
- # insert many docs in the pool
- for i in xrange(many):
- gen = idx = i + 1
- doc_id = "doc_id: %d" % idx
- rev = "rev: %d" % idx
- content = {'idx': idx}
- trans_id = "trans_id: %d" % idx
- self._pool.insert_received_doc(
- doc_id, rev, content, gen, trans_id, idx)
-
- def _assert_doc_was_inserted(_):
- self.assertEqual(many, len(self._inserted_docs))
- idx = 1
- for doc, gen, trans_id in self._inserted_docs:
- expected_gen = idx
- expected_doc_id = "doc_id: %d" % idx
- expected_rev = "rev: %d" % idx
- expected_content = json.dumps({'idx': idx})
- expected_trans_id = "trans_id: %d" % idx
-
- self.assertEqual(expected_doc_id, doc.doc_id)
- self.assertEqual(expected_rev, doc.rev)
- self.assertEqual(expected_content, json.dumps(doc.content))
- self.assertEqual(expected_gen, gen)
- self.assertEqual(expected_trans_id, trans_id)
-
- idx += 1
-
- self._pool.deferred.addCallback(_assert_doc_was_inserted)
- return self._pool.deferred
-
- def test_insert_encrypted_received_doc(self):
- """
- Test that one encrypted document added to the pool is decrypted and
- inserted using the callback.
- """
- crypto = self._soledad._crypto
- doc = SoledadDocument(
- doc_id=DOC_ID, rev=DOC_REV, json=json.dumps(DOC_CONTENT))
- encrypted_content = json.loads(crypto.encrypt_doc(doc))
-
- # insert the encrypted document in the pool
- self._pool.start(1)
- self._pool.insert_encrypted_received_doc(
- DOC_ID, DOC_REV, encrypted_content, 1, "trans_id", 1)
-
- def _assert_doc_was_decrypted_and_inserted(_):
- self.assertEqual(1, len(self._inserted_docs))
- self.assertEqual(self._inserted_docs, [(doc, 1, u"trans_id")])
-
- self._pool.deferred.addCallback(
- _assert_doc_was_decrypted_and_inserted)
- return self._pool.deferred
-
- @inlineCallbacks
- def test_processing_order(self):
- """
- This test ensures that processing of documents only occur if there is
- a sequence in place.
- """
- crypto = self._soledad._crypto
-
- docs = []
- for i in xrange(1, 10):
- i = str(i)
- doc = SoledadDocument(
- doc_id=DOC_ID + i, rev=DOC_REV + i,
- json=json.dumps(DOC_CONTENT))
- encrypted_content = json.loads(crypto.encrypt_doc(doc))
- docs.append((doc, encrypted_content))
-
- # insert the encrypted document in the pool
- yield self._pool.start(10) # pool is expecting to process 10 docs
- self._pool._loop.stop() # we are processing manually
- # first three arrives, forming a sequence
- for i, (doc, encrypted_content) in enumerate(docs[:3]):
- gen = idx = i + 1
- yield self._pool.insert_encrypted_received_doc(
- doc.doc_id, doc.rev, encrypted_content, gen, "trans_id", idx)
-
- # last one arrives alone, so it can't be processed
- doc, encrypted_content = docs[-1]
- yield self._pool.insert_encrypted_received_doc(
- doc.doc_id, doc.rev, encrypted_content, 10, "trans_id", 10)
-
- yield self._pool._decrypt_and_recurse()
-
- self.assertEqual(3, self._pool._processed_docs)
-
- def test_insert_encrypted_received_doc_many(self, many=100):
- """
- Test that many encrypted documents added to the pool are decrypted and
- inserted using the callback.
- """
- crypto = self._soledad._crypto
- self._pool.start(many)
- docs = []
-
- # insert many encrypted docs in the pool
- for i in xrange(many):
- gen = idx = i + 1
- doc_id = "doc_id: %d" % idx
- rev = "rev: %d" % idx
- content = {'idx': idx}
- trans_id = "trans_id: %d" % idx
-
- doc = SoledadDocument(
- doc_id=doc_id, rev=rev, json=json.dumps(content))
-
- encrypted_content = json.loads(crypto.encrypt_doc(doc))
- docs.append((doc_id, rev, encrypted_content, gen,
- trans_id, idx))
- shuffle(docs)
-
- for doc in docs:
- self._pool.insert_encrypted_received_doc(*doc)
-
- def _assert_docs_were_decrypted_and_inserted(_):
- self.assertEqual(many, len(self._inserted_docs))
- idx = 1
- for doc, gen, trans_id in self._inserted_docs:
- expected_gen = idx
- expected_doc_id = "doc_id: %d" % idx
- expected_rev = "rev: %d" % idx
- expected_content = json.dumps({'idx': idx})
- expected_trans_id = "trans_id: %d" % idx
-
- self.assertEqual(expected_doc_id, doc.doc_id)
- self.assertEqual(expected_rev, doc.rev)
- self.assertEqual(expected_content, json.dumps(doc.content))
- self.assertEqual(expected_gen, gen)
- self.assertEqual(expected_trans_id, trans_id)
-
- idx += 1
-
- self._pool.deferred.addCallback(
- _assert_docs_were_decrypted_and_inserted)
- return self._pool.deferred
-
- @inlineCallbacks
- def test_pool_reuse(self):
- """
- The pool is reused between syncs, this test verifies that
- reusing is fine.
- """
- for i in xrange(3):
- yield self.test_insert_encrypted_received_doc_many(5)
- self._inserted_docs = []
- decrypted_docs = yield self._pool._get_docs(encrypted=False)
- # check that decrypted docs staging is clean
- self.assertEquals([], decrypted_docs)
- self._pool.stop()
diff --git a/testing/tests/sync/test_sqlcipher_sync.py b/testing/tests/sync/test_sqlcipher_sync.py
index 3cbefc8b..26f63a40 100644
--- a/testing/tests/sync/test_sqlcipher_sync.py
+++ b/testing/tests/sync/test_sqlcipher_sync.py
@@ -27,8 +27,6 @@ from leap.soledad.common.l2db import sync
from leap.soledad.common.l2db import vectorclock
from leap.soledad.common.l2db import errors
-from leap.soledad.common.crypto import ENC_SCHEME_KEY
-from leap.soledad.client.crypto import decrypt_doc_dict
from leap.soledad.client.http_target import SoledadHTTPSyncTarget
from test_soledad import u1db_tests as tests
@@ -545,13 +543,7 @@ class SQLCipherDatabaseSyncTests(
self.assertFalse(doc2.has_conflicts)
self.sync(self.db2, db3)
doc3 = db3.get_doc('the-doc')
- if ENC_SCHEME_KEY in doc3.content:
- _crypto = self._soledad._crypto
- key = _crypto.doc_passphrase(doc3.doc_id)
- secret = _crypto.secret
- doc3.set_json(decrypt_doc_dict(
- doc3.content,
- doc3.doc_id, doc3.rev, key, secret))
+
self.assertEqual(doc4.get_json(), doc3.get_json())
self.assertFalse(doc3.has_conflicts)
self.db1.close()
@@ -713,15 +705,12 @@ def make_local_db_and_soledad_target(
test.startTwistedServer()
replica_uid = os.path.basename(path)
db = test.request_state._create_database(replica_uid)
- sync_db = test._soledad._sync_db
- sync_enc_pool = test._soledad._sync_enc_pool
st = soledad_sync_target(
test, db._dbname,
- source_replica_uid=source_replica_uid,
- sync_db=sync_db,
- sync_enc_pool=sync_enc_pool)
+ source_replica_uid=source_replica_uid)
return db, st
+
target_scenarios = [
('leap', {
'create_db_and_target': make_local_db_and_soledad_target,
diff --git a/testing/tests/sync/test_sync.py b/testing/tests/sync/test_sync.py
index 5290003e..76757c5b 100644
--- a/testing/tests/sync/test_sync.py
+++ b/testing/tests/sync/test_sync.py
@@ -19,6 +19,7 @@ import threading
import time
from urlparse import urljoin
+from mock import Mock
from twisted.internet import defer
from testscenarios import TestWithScenarios
@@ -184,10 +185,9 @@ class TestSoledadDbSync(
target = soledad_sync_target(
self, self.db2._dbname,
source_replica_uid=self._soledad._dbpool.replica_uid)
- self.addCleanup(target.close)
return sync.SoledadSynchronizer(
self.db,
- target).sync(defer_decryption=False)
+ target).sync()
@defer.inlineCallbacks
def test_db_sync(self):
@@ -211,3 +211,21 @@ class TestSoledadDbSync(
self.db, doc2.doc_id, doc2.rev, tests.nested_doc, False)
# TODO: add u1db.tests.test_sync.TestRemoteSyncIntegration
+
+
+class TestSoledadSynchronizer(BaseSoledadTest):
+
+ def setUp(self):
+ BaseSoledadTest.setUp(self)
+ self.db = Mock()
+ self.target = Mock()
+ self.synchronizer = sync.SoledadSynchronizer(
+ self.db,
+ self.target)
+
+ def test_docs_by_gen_includes_deleted(self):
+ changes = [('id', 'gen', 'trans')]
+ docs_by_gen = self.synchronizer._docs_by_gen_from_changes(changes)
+ f, args, kwargs = docs_by_gen[0][0]
+ self.assertIn('include_deleted', kwargs)
+ self.assertTrue(kwargs['include_deleted'])
diff --git a/testing/tests/sync/test_sync_deferred.py b/testing/tests/sync/test_sync_deferred.py
deleted file mode 100644
index 4948aaf8..00000000
--- a/testing/tests/sync/test_sync_deferred.py
+++ /dev/null
@@ -1,196 +0,0 @@
-# test_sync_deferred.py
-# Copyright (C) 2014 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Test Leap backend bits: sync with deferred encryption/decryption.
-"""
-import time
-import os
-import random
-import string
-import shutil
-
-from urlparse import urljoin
-
-from twisted.internet import defer
-
-from leap.soledad.common import couch
-
-from leap.soledad.client import sync
-from leap.soledad.client.sqlcipher import SQLCipherOptions
-from leap.soledad.client.sqlcipher import SQLCipherDatabase
-
-from testscenarios import TestWithScenarios
-
-from test_soledad import u1db_tests as tests
-from test_soledad.util import ADDRESS
-from test_soledad.util import SoledadWithCouchServerMixin
-from test_soledad.util import make_soledad_app
-from test_soledad.util import soledad_sync_target
-
-
-# Just to make clear how this test is different... :)
-DEFER_DECRYPTION = True
-
-WAIT_STEP = 1
-MAX_WAIT = 10
-DBPASS = "pass"
-
-
-class BaseSoledadDeferredEncTest(SoledadWithCouchServerMixin):
-
- """
- Another base class for testing the deferred encryption/decryption during
- the syncs, using the intermediate database.
- """
- defer_sync_encryption = True
-
- def setUp(self):
- SoledadWithCouchServerMixin.setUp(self)
- self.startTwistedServer()
- # config info
- self.db1_file = os.path.join(self.tempdir, "db1.u1db")
- os.unlink(self.db1_file)
- self.db_pass = DBPASS
- self.email = ADDRESS
-
- # get a random prefix for each test, so we do not mess with
- # concurrency during initialization and shutting down of
- # each local db.
- self.rand_prefix = ''.join(
- map(lambda x: random.choice(string.ascii_letters), range(6)))
-
- # open test dbs: db1 will be the local sqlcipher db (which
- # instantiates a syncdb). We use the self._soledad instance that was
- # already created on some setUp method.
- import binascii
- tohex = binascii.b2a_hex
- key = tohex(self._soledad.secrets.get_local_storage_key())
- sync_db_key = tohex(self._soledad.secrets.get_sync_db_key())
- dbpath = self._soledad._local_db_path
-
- self.opts = SQLCipherOptions(
- dbpath, key, is_raw_key=True, create=False,
- defer_encryption=True, sync_db_key=sync_db_key)
- self.db1 = SQLCipherDatabase(self.opts)
-
- self.db2 = self.request_state._create_database('test')
-
- def tearDown(self):
- # XXX should not access "private" attrs
- shutil.rmtree(os.path.dirname(self._soledad._local_db_path))
- SoledadWithCouchServerMixin.tearDown(self)
-
-
-class SyncTimeoutError(Exception):
-
- """
- Dummy exception to notify timeout during sync.
- """
- pass
-
-
-class TestSoledadDbSyncDeferredEncDecr(
- TestWithScenarios,
- BaseSoledadDeferredEncTest,
- tests.TestCaseWithServer):
-
- """
- Test db.sync remote sync shortcut.
- Case with deferred encryption and decryption: using the intermediate
- syncdb.
- """
-
- scenarios = [
- ('http', {
- 'make_app_with_state': make_soledad_app,
- 'make_database_for_test': tests.make_memory_database_for_test,
- }),
- ]
-
- oauth = False
- token = True
-
- def setUp(self):
- """
- Need to explicitely invoke inicialization on all bases.
- """
- BaseSoledadDeferredEncTest.setUp(self)
- self.server = self.server_thread = None
- self.syncer = None
-
- def tearDown(self):
- """
- Need to explicitely invoke destruction on all bases.
- """
- dbsyncer = getattr(self, 'dbsyncer', None)
- if dbsyncer:
- dbsyncer.close()
- BaseSoledadDeferredEncTest.tearDown(self)
-
- def do_sync(self):
- """
- Perform sync using SoledadSynchronizer, SoledadSyncTarget
- and Token auth.
- """
- replica_uid = self._soledad._dbpool.replica_uid
- sync_db = self._soledad._sync_db
- sync_enc_pool = self._soledad._sync_enc_pool
- dbsyncer = self._soledad._dbsyncer # Soledad.sync uses the dbsyncer
-
- target = soledad_sync_target(
- self, self.db2._dbname,
- source_replica_uid=replica_uid,
- sync_db=sync_db,
- sync_enc_pool=sync_enc_pool)
- self.addCleanup(target.close)
- return sync.SoledadSynchronizer(
- dbsyncer,
- target).sync(defer_decryption=True)
-
- def wait_for_sync(self):
- """
- Wait for sync to finish.
- """
- wait = 0
- syncer = self.syncer
- if syncer is not None:
- while syncer.syncing:
- time.sleep(WAIT_STEP)
- wait += WAIT_STEP
- if wait >= MAX_WAIT:
- raise SyncTimeoutError
-
- @defer.inlineCallbacks
- def test_db_sync(self):
- """
- Test sync.
-
- Adapted to check for encrypted content.
- """
- doc1 = self.db1.create_doc_from_json(tests.simple_doc)
- doc2 = self.db2.create_doc_from_json(tests.nested_doc)
- local_gen_before_sync = yield self.do_sync()
-
- gen, _, changes = self.db1.whats_changed(local_gen_before_sync)
- self.assertEqual(1, len(changes))
-
- self.assertEqual(doc2.doc_id, changes[0][0])
- self.assertEqual(1, gen - local_gen_before_sync)
-
- self.assertGetEncryptedDoc(
- self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False)
- self.assertGetEncryptedDoc(
- self.db1, doc2.doc_id, doc2.rev, tests.nested_doc, False)
diff --git a/testing/tests/sync/test_sync_mutex.py b/testing/tests/sync/test_sync_mutex.py
index 2626ab2a..432a3cd2 100644
--- a/testing/tests/sync/test_sync_mutex.py
+++ b/testing/tests/sync/test_sync_mutex.py
@@ -47,7 +47,7 @@ from test_soledad.util import soledad_sync_target
_old_sync = SoledadSynchronizer.sync
-def _timed_sync(self, defer_decryption=True):
+def _timed_sync(self):
t = time.time()
sync_id = uuid.uuid4()
@@ -62,10 +62,11 @@ def _timed_sync(self, defer_decryption=True):
self.source.sync_times[sync_id]['end'] = t
return passthrough
- d = _old_sync(self, defer_decryption=defer_decryption)
+ d = _old_sync(self)
d.addBoth(_store_finish_time)
return d
+
SoledadSynchronizer.sync = _timed_sync
# -- end of monkey-patching
diff --git a/testing/tests/sync/test_sync_target.py b/testing/tests/sync/test_sync_target.py
index 964468ce..6ce9a5c5 100644
--- a/testing/tests/sync/test_sync_target.py
+++ b/testing/tests/sync/test_sync_target.py
@@ -30,10 +30,11 @@ from testscenarios import TestWithScenarios
from twisted.internet import defer
from leap.soledad.client import http_target as target
-from leap.soledad.client import crypto
+from leap.soledad.client.http_target.fetch_protocol import DocStreamReceiver
from leap.soledad.client.sqlcipher import SQLCipherU1DBSync
from leap.soledad.client.sqlcipher import SQLCipherOptions
from leap.soledad.client.sqlcipher import SQLCipherDatabase
+from leap.soledad.client import _crypto
from leap.soledad.common import l2db
@@ -44,6 +45,7 @@ from test_soledad.util import make_soledad_app
from test_soledad.util import make_token_soledad_app
from test_soledad.util import make_soledad_document_for_test
from test_soledad.util import soledad_sync_target
+from twisted.trial import unittest
from test_soledad.util import SoledadWithCouchServerMixin
from test_soledad.util import ADDRESS
from test_soledad.util import SQLCIPHER_SCENARIOS
@@ -53,92 +55,69 @@ from test_soledad.util import SQLCIPHER_SCENARIOS
# The following tests come from `u1db.tests.test_remote_sync_target`.
# -----------------------------------------------------------------------------
-class TestSoledadParseReceivedDocResponse(SoledadWithCouchServerMixin):
+class TestSoledadParseReceivedDocResponse(unittest.TestCase):
"""
Some tests had to be copied to this class so we can instantiate our own
target.
"""
- def setUp(self):
- SoledadWithCouchServerMixin.setUp(self)
- creds = {'token': {
- 'uuid': 'user-uuid',
- 'token': 'auth-token',
- }}
- self.target = target.SoledadHTTPSyncTarget(
- self.couch_url,
- uuid4().hex,
- creds,
- self._soledad._crypto,
- None)
-
- def tearDown(self):
- self.target.close()
- SoledadWithCouchServerMixin.tearDown(self)
+ def parse(self, stream):
+ parser = DocStreamReceiver(None, None, lambda *_: defer.succeed(42))
+ parser.dataReceived(stream)
+ parser.finish()
def test_extra_comma(self):
- """
- Test adapted to use encrypted content.
- """
doc = SoledadDocument('i', rev='r')
- doc.content = {}
- _crypto = self._soledad._crypto
- key = _crypto.doc_passphrase(doc.doc_id)
- secret = _crypto.secret
+ doc.content = {'a': 'b'}
- enc_json = crypto.encrypt_docstr(
- doc.get_json(), doc.doc_id, doc.rev,
- key, secret)
+ encrypted_docstr = _crypto.SoledadCrypto('safe').encrypt_doc(doc)
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response("[\r\n{},\r\n]")
+ self.parse("[\r\n{},\r\n]")
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response(
+ self.parse(
('[\r\n{},\r\n{"id": "i", "rev": "r", ' +
- '"content": %s, "gen": 3, "trans_id": "T-sid"}' +
- ',\r\n]') % json.dumps(enc_json))
+ '"gen": 3, "trans_id": "T-sid"},\r\n' +
+ '%s,\r\n]') % encrypted_docstr)
def test_wrong_start(self):
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response("{}\r\n]")
-
- with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response("\r\n{}\r\n]")
+ self.parse("{}\r\n]")
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response("")
+ self.parse("\r\n{}\r\n]")
def test_wrong_end(self):
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response("[\r\n{}")
+ self.parse("[\r\n{}")
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response("[\r\n")
+ self.parse("[\r\n")
def test_missing_comma(self):
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response(
+ self.parse(
'[\r\n{}\r\n{"id": "i", "rev": "r", '
'"content": "c", "gen": 3}\r\n]')
def test_no_entries(self):
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response("[\r\n]")
+ self.parse("[\r\n]")
def test_error_in_stream(self):
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response(
+ self.parse(
'[\r\n{"new_generation": 0},'
'\r\n{"error": "unavailable"}\r\n')
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response(
+ self.parse(
'[\r\n{"error": "unavailable"}\r\n')
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response('[\r\n{"error": "?"}\r\n')
+ self.parse('[\r\n{"error": "?"}\r\n')
#
# functions for TestRemoteSyncTargets
@@ -151,13 +130,9 @@ def make_local_db_and_soledad_target(
test.startTwistedServer()
replica_uid = os.path.basename(path)
db = test.request_state._create_database(replica_uid)
- sync_db = test._soledad._sync_db
- sync_enc_pool = test._soledad._sync_enc_pool
st = soledad_sync_target(
test, db._dbname,
- source_replica_uid=source_replica_uid,
- sync_db=sync_db,
- sync_enc_pool=sync_enc_pool)
+ source_replica_uid=source_replica_uid)
return db, st
@@ -188,16 +163,11 @@ class TestSoledadSyncTarget(
def getSyncTarget(self, path=None, source_replica_uid=uuid4().hex):
if self.port is None:
self.startTwistedServer()
- sync_db = self._soledad._sync_db
- sync_enc_pool = self._soledad._sync_enc_pool
if path is None:
path = self.db2._dbname
target = self.sync_target(
self, path,
- source_replica_uid=source_replica_uid,
- sync_db=sync_db,
- sync_enc_pool=sync_enc_pool)
- self.addCleanup(target.close)
+ source_replica_uid=source_replica_uid)
return target
def setUp(self):
@@ -229,10 +199,10 @@ class TestSoledadSyncTarget(
other_docs.append((doc.doc_id, doc.rev, doc.get_json()))
doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
+ get_doc = (lambda _: doc, (1,), {})
new_gen, trans_id = yield remote_target.sync_exchange(
- [(doc, 10, 'T-sid')], 'replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=receive_doc,
- defer_decryption=False)
+ [(get_doc, 10, 'T-sid')], 'replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=receive_doc)
self.assertEqual(1, new_gen)
self.assertGetEncryptedDoc(
db, 'doc-here', 'replica:1', '{"value": "here"}', False)
@@ -278,15 +248,16 @@ class TestSoledadSyncTarget(
doc1 = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
doc2 = self.make_document('doc-here2', 'replica:1',
'{"value": "here2"}')
+ get_doc1 = (lambda _: doc1, (1,), {})
+ get_doc2 = (lambda _: doc2, (2,), {})
with self.assertRaises(l2db.errors.U1DBError):
yield remote_target.sync_exchange(
- [(doc1, 10, 'T-sid'), (doc2, 11, 'T-sud')],
+ [(get_doc1, 10, 'T-sid'), (get_doc2, 11, 'T-sud')],
'replica',
last_known_generation=0,
last_known_trans_id=None,
- insert_doc_cb=receive_doc,
- defer_decryption=False)
+ insert_doc_cb=receive_doc)
self.assertGetEncryptedDoc(
db, 'doc-here', 'replica:1', '{"value": "here"}',
@@ -297,9 +268,8 @@ class TestSoledadSyncTarget(
# retry
trigger_ids = []
new_gen, trans_id = yield remote_target.sync_exchange(
- [(doc2, 11, 'T-sud')], 'replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=receive_doc,
- defer_decryption=False)
+ [(get_doc2, 11, 'T-sud')], 'replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=receive_doc)
self.assertGetEncryptedDoc(
db, 'doc-here2', 'replica:1', '{"value": "here2"}',
False)
@@ -328,10 +298,11 @@ class TestSoledadSyncTarget(
replica_uid_box.append(replica_uid)
doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
+ get_doc = (lambda _: doc, (1,), {})
new_gen, trans_id = yield remote_target.sync_exchange(
- [(doc, 10, 'T-sid')], 'replica', last_known_generation=0,
+ [(get_doc, 10, 'T-sid')], 'replica', last_known_generation=0,
last_known_trans_id=None, insert_doc_cb=receive_doc,
- ensure_callback=ensure_cb, defer_decryption=False)
+ ensure_callback=ensure_cb)
self.assertEqual(1, new_gen)
db = self.db2
self.assertEqual(1, len(replica_uid_box))
@@ -339,6 +310,37 @@ class TestSoledadSyncTarget(
self.assertGetEncryptedDoc(
db, 'doc-here', 'replica:1', '{"value": "here"}', False)
+ @defer.inlineCallbacks
+ def test_sync_exchange_send_events(self):
+ """
+ Test for sync exchange's SOLEDAD_SYNC_SEND_STATUS event.
+ """
+ remote_target = self.getSyncTarget()
+ uuid = remote_target.uuid
+ events = []
+
+ def mocked_events(*args):
+ events.append((args))
+ self.patch(
+ target.send, '_emit_send_status', mocked_events)
+
+ doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
+ doc2 = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
+ doc3 = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
+ get_doc = (lambda _: doc, (1,), {})
+ get_doc2 = (lambda _: doc2, (1,), {})
+ get_doc3 = (lambda _: doc3, (1,), {})
+ docs = [(get_doc, 10, 'T-sid'),
+ (get_doc2, 11, 'T-sid2'), (get_doc3, 12, 'T-sid3')]
+ new_gen, trans_id = yield remote_target.sync_exchange(
+ docs, 'replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=lambda _: 1,
+ ensure_callback=lambda _: 1)
+ self.assertEqual(1, new_gen)
+ self.assertEqual(4, len(events))
+ self.assertEquals([(uuid, 0, 3), (uuid, 1, 3), (uuid, 2, 3),
+ (uuid, 3, 3)], events)
+
def test_sync_exchange_in_stream_error(self):
self.skipTest("bypass this test because our sync_exchange process "
"does not return u1db error 503 \"unavailable\" for "
@@ -421,7 +423,6 @@ class SoledadDatabaseSyncTargetTests(
def tearDown(self):
self.db.close()
- self.st.close()
tests.TestCaseWithServer.tearDown(self)
SoledadWithCouchServerMixin.tearDown(self)
@@ -442,12 +443,12 @@ class SoledadDatabaseSyncTargetTests(
This test was adapted to decrypt remote content before assert.
"""
docs_by_gen = [
- (self.make_document('doc-id', 'replica:1', tests.simple_doc), 10,
- 'T-sid')]
+ ((self.make_document,
+ ('doc-id', 'replica:1', tests.simple_doc,), {}),
+ 10, 'T-sid')]
new_gen, trans_id = yield self.st.sync_exchange(
docs_by_gen, 'replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=self.receive_doc,
- defer_decryption=False)
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
self.assertGetEncryptedDoc(
self.db, 'doc-id', 'replica:1', tests.simple_doc, False)
self.assertTransactionLog(['doc-id'], self.db)
@@ -465,14 +466,13 @@ class SoledadDatabaseSyncTargetTests(
This test was adapted to decrypt remote content before assert.
"""
docs_by_gen = [
- (self.make_document(
- 'doc-id', 'replica:1', tests.simple_doc), 10, 'T-1'),
- (self.make_document(
- 'doc-id2', 'replica:1', tests.nested_doc), 11, 'T-2')]
+ ((self.make_document,
+ ('doc-id', 'replica:1', tests.simple_doc), {}), 10, 'T-1'),
+ ((self.make_document,
+ ('doc-id2', 'replica:1', tests.nested_doc), {}), 11, 'T-2')]
new_gen, trans_id = yield self.st.sync_exchange(
docs_by_gen, 'replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=self.receive_doc,
- defer_decryption=False)
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
self.assertGetEncryptedDoc(
self.db, 'doc-id', 'replica:1', tests.simple_doc, False)
self.assertGetEncryptedDoc(
@@ -498,8 +498,7 @@ class SoledadDatabaseSyncTargetTests(
self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)
new_gen, _ = yield self.st.sync_exchange(
[], 'other-replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=self.receive_doc,
- defer_decryption=False)
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)
self.assertEqual(2, new_gen)
self.assertEqual(
@@ -552,7 +551,8 @@ class SoledadDatabaseSyncTargetTests(
doc = self.db.create_doc_from_json('{}')
edit_rev = 'replica:1|' + doc.rev
docs_by_gen = [
- (self.make_document(doc.doc_id, edit_rev, None), 10, 'T-sid')]
+ ((self.make_document, (doc.doc_id, edit_rev, None), {}),
+ 10, 'T-sid')]
new_gen, trans_id = yield self.st.sync_exchange(
docs_by_gen, 'replica', last_known_generation=0,
last_known_trans_id=None, insert_doc_cb=self.receive_doc)
@@ -571,7 +571,7 @@ class SoledadDatabaseSyncTargetTests(
self.assertTransactionLog([doc.doc_id], self.db)
new_doc = '{"key": "altval"}'
docs_by_gen = [
- (self.make_document(doc.doc_id, 'replica:1', new_doc), 10,
+ ((self.make_document, (doc.doc_id, 'replica:1', new_doc), {}), 10,
'T-sid')]
new_gen, _ = yield self.st.sync_exchange(
docs_by_gen, 'replica', last_known_generation=0,
@@ -591,7 +591,7 @@ class SoledadDatabaseSyncTargetTests(
self.assertTransactionLog([doc.doc_id], self.db)
gen, txid = self.db._get_generation_info()
docs_by_gen = [
- (self.make_document(doc.doc_id, doc.rev, tests.simple_doc),
+ ((self.make_document, (doc.doc_id, doc.rev, tests.simple_doc), {}),
10, 'T-sid')]
new_gen, _ = yield self.st.sync_exchange(
docs_by_gen, 'replica', last_known_generation=gen,
@@ -624,9 +624,9 @@ class SoledadDatabaseSyncTargetTests(
[], 'other-replica', last_known_generation=0,
last_known_trans_id=None, insert_doc_cb=self.receive_doc)
self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
+ self.assertEqual(2, new_gen)
self.assertEqual(
(doc.doc_id, doc.rev, None, 2), self.other_changes[0][:-1])
- self.assertEqual(2, new_gen)
if self.whitebox:
self.assertEqual(self.db._last_exchange_log['return'],
{'last_gen': 2, 'docs': [(doc.doc_id, doc.rev)]})
@@ -637,7 +637,7 @@ class SoledadDatabaseSyncTargetTests(
self.assertTransactionLog([doc.doc_id], self.db)
new_doc = '{"key": "altval"}'
docs_by_gen = [
- (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
+ ((self.make_document, (doc.doc_id, 'test:1|z:2', new_doc), {}), 10,
'T-sid')]
new_gen, _ = yield self.st.sync_exchange(
docs_by_gen, 'other-replica', last_known_generation=0,
@@ -662,7 +662,7 @@ class SoledadDatabaseSyncTargetTests(
self.assertTransactionLog([doc.doc_id], self.db)
new_doc = '{"key": "altval"}'
docs_by_gen = [
- (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
+ ((self.make_document, (doc.doc_id, 'test:1|z:2', new_doc), {}), 10,
'T-sid')]
new_gen, _ = yield self.st.sync_exchange(
docs_by_gen, 'other-replica', last_known_generation=0,
@@ -683,7 +683,7 @@ class SoledadDatabaseSyncTargetTests(
self.assertTransactionLog([doc.doc_id], self.db)
new_doc = '{"key": "altval"}'
docs_by_gen = [
- (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
+ ((self.make_document, (doc.doc_id, 'test:1|z:2', new_doc), {}), 10,
'T-sid')]
new_gen, _ = yield self.st.sync_exchange(
docs_by_gen, 'other-replica', last_known_generation=0,
@@ -694,9 +694,9 @@ class SoledadDatabaseSyncTargetTests(
def test_sync_exchange_converged_handling(self):
doc = self.db.create_doc_from_json(tests.simple_doc)
docs_by_gen = [
- (self.make_document('new', 'other:1', '{}'), 4, 'T-foo'),
- (self.make_document(doc.doc_id, doc.rev, doc.get_json()), 5,
- 'T-bar')]
+ ((self.make_document, ('new', 'other:1', '{}'), {}), 4, 'T-foo'),
+ ((self.make_document, (doc.doc_id, doc.rev, doc.get_json()), {}),
+ 5, 'T-bar')]
new_gen, _ = yield self.st.sync_exchange(
docs_by_gen, 'other-replica', last_known_generation=0,
last_known_trans_id=None, insert_doc_cb=self.receive_doc)
@@ -780,9 +780,6 @@ class SoledadDatabaseSyncTargetTests(
self.assertEqual(expected, called)
-# Just to make clear how this test is different... :)
-DEFER_DECRYPTION = False
-
WAIT_STEP = 1
MAX_WAIT = 10
DBPASS = "pass"
@@ -842,12 +839,10 @@ class TestSoledadDbSync(
import binascii
tohex = binascii.b2a_hex
key = tohex(self._soledad.secrets.get_local_storage_key())
- sync_db_key = tohex(self._soledad.secrets.get_sync_db_key())
dbpath = self._soledad._local_db_path
self.opts = SQLCipherOptions(
- dbpath, key, is_raw_key=True, create=False,
- defer_encryption=True, sync_db_key=sync_db_key)
+ dbpath, key, is_raw_key=True, create=False)
self.db1 = SQLCipherDatabase(self.opts)
self.db2 = self.request_state._create_database(replica_uid='test')
@@ -886,12 +881,10 @@ class TestSoledadDbSync(
self.opts,
crypto,
replica_uid,
- None,
- defer_encryption=True)
+ None)
self.dbsyncer = dbsyncer
return dbsyncer.sync(target_url,
- creds=creds,
- defer_decryption=DEFER_DECRYPTION)
+ creds=creds)
else:
return self._do_sync(self, target_name)
diff --git a/testing/tox.ini b/testing/tox.ini
index 31cb8a4f..c46c6af1 100644
--- a/testing/tox.ini
+++ b/testing/tox.ini
@@ -1,12 +1,15 @@
[tox]
envlist = py27
+skipsdist=True
[testenv]
basepython = python2.7
-commands = py.test --cov-report=html \
+commands = py.test --ignore=tests/benchmarks \
+ --cov-report=html \
--cov-report=term \
- --cov=leap.soledad \
- {posargs}
+ --cov=leap.soledad \
+ {posargs}
+usedevelop = True
deps =
coverage
pytest
@@ -18,6 +21,7 @@ deps =
pdbpp
couchdb
requests
+ service_identity
# install soledad local packages
-e../common
-e../client
@@ -27,11 +31,11 @@ setenv =
TERM=xterm
install_command = pip install {opts} {packages}
-[testenv:perf]
+[testenv:benchmark]
deps =
{[testenv]deps}
pytest-benchmark
-commands = py.test tests/perf {posargs}
+commands = py.test --benchmark-only {posargs}
[testenv:code-check]
changedir = ..
@@ -39,12 +43,12 @@ deps =
pep8
flake8
commands =
- pep8 client server common
- flake8 --ignore=F812,E731 client server common
+ pep8
+ flake8
[testenv:parallel]
deps =
{[testenv]deps}
pytest-xdist
install_command = pip install {opts} {packages}
-commands = py.test {posargs} -n 4
+commands = py.test --ignore=tests/benchmarks {posargs} -n 4