summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2015-09-24 15:23:34 -0400
committerKali Kaneko <kali@leap.se>2015-09-24 15:23:34 -0400
commit67d341b062640ace095fae835107ec677e9d7cae (patch)
tree0eaa17436ea5990dcae0737119050fa0db8f471a
parent363d960c3feddb93a0f660075d9b4b33f3713882 (diff)
parent4be6f05d91891122e83f74d21c83c5f8fcd3a618 (diff)
Merge tag '0.7.3' into debian/experimental
Tag leap.soledad version 0.7.3
-rw-r--r--CHANGELOG18
-rw-r--r--README.rst18
-rw-r--r--client/pkg/requirements-leap.pip4
-rw-r--r--client/src/leap/soledad/client/adbapi.py3
-rw-r--r--client/src/leap/soledad/client/api.py2
-rw-r--r--client/src/leap/soledad/client/encdecpool.py2
-rw-r--r--client/src/leap/soledad/client/events.py4
-rw-r--r--client/src/leap/soledad/client/http_target.py711
-rw-r--r--client/src/leap/soledad/client/http_target/__init__.py90
-rw-r--r--client/src/leap/soledad/client/http_target/api.py229
-rw-r--r--client/src/leap/soledad/client/http_target/fetch.py252
-rw-r--r--client/src/leap/soledad/client/http_target/send.py102
-rw-r--r--client/src/leap/soledad/client/http_target/support.py203
-rw-r--r--client/src/leap/soledad/client/secrets.py35
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py1
-rw-r--r--common/src/leap/soledad/common/couch.py354
-rw-r--r--common/src/leap/soledad/common/tests/test_couch.py208
-rw-r--r--common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py10
-rw-r--r--common/src/leap/soledad/common/tests/test_server.py26
-rw-r--r--common/src/leap/soledad/common/tests/test_soledad.py60
-rw-r--r--common/src/leap/soledad/common/tests/test_sqlcipher_sync.py420
-rw-r--r--common/src/leap/soledad/common/tests/test_sync.py28
-rw-r--r--common/src/leap/soledad/common/tests/test_sync_deferred.py24
-rw-r--r--common/src/leap/soledad/common/tests/test_sync_mutex.py8
-rw-r--r--common/src/leap/soledad/common/tests/test_sync_target.py56
-rw-r--r--common/src/leap/soledad/common/tests/util.py169
-rw-r--r--docs/sphinx/sync.rst32
-rw-r--r--server/src/leap/soledad/server/__init__.py5
28 files changed, 1322 insertions, 1752 deletions
diff --git a/CHANGELOG b/CHANGELOG
index a3a824cc..f35f7830 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,3 +1,19 @@
+0.7.3 Sep 22, 2015:
+Client:
+ o Bugfix: refactor code loss. Closes #7412.
+ o Bugfix: Set active secret before saving local file.
+ o Split http_target into 4 modules, separating those responsibilities.
+ o Refactor details of making an HTTP request body and headers out of the
+ send/fetch logic. This also makes it easier to enable batching.
+
+Server:
+ o Fix a bug where BadRequest could be raised after everything was persisted.
+
+Common:
+ o Refactor couch.py to separate persistence from logic while saving uploaded
+ documents. Also simplify logic while checking for conflicts.
+
+
0.7.2 Aug 26, 2015:
Client:
o Remove MAC from secrets file. Closes #6980.
@@ -6,7 +22,7 @@ Client:
o Improve how we send information on SOLEDAD_SYNC_SEND_STATUS and in
SOLEDAD_SYNC_RECEIVE_STATUS. Related to Feature #7353.
o Fix hanging sync by properly waiting db initialization on sync decrypter
- pool. Closes #7686.
+ pool. Closes #7386.
o Avoid double decryption of documents.
o Fix the order of the events emited for incoming documents.
o bugfix: move sync db and encpool creation to api.
diff --git a/README.rst b/README.rst
index 887b3df1..b98eec06 100644
--- a/README.rst
+++ b/README.rst
@@ -13,18 +13,24 @@ repository:
**leap.soledad.common** common pieces.
-.. image:: https://pypip.in/v/leap.soledad.common/badge.png
- :target: https://crate.io/packages/leap.soledad.common
+.. image:: https://badge.fury.io/py/leap.soledad.common.svg
+ :target: http://badge.fury.io/py/leap.soledad.common
+.. image:: https://img.shields.io/pypi/dm/leap.soledad.common.svg
+ :target: http://badge.fury.io/py/leap.soledad.common
**leap.soledad.client** where the soledad client lives.
-.. image:: https://pypip.in/v/leap.soledad.client/badge.png
- :target: https://crate.io/packages/leap.soledad.client
+.. image:: https://badge.fury.io/py/leap.soledad.client.svg
+ :target: http://badge.fury.io/py/leap.soledad.client
+.. image:: https://img.shields.io/pypi/dm/leap.soledad.client.svg
+ :target: http://badge.fury.io/py/leap.soledad.client
**leap.soledad.server** oh surprise! bits needed for the soledad server.
-.. image:: https://pypip.in/v/leap.soledad.server/badge.png
- :target: https://crate.io/packages/leap.soledad.server
+.. image:: https://badge.fury.io/py/leap.soledad.server.svg
+ :target: http://badge.fury.io/py/leap.soledad.server
+.. image:: https://img.shields.io/pypi/dm/leap.soledad.server.svg
+ :target: http://badge.fury.io/py/leap.soledad.server
Compatibility
diff --git a/client/pkg/requirements-leap.pip b/client/pkg/requirements-leap.pip
index c5fbcd5f..52d1263b 100644
--- a/client/pkg/requirements-leap.pip
+++ b/client/pkg/requirements-leap.pip
@@ -1,2 +1,2 @@
-leap.common>=0.4.1
-leap.soledad.common>=0.6.5
+leap.common>=0.4.3
+leap.soledad.common>=0.7.0
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py
index 237159bd..77822247 100644
--- a/client/src/leap/soledad/client/adbapi.py
+++ b/client/src/leap/soledad/client/adbapi.py
@@ -285,7 +285,8 @@ class U1DBConnectionPool(adbapi.ConnectionPool):
A final close, only called by the shutdown trigger.
"""
self.shutdownID = None
- self.threadpool.stop()
+ if self.threadpool.started:
+ self.threadpool.stop()
self.running = False
for conn in self.connections.values():
self._close(conn)
diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py
index a6a98551..a558addd 100644
--- a/client/src/leap/soledad/client/api.py
+++ b/client/src/leap/soledad/client/api.py
@@ -718,7 +718,7 @@ class Soledad(object):
return failure
def _emit_done_data_sync(passthrough):
- soledad_events.emit(
+ soledad_events.emit_async(
soledad_events.SOLEDAD_DONE_DATA_SYNC, self.uuid)
return passthrough
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 2ad98767..6d3c11b9 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -74,6 +74,8 @@ class SyncEncryptDecryptPool(object):
self._started = True
def stop(self):
+ if not self._started:
+ return
self._started = False
self._destroy_pool()
# maybe cancel the next delayed call
diff --git a/client/src/leap/soledad/client/events.py b/client/src/leap/soledad/client/events.py
index b1379521..058be59c 100644
--- a/client/src/leap/soledad/client/events.py
+++ b/client/src/leap/soledad/client/events.py
@@ -20,7 +20,7 @@
Signaling functions.
"""
-from leap.common.events import emit
+from leap.common.events import emit_async
from leap.common.events import catalog
@@ -40,7 +40,7 @@ SOLEDAD_SYNC_RECEIVE_STATUS = catalog.SOLEDAD_SYNC_RECEIVE_STATUS
__all__ = [
"catalog",
- "emit",
+ "emit_async",
"SOLEDAD_CREATING_KEYS",
"SOLEDAD_DONE_CREATING_KEYS",
"SOLEDAD_DOWNLOADING_KEYS",
diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py
deleted file mode 100644
index a6ef2b0d..00000000
--- a/client/src/leap/soledad/client/http_target.py
+++ /dev/null
@@ -1,711 +0,0 @@
-# -*- coding: utf-8 -*-
-# http_target.py
-# Copyright (C) 2015 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
-"""
-A U1DB backend for encrypting data before sending to server and decrypting
-after receiving.
-"""
-
-
-import json
-import base64
-import logging
-import warnings
-
-from uuid import uuid4
-
-from twisted.internet import defer
-from twisted.web.error import Error
-from twisted.web.client import _ReadBodyProtocol
-from twisted.web.client import PartialDownloadError
-from twisted.web._newclient import ResponseDone
-from twisted.web._newclient import PotentialDataLoss
-
-from u1db import errors
-from u1db import SyncTarget
-from u1db.remote import utils
-from u1db.remote import http_errors
-
-from leap.common.http import HTTPClient
-
-from leap.soledad.common.document import SoledadDocument
-from leap.soledad.common.errors import InvalidAuthTokenError
-
-from leap.soledad.client.crypto import is_symmetrically_encrypted
-from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
-from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
-from leap.soledad.client.events import emit
-from leap.soledad.client.encdecpool import SyncDecrypterPool
-
-
-logger = logging.getLogger(__name__)
-
-
-# we want to make sure that HTTP errors will raise appropriate u1db errors,
-# that is, fire errbacks with the appropriate failures, in the context of
-# twisted. Because of that, we redefine the http body reader used by the HTTP
-# client below.
-
-class ReadBodyProtocol(_ReadBodyProtocol):
-
- def __init__(self, response, deferred):
- """
- Initialize the protocol, additionally storing the response headers.
- """
- _ReadBodyProtocol.__init__(
- self, response.code, response.phrase, deferred)
- self.headers = response.headers
-
- # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks
- def _error(self, respdic):
- descr = respdic.get("error")
- exc_cls = errors.wire_description_to_exc.get(descr)
- if exc_cls is not None:
- message = respdic.get("message")
- self.deferred.errback(exc_cls(message))
- # ---8<--- end of snippet from u1db.remote.http_client
-
- def connectionLost(self, reason):
- """
- Deliver the accumulated response bytes to the waiting L{Deferred}, if
- the response body has been completely received without error.
- """
- if reason.check(ResponseDone):
-
- body = b''.join(self.dataBuffer)
-
- # ---8<--- snippet from u1db.remote.http_client
- if self.status in (200, 201):
- self.deferred.callback(body)
- elif self.status in http_errors.ERROR_STATUSES:
- try:
- respdic = json.loads(body)
- except ValueError:
- self.deferred.errback(
- errors.HTTPError(self.status, body, self.headers))
- else:
- self._error(respdic)
- # special cases
- elif self.status == 503:
- self.deferred.errback(errors.Unavailable(body, self.headers))
- else:
- self.deferred.errback(
- errors.HTTPError(self.status, body, self.headers))
- # ---8<--- end of snippet from u1db.remote.http_client
-
- elif reason.check(PotentialDataLoss):
- self.deferred.errback(
- PartialDownloadError(self.status, self.message,
- b''.join(self.dataBuffer)))
- else:
- self.deferred.errback(reason)
-
-
-def readBody(response):
- """
- Get the body of an L{IResponse} and return it as a byte string.
-
- This is a helper function for clients that don't want to incrementally
- receive the body of an HTTP response.
-
- @param response: The HTTP response for which the body will be read.
- @type response: L{IResponse} provider
-
- @return: A L{Deferred} which will fire with the body of the response.
- Cancelling it will close the connection to the server immediately.
- """
- def cancel(deferred):
- """
- Cancel a L{readBody} call, close the connection to the HTTP server
- immediately, if it is still open.
-
- @param deferred: The cancelled L{defer.Deferred}.
- """
- abort = getAbort()
- if abort is not None:
- abort()
-
- d = defer.Deferred(cancel)
- protocol = ReadBodyProtocol(response, d)
-
- def getAbort():
- return getattr(protocol.transport, 'abortConnection', None)
-
- response.deliverBody(protocol)
-
- if protocol.transport is not None and getAbort() is None:
- warnings.warn(
- 'Using readBody with a transport that does not have an '
- 'abortConnection method',
- category=DeprecationWarning,
- stacklevel=2)
-
- return d
-
-
-class SoledadHTTPSyncTarget(SyncTarget):
-
- """
- A SyncTarget that encrypts data before sending and decrypts data after
- receiving.
-
- Normally encryption will have been written to the sync database upon
- document modification. The sync database is also used to write temporarily
- the parsed documents that the remote send us, before being decrypted and
- written to the main database.
- """
-
- def __init__(self, url, source_replica_uid, creds, crypto, cert_file,
- sync_db=None, sync_enc_pool=None):
- """
- Initialize the sync target.
-
- :param url: The server sync url.
- :type url: str
- :param source_replica_uid: The source replica uid which we use when
- deferring decryption.
- :type source_replica_uid: str
- :param creds: A dictionary containing the uuid and token.
- :type creds: creds
- :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
- document contents when syncing.
- :type crypto: soledad.crypto.SoledadCrypto
- :param cert_file: Path to the certificate of the ca used to validate
- the SSL certificate used by the remote soledad
- server.
- :type cert_file: str
- :param sync_db: Optional. handler for the db with the symmetric
- encryption of the syncing documents. If
- None, encryption will be done in-place,
- instead of retreiving it from the dedicated
- database.
- :type sync_db: Sqlite handler
- :param sync_enc_pool: The encryption pool to use to defer encryption.
- If None is passed the encryption will not be
- deferred.
- :type sync_enc_pool: leap.soledad.client.encdecpool.SyncEncrypterPool
- """
- if url.endswith("/"):
- url = url[:-1]
- self._url = str(url) + "/sync-from/" + str(source_replica_uid)
- self.source_replica_uid = source_replica_uid
- self._auth_header = None
- self.set_creds(creds)
- self._crypto = crypto
- self._sync_db = sync_db
- self._sync_enc_pool = sync_enc_pool
- self._insert_doc_cb = None
- # asynchronous encryption/decryption attributes
- self._decryption_callback = None
- self._sync_decr_pool = None
- self._http = HTTPClient(cert_file)
-
- def close(self):
- self._http.close()
-
- def set_creds(self, creds):
- """
- Update credentials.
-
- :param creds: A dictionary containing the uuid and token.
- :type creds: dict
- """
- uuid = creds['token']['uuid']
- token = creds['token']['token']
- auth = '%s:%s' % (uuid, token)
- b64_token = base64.b64encode(auth)
- self._auth_header = {'Authorization': ['Token %s' % b64_token]}
-
- @property
- def _defer_encryption(self):
- return self._sync_enc_pool is not None
-
- #
- # SyncTarget API
- #
-
- @defer.inlineCallbacks
- def get_sync_info(self, source_replica_uid):
- """
- Return information about known state of remote database.
-
- Return the replica_uid and the current database generation of the
- remote database, and its last-seen database generation for the client
- replica.
-
- :param source_replica_uid: The client-size replica uid.
- :type source_replica_uid: str
-
- :return: A deferred which fires with (target_replica_uid,
- target_replica_generation, target_trans_id,
- source_replica_last_known_generation,
- source_replica_last_known_transaction_id)
- :rtype: twisted.internet.defer.Deferred
- """
- raw = yield self._http_request(self._url, headers=self._auth_header)
- res = json.loads(raw)
- defer.returnValue((
- res['target_replica_uid'],
- res['target_replica_generation'],
- res['target_replica_transaction_id'],
- res['source_replica_generation'],
- res['source_transaction_id']
- ))
-
- def record_sync_info(
- self, source_replica_uid, source_replica_generation,
- source_replica_transaction_id):
- """
- Record tip information for another replica.
-
- After sync_exchange has been processed, the caller will have
- received new content from this replica. This call allows the
- source replica instigating the sync to inform us what their
- generation became after applying the documents we returned.
-
- This is used to allow future sync operations to not need to repeat data
- that we just talked about. It also means that if this is called at the
- wrong time, there can be database records that will never be
- synchronized.
-
- :param source_replica_uid: The identifier for the source replica.
- :type source_replica_uid: str
- :param source_replica_generation: The database generation for the
- source replica.
- :type source_replica_generation: int
- :param source_replica_transaction_id: The transaction id associated
- with the source replica
- generation.
- :type source_replica_transaction_id: str
-
- :return: A deferred which fires with the result of the query.
- :rtype: twisted.internet.defer.Deferred
- """
- data = json.dumps({
- 'generation': source_replica_generation,
- 'transaction_id': source_replica_transaction_id
- })
- headers = self._auth_header.copy()
- headers.update({'content-type': ['application/json']})
- return self._http_request(
- self._url,
- method='PUT',
- headers=headers,
- body=data)
-
- @defer.inlineCallbacks
- def sync_exchange(self, docs_by_generation, source_replica_uid,
- last_known_generation, last_known_trans_id,
- insert_doc_cb, ensure_callback=None,
- defer_decryption=True, sync_id=None):
- """
- Find out which documents the remote database does not know about,
- encrypt and send them. After that, receive documents from the remote
- database.
-
- :param docs_by_generations: A list of (doc_id, generation, trans_id)
- of local documents that were changed since
- the last local generation the remote
- replica knows about.
- :type docs_by_generations: list of tuples
-
- :param source_replica_uid: The uid of the source replica.
- :type source_replica_uid: str
-
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
-
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
-
- :param insert_doc_cb: A callback for inserting received documents from
- target. If not overriden, this will call u1db
- insert_doc_from_target in synchronizer, which
- implements the TAKE OTHER semantics.
- :type insert_doc_cb: function
-
- :param ensure_callback: A callback that ensures we know the target
- replica uid if the target replica was just
- created.
- :type ensure_callback: function
-
- :param defer_decryption: Whether to defer the decryption process using
- the intermediate database. If False,
- decryption will be done inline.
- :type defer_decryption: bool
-
- :return: A deferred which fires with the new generation and
- transaction id of the target replica.
- :rtype: twisted.internet.defer.Deferred
- """
-
- self._ensure_callback = ensure_callback
-
- if sync_id is None:
- sync_id = str(uuid4())
- self.source_replica_uid = source_replica_uid
-
- # save a reference to the callback so we can use it after decrypting
- self._insert_doc_cb = insert_doc_cb
-
- gen_after_send, trans_id_after_send = yield self._send_docs(
- docs_by_generation,
- last_known_generation,
- last_known_trans_id,
- sync_id)
-
- cur_target_gen, cur_target_trans_id = yield self._receive_docs(
- last_known_generation, last_known_trans_id,
- ensure_callback, sync_id,
- defer_decryption=defer_decryption)
-
- # update gen and trans id info in case we just sent and did not
- # receive docs.
- if gen_after_send is not None and gen_after_send > cur_target_gen:
- cur_target_gen = gen_after_send
- cur_target_trans_id = trans_id_after_send
-
- defer.returnValue([cur_target_gen, cur_target_trans_id])
-
- #
- # methods to send docs
- #
-
- def _prepare(self, comma, entries, **dic):
- entry = comma + '\r\n' + json.dumps(dic)
- entries.append(entry)
- return len(entry)
-
- @defer.inlineCallbacks
- def _send_docs(self, docs_by_generation, last_known_generation,
- last_known_trans_id, sync_id):
-
- if not docs_by_generation:
- defer.returnValue([None, None])
-
- headers = self._auth_header.copy()
- headers.update({'content-type': ['application/x-soledad-sync-put']})
- # add remote replica metadata to the request
- first_entries = ['[']
- self._prepare(
- '', first_entries,
- last_known_generation=last_known_generation,
- last_known_trans_id=last_known_trans_id,
- sync_id=sync_id,
- ensure=self._ensure_callback is not None)
- idx = 0
- total = len(docs_by_generation)
- for doc, gen, trans_id in docs_by_generation:
- idx += 1
- result = yield self._send_one_doc(
- headers, first_entries, doc,
- gen, trans_id, total, idx)
- if self._defer_encryption:
- self._sync_enc_pool.delete_encrypted_doc(
- doc.doc_id, doc.rev)
-
- msg = "%d/%d" % (idx, total)
- content = {'sent': idx, 'total': total}
- emit(SOLEDAD_SYNC_SEND_STATUS, content)
- logger.debug("Sync send status: %s" % msg)
-
- response_dict = json.loads(result)[0]
- gen_after_send = response_dict['new_generation']
- trans_id_after_send = response_dict['new_transaction_id']
- defer.returnValue([gen_after_send, trans_id_after_send])
-
- @defer.inlineCallbacks
- def _send_one_doc(self, headers, first_entries, doc, gen, trans_id,
- number_of_docs, doc_idx):
- entries = first_entries[:]
- # add the document to the request
- content = yield self._encrypt_doc(doc)
- self._prepare(
- ',', entries,
- id=doc.doc_id, rev=doc.rev, content=content, gen=gen,
- trans_id=trans_id, number_of_docs=number_of_docs,
- doc_idx=doc_idx)
- entries.append('\r\n]')
- data = ''.join(entries)
- result = yield self._http_request(
- self._url,
- method='POST',
- headers=headers,
- body=data)
- defer.returnValue(result)
-
- def _encrypt_doc(self, doc):
- d = None
- if doc.is_tombstone():
- d = defer.succeed(None)
- elif not self._defer_encryption:
- # fallback case, for tests
- d = defer.succeed(self._crypto.encrypt_doc(doc))
- else:
-
- def _maybe_encrypt_doc_inline(doc_json):
- if doc_json is None:
- # the document is not marked as tombstone, but we got
- # nothing from the sync db. As it is not encrypted
- # yet, we force inline encryption.
- return self._crypto.encrypt_doc(doc)
- return doc_json
-
- d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev)
- d.addCallback(_maybe_encrypt_doc_inline)
- return d
-
- #
- # methods to receive doc
- #
-
- @defer.inlineCallbacks
- def _receive_docs(self, last_known_generation, last_known_trans_id,
- ensure_callback, sync_id, defer_decryption):
-
- self._queue_for_decrypt = defer_decryption \
- and self._sync_db is not None
-
- new_generation = last_known_generation
- new_transaction_id = last_known_trans_id
-
- if self._queue_for_decrypt:
- logger.debug(
- "Soledad sync: will queue received docs for decrypting.")
-
- if defer_decryption:
- self._setup_sync_decr_pool()
-
- headers = self._auth_header.copy()
- headers.update({'content-type': ['application/x-soledad-sync-get']})
-
- # ---------------------------------------------------------------------
- # maybe receive the first document
- # ---------------------------------------------------------------------
-
- # we fetch the first document before fetching the rest because we need
- # to know the total number of documents to be received, and this
- # information comes as metadata to each request.
-
- doc = yield self._receive_one_doc(
- headers, last_known_generation, last_known_trans_id,
- sync_id, 0)
- self._received_docs = 0
- number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1)
-
- # update the target gen and trans_id in case a document was received
- if ngen:
- new_generation = ngen
- new_transaction_id = ntrans
-
- if defer_decryption:
- self._sync_decr_pool.start(number_of_changes)
-
- # ---------------------------------------------------------------------
- # maybe receive the rest of the documents
- # ---------------------------------------------------------------------
-
- # launch many asynchronous fetches and inserts of received documents
- # in the temporary sync db. Will wait for all results before
- # continuing.
-
- received = 1
- deferreds = []
- while received < number_of_changes:
- d = self._receive_one_doc(
- headers, last_known_generation,
- last_known_trans_id, sync_id, received)
- d.addCallback(
- self._insert_received_doc,
- received + 1, # the index of the current received doc
- number_of_changes)
- deferreds.append(d)
- received += 1
- results = yield defer.gatherResults(deferreds)
-
- # get generation and transaction id of target after insertions
- if deferreds:
- _, new_generation, new_transaction_id = results.pop()
-
- # ---------------------------------------------------------------------
- # wait for async decryption to finish
- # ---------------------------------------------------------------------
-
- if defer_decryption:
- yield self._sync_decr_pool.deferred
- self._sync_decr_pool.stop()
-
- defer.returnValue([new_generation, new_transaction_id])
-
- def _receive_one_doc(self, headers, last_known_generation,
- last_known_trans_id, sync_id, received):
- entries = ['[']
- # add remote replica metadata to the request
- self._prepare(
- '', entries,
- last_known_generation=last_known_generation,
- last_known_trans_id=last_known_trans_id,
- sync_id=sync_id,
- ensure=self._ensure_callback is not None)
- # inform server of how many documents have already been received
- self._prepare(
- ',', entries, received=received)
- entries.append('\r\n]')
- # send headers
- return self._http_request(
- self._url,
- method='POST',
- headers=headers,
- body=''.join(entries))
-
- def _insert_received_doc(self, response, idx, total):
- """
- Insert a received document into the local replica.
-
- :param response: The body and headers of the response.
- :type response: tuple(str, dict)
- :param idx: The index count of the current operation.
- :type idx: int
- :param total: The total number of operations.
- :type total: int
- """
- new_generation, new_transaction_id, number_of_changes, doc_id, \
- rev, content, gen, trans_id = \
- self._parse_received_doc_response(response)
- if doc_id is not None:
- # decrypt incoming document and insert into local database
- # -------------------------------------------------------------
- # symmetric decryption of document's contents
- # -------------------------------------------------------------
- # If arriving content was symmetrically encrypted, we decrypt it.
- # We do it inline if defer_decryption flag is False or no sync_db
- # was defined, otherwise we defer it writing it to the received
- # docs table.
- doc = SoledadDocument(doc_id, rev, content)
- if is_symmetrically_encrypted(doc):
- if self._queue_for_decrypt:
- self._sync_decr_pool.insert_encrypted_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id,
- idx)
- else:
- # defer_decryption is False or no-sync-db fallback
- doc.set_json(self._crypto.decrypt_doc(doc))
- self._insert_doc_cb(doc, gen, trans_id)
- else:
- # not symmetrically encrypted doc, insert it directly
- # or save it in the decrypted stage.
- if self._queue_for_decrypt:
- self._sync_decr_pool.insert_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id,
- idx)
- else:
- self._insert_doc_cb(doc, gen, trans_id)
- # -------------------------------------------------------------
- # end of symmetric decryption
- # -------------------------------------------------------------
- self._received_docs += 1
- msg = "%d/%d" % (self._received_docs, total)
- content = {'received': self._received_docs, 'total': total}
- emit(SOLEDAD_SYNC_RECEIVE_STATUS, content)
- logger.debug("Sync receive status: %s" % msg)
- return number_of_changes, new_generation, new_transaction_id
-
- def _parse_received_doc_response(self, response):
- """
- Parse the response from the server containing the received document.
-
- :param response: The body and headers of the response.
- :type response: tuple(str, dict)
-
- :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev,
- content, gen, trans_id)
- :rtype: tuple
- """
- # decode incoming stream
- parts = response.splitlines()
- if not parts or parts[0] != '[' or parts[-1] != ']':
- raise errors.BrokenSyncStream
- data = parts[1:-1]
- # decode metadata
- try:
- line, comma = utils.check_and_strip_comma(data[0])
- metadata = None
- except (IndexError):
- raise errors.BrokenSyncStream
- try:
- metadata = json.loads(line)
- new_generation = metadata['new_generation']
- new_transaction_id = metadata['new_transaction_id']
- number_of_changes = metadata['number_of_changes']
- except (ValueError, KeyError):
- raise errors.BrokenSyncStream
- # make sure we have replica_uid from fresh new dbs
- if self._ensure_callback and 'replica_uid' in metadata:
- self._ensure_callback(metadata['replica_uid'])
- # parse incoming document info
- doc_id = None
- rev = None
- content = None
- gen = None
- trans_id = None
- if number_of_changes > 0:
- try:
- entry = json.loads(data[1])
- doc_id = entry['id']
- rev = entry['rev']
- content = entry['content']
- gen = entry['gen']
- trans_id = entry['trans_id']
- except (IndexError, KeyError):
- raise errors.BrokenSyncStream
- return new_generation, new_transaction_id, number_of_changes, \
- doc_id, rev, content, gen, trans_id
-
- def _setup_sync_decr_pool(self):
- """
- Set up the SyncDecrypterPool for deferred decryption.
- """
- if self._sync_decr_pool is None and self._sync_db is not None:
- # initialize syncing queue decryption pool
- self._sync_decr_pool = SyncDecrypterPool(
- self._crypto,
- self._sync_db,
- insert_doc_cb=self._insert_doc_cb,
- source_replica_uid=self.source_replica_uid)
-
- def _http_request(self, url, method='GET', body=None, headers={}):
- d = self._http.request(url, method, body, headers, readBody)
- d.addErrback(_unauth_to_invalid_token_error)
- return d
-
-
-def _unauth_to_invalid_token_error(failure):
- """
- An errback to translate unauthorized errors to our own invalid token
- class.
-
- :param failure: The original failure.
- :type failure: twisted.python.failure.Failure
-
- :return: Either the original failure or an invalid auth token error.
- :rtype: twisted.python.failure.Failure
- """
- failure.trap(Error)
- if failure.getErrorMessage() == "401 Unauthorized":
- raise InvalidAuthTokenError
- return failure
diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py
new file mode 100644
index 00000000..7a5cea9f
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target/__init__.py
@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -*-
+# __init__.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+A U1DB backend for encrypting data before sending to server and decrypting
+after receiving.
+"""
+
+
+import logging
+
+from leap.common.http import HTTPClient
+from leap.soledad.client.http_target.send import HTTPDocSender
+from leap.soledad.client.http_target.api import SyncTargetAPI
+from leap.soledad.client.http_target.fetch import HTTPDocFetcher
+
+
+logger = logging.getLogger(__name__)
+
+
+class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher):
+
+ """
+ A SyncTarget that encrypts data before sending and decrypts data after
+ receiving.
+
+ Normally encryption will have been written to the sync database upon
+ document modification. The sync database is also used to write temporarily
+ the parsed documents that the remote send us, before being decrypted and
+ written to the main database.
+ """
+ def __init__(self, url, source_replica_uid, creds, crypto, cert_file,
+ sync_db=None, sync_enc_pool=None):
+ """
+ Initialize the sync target.
+
+ :param url: The server sync url.
+ :type url: str
+ :param source_replica_uid: The source replica uid which we use when
+ deferring decryption.
+ :type source_replica_uid: str
+ :param creds: A dictionary containing the uuid and token.
+ :type creds: creds
+ :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
+ document contents when syncing.
+ :type crypto: soledad.crypto.SoledadCrypto
+ :param cert_file: Path to the certificate of the ca used to validate
+ the SSL certificate used by the remote soledad
+ server.
+ :type cert_file: str
+ :param sync_db: Optional. handler for the db with the symmetric
+ encryption of the syncing documents. If
+ None, encryption will be done in-place,
+ instead of retreiving it from the dedicated
+ database.
+ :type sync_db: Sqlite handler
+ :param sync_enc_pool: The encryption pool to use to defer encryption.
+ If None is passed the encryption will not be
+ deferred.
+ :type sync_enc_pool: leap.soledad.client.encdecpool.SyncEncrypterPool
+ """
+ if url.endswith("/"):
+ url = url[:-1]
+ self._url = str(url) + "/sync-from/" + str(source_replica_uid)
+ self.source_replica_uid = source_replica_uid
+ self._auth_header = None
+ self.set_creds(creds)
+ self._crypto = crypto
+ self._sync_db = sync_db
+ self._sync_enc_pool = sync_enc_pool
+ self._insert_doc_cb = None
+ # asynchronous encryption/decryption attributes
+ self._decryption_callback = None
+ self._sync_decr_pool = None
+ self._http = HTTPClient(cert_file)
diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py
new file mode 100644
index 00000000..dcc762f6
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target/api.py
@@ -0,0 +1,229 @@
+# -*- coding: utf-8 -*-
+# api.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+import base64
+
+from uuid import uuid4
+from u1db import SyncTarget
+
+from twisted.web.error import Error
+from twisted.internet import defer
+
+from leap.soledad.common.errors import InvalidAuthTokenError
+from leap.soledad.client.http_target.support import readBody
+
+
+class SyncTargetAPI(SyncTarget):
+ """
+ Declares public methods and implements u1db.SyncTarget.
+ """
+
+ @defer.inlineCallbacks
+ def close(self):
+ if self._sync_enc_pool:
+ self._sync_enc_pool.stop()
+ if self._sync_decr_pool:
+ self._sync_decr_pool.stop()
+ yield self._http.close()
+
+ def set_creds(self, creds):
+ """
+ Update credentials.
+
+ :param creds: A dictionary containing the uuid and token.
+ :type creds: dict
+ """
+ uuid = creds['token']['uuid']
+ token = creds['token']['token']
+ auth = '%s:%s' % (uuid, token)
+ b64_token = base64.b64encode(auth)
+ self._auth_header = {'Authorization': ['Token %s' % b64_token]}
+
+ @property
+ def _base_header(self):
+ return self._auth_header.copy() if self._auth_header else {}
+
+ @property
+ def _defer_encryption(self):
+ return self._sync_enc_pool is not None
+
+ def _http_request(self, url, method='GET', body=None, headers=None,
+ content_type=None):
+ headers = headers or self._base_header
+ if content_type:
+ headers.update({'content-type': [content_type]})
+ d = self._http.request(url, method, body, headers, readBody)
+ d.addErrback(_unauth_to_invalid_token_error)
+ return d
+
+ @defer.inlineCallbacks
+ def get_sync_info(self, source_replica_uid):
+ """
+ Return information about known state of remote database.
+
+ Return the replica_uid and the current database generation of the
+ remote database, and its last-seen database generation for the client
+ replica.
+
+ :param source_replica_uid: The client-size replica uid.
+ :type source_replica_uid: str
+
+ :return: A deferred which fires with (target_replica_uid,
+ target_replica_generation, target_trans_id,
+ source_replica_last_known_generation,
+ source_replica_last_known_transaction_id)
+ :rtype: twisted.internet.defer.Deferred
+ """
+ raw = yield self._http_request(self._url)
+ res = json.loads(raw)
+ defer.returnValue((
+ res['target_replica_uid'],
+ res['target_replica_generation'],
+ res['target_replica_transaction_id'],
+ res['source_replica_generation'],
+ res['source_transaction_id']
+ ))
+
+ def record_sync_info(
+ self, source_replica_uid, source_replica_generation,
+ source_replica_transaction_id):
+ """
+ Record tip information for another replica.
+
+ After sync_exchange has been processed, the caller will have
+ received new content from this replica. This call allows the
+ source replica instigating the sync to inform us what their
+ generation became after applying the documents we returned.
+
+ This is used to allow future sync operations to not need to repeat data
+ that we just talked about. It also means that if this is called at the
+ wrong time, there can be database records that will never be
+ synchronized.
+
+ :param source_replica_uid: The identifier for the source replica.
+ :type source_replica_uid: str
+ :param source_replica_generation: The database generation for the
+ source replica.
+ :type source_replica_generation: int
+ :param source_replica_transaction_id: The transaction id associated
+ with the source replica
+ generation.
+ :type source_replica_transaction_id: str
+
+ :return: A deferred which fires with the result of the query.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ data = json.dumps({
+ 'generation': source_replica_generation,
+ 'transaction_id': source_replica_transaction_id
+ })
+ return self._http_request(
+ self._url,
+ method='PUT',
+ body=data,
+ content_type='application/json')
+
+ @defer.inlineCallbacks
+ def sync_exchange(self, docs_by_generation, source_replica_uid,
+ last_known_generation, last_known_trans_id,
+ insert_doc_cb, ensure_callback=None,
+ defer_decryption=True, sync_id=None):
+ """
+ Find out which documents the remote database does not know about,
+ encrypt and send them. After that, receive documents from the remote
+ database.
+
+ :param docs_by_generations: A list of (doc_id, generation, trans_id)
+ of local documents that were changed since
+ the last local generation the remote
+ replica knows about.
+ :type docs_by_generations: list of tuples
+
+ :param source_replica_uid: The uid of the source replica.
+ :type source_replica_uid: str
+
+ :param last_known_generation: Target's last known generation.
+ :type last_known_generation: int
+
+ :param last_known_trans_id: Target's last known transaction id.
+ :type last_known_trans_id: str
+
+ :param insert_doc_cb: A callback for inserting received documents from
+ target. If not overriden, this will call u1db
+ insert_doc_from_target in synchronizer, which
+ implements the TAKE OTHER semantics.
+ :type insert_doc_cb: function
+
+ :param ensure_callback: A callback that ensures we know the target
+ replica uid if the target replica was just
+ created.
+ :type ensure_callback: function
+
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
+
+ :return: A deferred which fires with the new generation and
+ transaction id of the target replica.
+ :rtype: twisted.internet.defer.Deferred
+ """
+
+ self._ensure_callback = ensure_callback
+
+ if sync_id is None:
+ sync_id = str(uuid4())
+ self.source_replica_uid = source_replica_uid
+
+ # save a reference to the callback so we can use it after decrypting
+ self._insert_doc_cb = insert_doc_cb
+
+ gen_after_send, trans_id_after_send = yield self._send_docs(
+ docs_by_generation,
+ last_known_generation,
+ last_known_trans_id,
+ sync_id)
+
+ cur_target_gen, cur_target_trans_id = yield self._receive_docs(
+ last_known_generation, last_known_trans_id,
+ ensure_callback, sync_id,
+ defer_decryption=defer_decryption)
+
+ # update gen and trans id info in case we just sent and did not
+ # receive docs.
+ if gen_after_send is not None and gen_after_send > cur_target_gen:
+ cur_target_gen = gen_after_send
+ cur_target_trans_id = trans_id_after_send
+
+ defer.returnValue([cur_target_gen, cur_target_trans_id])
+
+
+def _unauth_to_invalid_token_error(failure):
+ """
+ An errback to translate unauthorized errors to our own invalid token
+ class.
+
+ :param failure: The original failure.
+ :type failure: twisted.python.failure.Failure
+
+ :return: Either the original failure or an invalid auth token error.
+ :rtype: twisted.python.failure.Failure
+ """
+ failure.trap(Error)
+ if failure.getErrorMessage() == "401 Unauthorized":
+ raise InvalidAuthTokenError
+ return failure
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py
new file mode 100644
index 00000000..65e576d9
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target/fetch.py
@@ -0,0 +1,252 @@
+# -*- coding: utf-8 -*-
+# fetch.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import logging
+import json
+from u1db import errors
+from u1db.remote import utils
+from twisted.internet import defer
+from leap.soledad.common.document import SoledadDocument
+from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
+from leap.soledad.client.events import emit_async
+from leap.soledad.client.crypto import is_symmetrically_encrypted
+from leap.soledad.client.encdecpool import SyncDecrypterPool
+from leap.soledad.client.http_target.support import RequestBody
+
+logger = logging.getLogger(__name__)
+
+
+class HTTPDocFetcher(object):
+ """
+ Handles Document fetching from Soledad server, using HTTP as transport.
+ Steps:
+ * Prepares metadata by asking server for one document
+ * Fetch the total on response and prepare to ask all remaining
+ * (async) Documents will come encrypted.
+ So we parse, decrypt and insert locally as they arrive.
+ """
+
+ @defer.inlineCallbacks
+ def _receive_docs(self, last_known_generation, last_known_trans_id,
+ ensure_callback, sync_id, defer_decryption):
+
+ self._queue_for_decrypt = defer_decryption \
+ and self._sync_db is not None
+
+ new_generation = last_known_generation
+ new_transaction_id = last_known_trans_id
+
+ if self._queue_for_decrypt:
+ logger.debug(
+ "Soledad sync: will queue received docs for decrypting.")
+
+ if defer_decryption:
+ self._setup_sync_decr_pool()
+
+ # ---------------------------------------------------------------------
+ # maybe receive the first document
+ # ---------------------------------------------------------------------
+
+ # we fetch the first document before fetching the rest because we need
+ # to know the total number of documents to be received, and this
+ # information comes as metadata to each request.
+
+ doc = yield self._receive_one_doc(
+ last_known_generation, last_known_trans_id,
+ sync_id, 0)
+ self._received_docs = 0
+ number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1)
+
+ if ngen:
+ new_generation = ngen
+ new_transaction_id = ntrans
+
+ if defer_decryption:
+ self._sync_decr_pool.start(number_of_changes)
+
+ # ---------------------------------------------------------------------
+ # maybe receive the rest of the documents
+ # ---------------------------------------------------------------------
+
+ # launch many asynchronous fetches and inserts of received documents
+ # in the temporary sync db. Will wait for all results before
+ # continuing.
+
+ received = 1
+ deferreds = []
+ while received < number_of_changes:
+ d = self._receive_one_doc(
+ last_known_generation,
+ last_known_trans_id, sync_id, received)
+ d.addCallback(
+ self._insert_received_doc,
+ received + 1, # the index of the current received doc
+ number_of_changes)
+ deferreds.append(d)
+ received += 1
+ results = yield defer.gatherResults(deferreds)
+
+ # get generation and transaction id of target after insertions
+ if deferreds:
+ _, new_generation, new_transaction_id = results.pop()
+
+ # ---------------------------------------------------------------------
+ # wait for async decryption to finish
+ # ---------------------------------------------------------------------
+
+ if defer_decryption:
+ yield self._sync_decr_pool.deferred
+ self._sync_decr_pool.stop()
+
+ defer.returnValue([new_generation, new_transaction_id])
+
+ def _receive_one_doc(self, last_known_generation,
+ last_known_trans_id, sync_id, received):
+ # add remote replica metadata to the request
+ body = RequestBody(
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
+ ensure=self._ensure_callback is not None)
+ # inform server of how many documents have already been received
+ body.insert_info(received=received)
+ # send headers
+ return self._http_request(
+ self._url,
+ method='POST',
+ body=str(body),
+ content_type='application/x-soledad-sync-get')
+
+ def _insert_received_doc(self, response, idx, total):
+ """
+ Insert a received document into the local replica.
+
+ :param response: The body and headers of the response.
+ :type response: tuple(str, dict)
+ :param idx: The index count of the current operation.
+ :type idx: int
+ :param total: The total number of operations.
+ :type total: int
+ """
+ new_generation, new_transaction_id, number_of_changes, doc_id, \
+ rev, content, gen, trans_id = \
+ self._parse_received_doc_response(response)
+ if doc_id is not None:
+ # decrypt incoming document and insert into local database
+ # -------------------------------------------------------------
+ # symmetric decryption of document's contents
+ # -------------------------------------------------------------
+ # If arriving content was symmetrically encrypted, we decrypt it.
+ # We do it inline if defer_decryption flag is False or no sync_db
+ # was defined, otherwise we defer it writing it to the received
+ # docs table.
+ doc = SoledadDocument(doc_id, rev, content)
+ if is_symmetrically_encrypted(doc):
+ if self._queue_for_decrypt:
+ self._sync_decr_pool.insert_encrypted_received_doc(
+ doc.doc_id, doc.rev, doc.content, gen, trans_id,
+ idx)
+ else:
+ # defer_decryption is False or no-sync-db fallback
+ doc.set_json(self._crypto.decrypt_doc(doc))
+ self._insert_doc_cb(doc, gen, trans_id)
+ else:
+ # not symmetrically encrypted doc, insert it directly
+ # or save it in the decrypted stage.
+ if self._queue_for_decrypt:
+ self._sync_decr_pool.insert_received_doc(
+ doc.doc_id, doc.rev, doc.content, gen, trans_id,
+ idx)
+ else:
+ self._insert_doc_cb(doc, gen, trans_id)
+ # -------------------------------------------------------------
+ # end of symmetric decryption
+ # -------------------------------------------------------------
+ self._received_docs += 1
+ _emit_receive_status(self._received_docs, total)
+ return number_of_changes, new_generation, new_transaction_id
+
+ def _parse_received_doc_response(self, response):
+ """
+ Parse the response from the server containing the received document.
+
+ :param response: The body and headers of the response.
+ :type response: tuple(str, dict)
+
+ :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev,
+ content, gen, trans_id)
+ :rtype: tuple
+ """
+ # decode incoming stream
+ parts = response.splitlines()
+ if not parts or parts[0] != '[' or parts[-1] != ']':
+ raise errors.BrokenSyncStream
+ data = parts[1:-1]
+ # decode metadata
+ try:
+ line, comma = utils.check_and_strip_comma(data[0])
+ metadata = None
+ except (IndexError):
+ raise errors.BrokenSyncStream
+ try:
+ metadata = json.loads(line)
+ new_generation = metadata['new_generation']
+ new_transaction_id = metadata['new_transaction_id']
+ number_of_changes = metadata['number_of_changes']
+ except (ValueError, KeyError):
+ raise errors.BrokenSyncStream
+ # make sure we have replica_uid from fresh new dbs
+ if self._ensure_callback and 'replica_uid' in metadata:
+ self._ensure_callback(metadata['replica_uid'])
+ # parse incoming document info
+ doc_id = None
+ rev = None
+ content = None
+ gen = None
+ trans_id = None
+ if number_of_changes > 0:
+ try:
+ entry = json.loads(data[1])
+ doc_id = entry['id']
+ rev = entry['rev']
+ content = entry['content']
+ gen = entry['gen']
+ trans_id = entry['trans_id']
+ except (IndexError, KeyError):
+ raise errors.BrokenSyncStream
+ return new_generation, new_transaction_id, number_of_changes, \
+ doc_id, rev, content, gen, trans_id
+
+ def _setup_sync_decr_pool(self):
+ """
+ Set up the SyncDecrypterPool for deferred decryption.
+ """
+ if self._sync_decr_pool is None and self._sync_db is not None:
+ # initialize syncing queue decryption pool
+ self._sync_decr_pool = SyncDecrypterPool(
+ self._crypto,
+ self._sync_db,
+ insert_doc_cb=self._insert_doc_cb,
+ source_replica_uid=self.source_replica_uid)
+
+
+def _emit_receive_status(received_docs, total):
+ content = {'received': received_docs, 'total': total}
+ emit_async(SOLEDAD_SYNC_RECEIVE_STATUS, content)
+
+ if received_docs % 20 == 0:
+ msg = "%d/%d" % (received_docs, total)
+ logger.debug("Sync receive status: %s" % msg)
diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py
new file mode 100644
index 00000000..80483f0d
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target/send.py
@@ -0,0 +1,102 @@
+# -*- coding: utf-8 -*-
+# send.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+import logging
+from twisted.internet import defer
+from leap.soledad.client.events import emit_async
+from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
+from leap.soledad.client.http_target.support import RequestBody
+logger = logging.getLogger(__name__)
+
+
+class HTTPDocSender(object):
+ """
+ Handles Document uploading from Soledad server, using HTTP as transport.
+ They need to be encrypted and metadata prepared before sending.
+ """
+
+ @defer.inlineCallbacks
+ def _send_docs(self, docs_by_generation, last_known_generation,
+ last_known_trans_id, sync_id):
+
+ if not docs_by_generation:
+ defer.returnValue([None, None])
+
+ # add remote replica metadata to the request
+ body = RequestBody(
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
+ ensure=self._ensure_callback is not None)
+ total = len(docs_by_generation)
+ for idx, entry in enumerate(docs_by_generation, 1):
+ yield self._prepare_one_doc(entry, body, idx, total)
+ result = yield self._http_request(
+ self._url,
+ method='POST',
+ body=body.pop(1),
+ content_type='application/x-soledad-sync-put')
+ if self._defer_encryption:
+ self._delete_sent(idx, docs_by_generation)
+ _emit_send_status(idx, total)
+ response_dict = json.loads(result)[0]
+ gen_after_send = response_dict['new_generation']
+ trans_id_after_send = response_dict['new_transaction_id']
+ defer.returnValue([gen_after_send, trans_id_after_send])
+
+ def _delete_sent(self, idx, docs_by_generation):
+ doc = docs_by_generation[idx - 1][0]
+ self._sync_enc_pool.delete_encrypted_doc(
+ doc.doc_id, doc.rev)
+
+ @defer.inlineCallbacks
+ def _prepare_one_doc(self, entry, body, idx, total):
+ doc, gen, trans_id = entry
+ content = yield self._encrypt_doc(doc)
+ body.insert_info(
+ id=doc.doc_id, rev=doc.rev, content=content, gen=gen,
+ trans_id=trans_id, number_of_docs=total,
+ doc_idx=idx)
+
+ def _encrypt_doc(self, doc):
+ d = None
+ if doc.is_tombstone():
+ d = defer.succeed(None)
+ elif not self._defer_encryption:
+ # fallback case, for tests
+ d = defer.succeed(self._crypto.encrypt_doc(doc))
+ else:
+
+ def _maybe_encrypt_doc_inline(doc_json):
+ if doc_json is None:
+ # the document is not marked as tombstone, but we got
+ # nothing from the sync db. As it is not encrypted
+ # yet, we force inline encryption.
+ return self._crypto.encrypt_doc(doc)
+ return doc_json
+
+ d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev)
+ d.addCallback(_maybe_encrypt_doc_inline)
+ return d
+
+
+def _emit_send_status(idx, total):
+ content = {'sent': idx, 'total': total}
+ emit_async(SOLEDAD_SYNC_SEND_STATUS, content)
+
+ msg = "%d/%d" % (idx, total)
+ logger.debug("Sync send status: %s" % msg)
diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py
new file mode 100644
index 00000000..44cd7089
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target/support.py
@@ -0,0 +1,203 @@
+# -*- coding: utf-8 -*-
+# support.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import warnings
+import json
+from u1db import errors
+from u1db.remote import http_errors
+from twisted.internet import defer
+from twisted.web.client import _ReadBodyProtocol
+from twisted.web.client import PartialDownloadError
+from twisted.web._newclient import ResponseDone
+from twisted.web._newclient import PotentialDataLoss
+
+
+# we want to make sure that HTTP errors will raise appropriate u1db errors,
+# that is, fire errbacks with the appropriate failures, in the context of
+# twisted. Because of that, we redefine the http body reader used by the HTTP
+# client below.
+
+class ReadBodyProtocol(_ReadBodyProtocol):
+ """
+ From original Twisted implementation, focused on adding our error
+ handling and ensuring that the proper u1db error is raised.
+ """
+
+ def __init__(self, response, deferred):
+ """
+ Initialize the protocol, additionally storing the response headers.
+ """
+ _ReadBodyProtocol.__init__(
+ self, response.code, response.phrase, deferred)
+ self.headers = response.headers
+
+ # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks
+ def _error(self, respdic):
+ descr = respdic.get("error")
+ exc_cls = errors.wire_description_to_exc.get(descr)
+ if exc_cls is not None:
+ message = respdic.get("message")
+ self.deferred.errback(exc_cls(message))
+ # ---8<--- end of snippet from u1db.remote.http_client
+
+ def connectionLost(self, reason):
+ """
+ Deliver the accumulated response bytes to the waiting L{Deferred}, if
+ the response body has been completely received without error.
+ """
+ if reason.check(ResponseDone):
+
+ body = b''.join(self.dataBuffer)
+
+ # ---8<--- snippet from u1db.remote.http_client
+ if self.status in (200, 201):
+ self.deferred.callback(body)
+ elif self.status in http_errors.ERROR_STATUSES:
+ try:
+ respdic = json.loads(body)
+ except ValueError:
+ self.deferred.errback(
+ errors.HTTPError(self.status, body, self.headers))
+ else:
+ self._error(respdic)
+ # special cases
+ elif self.status == 503:
+ self.deferred.errback(errors.Unavailable(body, self.headers))
+ else:
+ self.deferred.errback(
+ errors.HTTPError(self.status, body, self.headers))
+ # ---8<--- end of snippet from u1db.remote.http_client
+
+ elif reason.check(PotentialDataLoss):
+ self.deferred.errback(
+ PartialDownloadError(self.status, self.message,
+ b''.join(self.dataBuffer)))
+ else:
+ self.deferred.errback(reason)
+
+
+def readBody(response):
+ """
+ Get the body of an L{IResponse} and return it as a byte string.
+
+ This is a helper function for clients that don't want to incrementally
+ receive the body of an HTTP response.
+
+ @param response: The HTTP response for which the body will be read.
+ @type response: L{IResponse} provider
+
+ @return: A L{Deferred} which will fire with the body of the response.
+ Cancelling it will close the connection to the server immediately.
+ """
+ def cancel(deferred):
+ """
+ Cancel a L{readBody} call, close the connection to the HTTP server
+ immediately, if it is still open.
+
+ @param deferred: The cancelled L{defer.Deferred}.
+ """
+ abort = getAbort()
+ if abort is not None:
+ abort()
+
+ d = defer.Deferred(cancel)
+ protocol = ReadBodyProtocol(response, d)
+
+ def getAbort():
+ return getattr(protocol.transport, 'abortConnection', None)
+
+ response.deliverBody(protocol)
+
+ if protocol.transport is not None and getAbort() is None:
+ warnings.warn(
+ 'Using readBody with a transport that does not have an '
+ 'abortConnection method',
+ category=DeprecationWarning,
+ stacklevel=2)
+
+ return d
+
+
+class RequestBody(object):
+ """
+ This class is a helper to generate send and fetch requests.
+ The expected format is something like:
+ [
+ {headers},
+ {entry1},
+ {...},
+ {entryN},
+ ]
+ """
+
+ def __init__(self, **header_dict):
+ """
+ Creates a new RequestBody holding header information.
+
+ :param header_dict: A dictionary with the headers.
+ :type header_dict: dict
+ """
+ self.headers = header_dict
+ self.entries = []
+
+ def insert_info(self, **entry_dict):
+ """
+ Dumps an entry into JSON format and add it to entries list.
+
+ :param entry_dict: Entry as a dictionary
+ :type entry_dict: dict
+
+ :return: length of the entry after JSON dumps
+ :rtype: int
+ """
+ entry = json.dumps(entry_dict)
+ self.entries.append(entry)
+ return len(entry)
+
+ def pop(self, number=1):
+ """
+ Removes an amount of entries and returns it formatted and ready
+ to be sent.
+
+ :param number: number of entries to pop and format
+ :type number: int
+
+ :return: formatted body ready to be sent
+ :rtype: str
+ """
+ entries = [self.entries.pop(0) for i in xrange(number)]
+ return self.entries_to_str(entries)
+
+ def __str__(self):
+ return self.entries_to_str(self.entries)
+
+ def __len__(self):
+ return len(self.entries)
+
+ def entries_to_str(self, entries=None):
+ """
+ Format a list of entries into the body format expected
+ by the server.
+
+ :param entries: entries to format
+ :type entries: list
+
+ :return: formatted body ready to be sent
+ :rtype: str
+ """
+ data = '[\r\n' + json.dumps(self.headers)
+ data += ''.join(',\r\n' + entry for entry in entries)
+ return data + '\r\n]'
diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py
index ee3aacdb..c3c3dff5 100644
--- a/client/src/leap/soledad/client/secrets.py
+++ b/client/src/leap/soledad/client/secrets.py
@@ -261,6 +261,16 @@ class SoledadSecrets(object):
logger.info("Could not find a secret in local storage.")
return False
+ def _maybe_set_active_secret(self, active_secret):
+ """
+ If no secret_id is already set, choose the passed active secret, or
+ just choose first secret available if none.
+ """
+ if not self._secret_id:
+ if not active_secret:
+ active_secret = self._secrets.items()[0][0]
+ self.set_secret_id(active_secret)
+
def _load_secrets(self):
"""
Load storage secrets from local file.
@@ -270,12 +280,7 @@ class SoledadSecrets(object):
with open(self._secrets_path, 'r') as f:
content = json.loads(f.read())
_, active_secret = self._import_recovery_document(content)
- # choose first secret if no secret_id was given
- if self._secret_id is None:
- if active_secret is None:
- self.set_secret_id(self._secrets.items()[0][0])
- else:
- self.set_secret_id(active_secret)
+ self._maybe_set_active_secret(active_secret)
# enlarge secret if needed
enlarged = False
if len(self._secrets[self._secret_id]) < self.GEN_SECRET_LENGTH:
@@ -306,12 +311,8 @@ class SoledadSecrets(object):
'Found cryptographic secrets in shared recovery '
'database.')
_, active_secret = self._import_recovery_document(doc.content)
+ self._maybe_set_active_secret(active_secret)
self._store_secrets() # save new secrets in local file
- if self._secret_id is None:
- if active_secret is None:
- self.set_secret_id(self._secrets.items()[0][0])
- else:
- self.set_secret_id(active_secret)
else:
# STAGE 3 - there are no secrets in server also, so
# generate a secret and store it in remote db.
@@ -432,13 +433,13 @@ class SoledadSecrets(object):
:return: a document with encrypted key material in its contents
:rtype: document.SoledadDocument
"""
- events.emit(events.SOLEDAD_DOWNLOADING_KEYS, self._uuid)
+ events.emit_async(events.SOLEDAD_DOWNLOADING_KEYS, self._uuid)
db = self._shared_db
if not db:
logger.warning('No shared db found')
return
doc = db.get_doc(self._shared_db_doc_id())
- events.emit(events.SOLEDAD_DONE_DOWNLOADING_KEYS, self._uuid)
+ events.emit_async(events.SOLEDAD_DONE_DOWNLOADING_KEYS, self._uuid)
return doc
def _put_secrets_in_shared_db(self):
@@ -461,13 +462,13 @@ class SoledadSecrets(object):
# fill doc with encrypted secrets
doc.content = self._export_recovery_document()
# upload secrets to server
- events.emit(events.SOLEDAD_UPLOADING_KEYS, self._uuid)
+ events.emit_async(events.SOLEDAD_UPLOADING_KEYS, self._uuid)
db = self._shared_db
if not db:
logger.warning('No shared db found')
return
db.put_doc(doc)
- events.emit(events.SOLEDAD_DONE_UPLOADING_KEYS, self._uuid)
+ events.emit_async(events.SOLEDAD_DONE_UPLOADING_KEYS, self._uuid)
#
# Management of secret for symmetric encryption.
@@ -587,13 +588,13 @@ class SoledadSecrets(object):
:return: The id of the generated secret.
:rtype: str
"""
- events.emit(events.SOLEDAD_CREATING_KEYS, self._uuid)
+ events.emit_async(events.SOLEDAD_CREATING_KEYS, self._uuid)
# generate random secret
secret = os.urandom(self.GEN_SECRET_LENGTH)
secret_id = sha256(secret).hexdigest()
self._secrets[secret_id] = secret
self._store_secrets()
- events.emit(events.SOLEDAD_DONE_CREATING_KEYS, self._uuid)
+ events.emit_async(events.SOLEDAD_DONE_CREATING_KEYS, self._uuid)
return secret_id
def _store_secrets(self):
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 2151884a..22ddc87d 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -559,6 +559,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
"""
Close the syncer and syncdb orderly
"""
+ super(SQLCipherU1DBSync, self).close()
# close all open syncers
for url in self._syncers.keys():
_, syncer = self._syncers[url]
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index 6c28e0be..1c762036 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -87,8 +87,7 @@ class CouchDocument(SoledadDocument):
atomic and consistent update of the database.
"""
- def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False,
- syncable=True):
+ def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False):
"""
Container for handling a document that is stored in couch backend.
@@ -100,27 +99,10 @@ class CouchDocument(SoledadDocument):
:type json: str
:param has_conflicts: Boolean indicating if this document has conflicts
:type has_conflicts: bool
- :param syncable: Should this document be synced with remote replicas?
- :type syncable: bool
"""
SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts)
- self._couch_rev = None
- self._conflicts = None
- self._transactions = None
-
- def _ensure_fetch_conflicts(self, get_conflicts_fun):
- """
- Ensure conflict data has been fetched from the server.
-
- :param get_conflicts_fun: A function which, given the document id and
- the couch revision, return the conflicted
- versions of the current document.
- :type get_conflicts_fun: function
- """
- if self._conflicts is None:
- self._conflicts = get_conflicts_fun(self.doc_id,
- couch_rev=self.couch_rev)
- self.has_conflicts = len(self._conflicts) > 0
+ self.couch_rev = None
+ self.transactions = None
def get_conflicts(self):
"""
@@ -149,7 +131,7 @@ class CouchDocument(SoledadDocument):
:type doc: CouchDocument
"""
if self._conflicts is None:
- raise Exception("Run self._ensure_fetch_conflicts first!")
+ raise Exception("Fetch conflicts first!")
self._conflicts.append(doc)
self.has_conflicts = len(self._conflicts) > 0
@@ -161,27 +143,48 @@ class CouchDocument(SoledadDocument):
:type conflict_revs: [str]
"""
if self._conflicts is None:
- raise Exception("Run self._ensure_fetch_conflicts first!")
+ raise Exception("Fetch conflicts first!")
self._conflicts = filter(
lambda doc: doc.rev not in conflict_revs,
self._conflicts)
self.has_conflicts = len(self._conflicts) > 0
- def _get_couch_rev(self):
- return self._couch_rev
-
- def _set_couch_rev(self, rev):
- self._couch_rev = rev
-
- couch_rev = property(_get_couch_rev, _set_couch_rev)
-
- def _get_transactions(self):
- return self._transactions
+ def update(self, new_doc):
+ # update info
+ self.rev = new_doc.rev
+ if new_doc.is_tombstone():
+ self.is_tombstone()
+ else:
+ self.content = new_doc.content
+ self.has_conflicts = new_doc.has_conflicts
- def _set_transactions(self, rev):
- self._transactions = rev
+ def prune_conflicts(self, doc_vcr, autoresolved_increment):
+ """
+ Prune conflicts that are older then the current document's revision, or
+ whose content match to the current document's content.
+ Originally in u1db.CommonBackend
- transactions = property(_get_transactions, _set_transactions)
+ :param doc: The document to have conflicts pruned.
+ :type doc: CouchDocument
+ :param doc_vcr: A vector clock representing the current document's
+ revision.
+ :type doc_vcr: u1db.vectorclock.VectorClock
+ """
+ if self.has_conflicts:
+ autoresolved = False
+ c_revs_to_prune = []
+ for c_doc in self._conflicts:
+ c_vcr = vectorclock.VectorClockRev(c_doc.rev)
+ if doc_vcr.is_newer(c_vcr):
+ c_revs_to_prune.append(c_doc.rev)
+ elif self.same_content_as(c_doc):
+ c_revs_to_prune.append(c_doc.rev)
+ doc_vcr.maximize(c_vcr)
+ autoresolved = True
+ if autoresolved:
+ doc_vcr.increment(autoresolved_increment)
+ self.rev = doc_vcr.as_str()
+ self.delete_conflicts(c_revs_to_prune)
# monkey-patch the u1db http app to use CouchDocument
@@ -482,13 +485,10 @@ class CouchDatabase(CommonBackend):
Ensure that the design documents used by the backend exist on the
couch database.
"""
- # we check for existence of one of the files, and put all of them if
- # that one does not exist
- try:
- self._database['_design/docs']
- return
- except ResourceNotFound:
- for ddoc_name in ['docs', 'syncs', 'transactions']:
+ for ddoc_name in ['docs', 'syncs', 'transactions']:
+ try:
+ self._database.info(ddoc_name)
+ except ResourceNotFound:
ddoc = json.loads(
binascii.a2b_base64(
getattr(ddocs, ddoc_name)))
@@ -750,7 +750,6 @@ class CouchDatabase(CommonBackend):
if check_for_conflicts \
and '_attachments' in result \
and 'u1db_conflicts' in result['_attachments']:
- doc.has_conflicts = True
doc.set_conflicts(
self._build_conflicts(
doc.doc_id,
@@ -1044,7 +1043,7 @@ class CouchDatabase(CommonBackend):
conflicts.append(doc)
return conflicts
- def _get_conflicts(self, doc_id, couch_rev=None):
+ def get_doc_conflicts(self, doc_id, couch_rev=None):
"""
Get the conflicted versions of a document.
@@ -1059,32 +1058,21 @@ class CouchDatabase(CommonBackend):
"""
# request conflicts attachment from server
params = {}
+ conflicts = []
if couch_rev is not None:
params['rev'] = couch_rev # restric document's couch revision
+ else:
+ # TODO: move into resource logic!
+ first_entry = self._get_doc(doc_id, check_for_conflicts=True)
+ conflicts.append(first_entry)
resource = self._database.resource(doc_id, 'u1db_conflicts')
try:
response = resource.get_json(**params)
- return self._build_conflicts(
+ return conflicts + self._build_conflicts(
doc_id, json.loads(response[2].read()))
except ResourceNotFound:
return []
- def get_doc_conflicts(self, doc_id):
- """
- Get the list of conflicts for the given document.
-
- The order of the conflicts is such that the first entry is the value
- that would be returned by "get_doc".
-
- :return: A list of the document entries that are conflicted.
- :rtype: [CouchDocument]
- """
- conflict_docs = self._get_conflicts(doc_id)
- if len(conflict_docs) == 0:
- return []
- this_doc = self._get_doc(doc_id, check_for_conflicts=True)
- return [this_doc] + conflict_docs
-
def _get_replica_gen_and_trans_id(self, other_replica_uid):
"""
Return the last known generation and transaction id for the other db
@@ -1140,9 +1128,11 @@ class CouchDatabase(CommonBackend):
:param sync_id: The id of the current sync session.
:type sync_id: str
"""
- self._do_set_replica_gen_and_trans_id(
- other_replica_uid, other_generation, other_transaction_id,
- number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id)
+ if other_replica_uid is not None and other_generation is not None:
+ self._do_set_replica_gen_and_trans_id(
+ other_replica_uid, other_generation, other_transaction_id,
+ number_of_docs=number_of_docs, doc_idx=doc_idx,
+ sync_id=sync_id)
def _do_set_replica_gen_and_trans_id(
self, other_replica_uid, other_generation, other_transaction_id,
@@ -1206,70 +1196,6 @@ class CouchDatabase(CommonBackend):
except ResourceNotFound as e:
raise_missing_design_doc_error(e, ddoc_path)
- def _add_conflict(self, doc, my_doc_rev, my_content):
- """
- Add a conflict to the document.
-
- Note that this method does not actually update the backend; rather, it
- updates the CouchDocument object which will provide the conflict data
- when the atomic document update is made.
-
- :param doc: The document to have conflicts added to.
- :type doc: CouchDocument
- :param my_doc_rev: The revision of the conflicted document.
- :type my_doc_rev: str
- :param my_content: The content of the conflicted document as a JSON
- serialized string.
- :type my_content: str
- """
- doc._ensure_fetch_conflicts(self._get_conflicts)
- doc.add_conflict(
- self._factory(doc_id=doc.doc_id, rev=my_doc_rev,
- json=my_content))
-
- def _delete_conflicts(self, doc, conflict_revs):
- """
- Delete the conflicted revisions from the list of conflicts of C{doc}.
-
- Note that this method does not actually update the backend; rather, it
- updates the CouchDocument object which will provide the conflict data
- when the atomic document update is made.
-
- :param doc: The document to have conflicts deleted.
- :type doc: CouchDocument
- :param conflict_revs: A list of the revisions to be deleted.
- :param conflict_revs: [str]
- """
- doc._ensure_fetch_conflicts(self._get_conflicts)
- doc.delete_conflicts(conflict_revs)
-
- def _prune_conflicts(self, doc, doc_vcr):
- """
- Prune conflicts that are older then the current document's revision, or
- whose content match to the current document's content.
-
- :param doc: The document to have conflicts pruned.
- :type doc: CouchDocument
- :param doc_vcr: A vector clock representing the current document's
- revision.
- :type doc_vcr: u1db.vectorclock.VectorClock
- """
- if doc.has_conflicts is True:
- autoresolved = False
- c_revs_to_prune = []
- for c_doc in doc.get_conflicts():
- c_vcr = vectorclock.VectorClockRev(c_doc.rev)
- if doc_vcr.is_newer(c_vcr):
- c_revs_to_prune.append(c_doc.rev)
- elif doc.same_content_as(c_doc):
- c_revs_to_prune.append(c_doc.rev)
- doc_vcr.maximize(c_vcr)
- autoresolved = True
- if autoresolved:
- doc_vcr.increment(self._replica_uid)
- doc.rev = doc_vcr.as_str()
- self._delete_conflicts(doc, c_revs_to_prune)
-
def _force_doc_sync_conflict(self, doc):
"""
Add a conflict and force a document put.
@@ -1278,9 +1204,9 @@ class CouchDatabase(CommonBackend):
:type doc: CouchDocument
"""
my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
- self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev))
- self._add_conflict(doc, my_doc.rev, my_doc.get_json())
- doc.has_conflicts = True
+ doc.prune_conflicts(
+ vectorclock.VectorClockRev(doc.rev), self._replica_uid)
+ doc.add_conflict(my_doc)
self._put_doc(my_doc, doc)
def resolve_doc(self, doc, conflicted_doc_revs):
@@ -1325,14 +1251,14 @@ class CouchDatabase(CommonBackend):
# the newer doc version will supersede the one in the database, so
# we copy conflicts before updating the backend.
doc.set_conflicts(cur_doc.get_conflicts()) # copy conflicts over.
- self._delete_conflicts(doc, superseded_revs)
+ doc.delete_conflicts(superseded_revs)
self._put_doc(cur_doc, doc)
else:
# the newer doc version does not supersede the one in the
# database, so we will add a conflict to the database and copy
# those over to the document the user has in her hands.
- self._add_conflict(cur_doc, new_rev, doc.get_json())
- self._delete_conflicts(cur_doc, superseded_revs)
+ cur_doc.add_conflict(doc)
+ cur_doc.delete_conflicts(superseded_revs)
self._put_doc(cur_doc, cur_doc) # just update conflicts
# backend has been updated with current conflicts, now copy them
# to the current document.
@@ -1392,65 +1318,33 @@ class CouchDatabase(CommonBackend):
'converged', at_gen is the insertion/current generation.
:rtype: (str, int)
"""
- cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
- # at this point, `doc` has arrived from the other syncing party, and
- # we will decide what to do with it.
- # First, we prepare the arriving doc to update couch database.
- old_doc = doc
- doc = self._factory(doc.doc_id, doc.rev, doc.get_json())
- if cur_doc is not None:
- doc.couch_rev = cur_doc.couch_rev
- # fetch conflicts because we will eventually manipulate them
- doc._ensure_fetch_conflicts(self._get_conflicts)
- # from now on, it works just like u1db sqlite backend
- doc_vcr = vectorclock.VectorClockRev(doc.rev)
- if cur_doc is None:
- cur_vcr = vectorclock.VectorClockRev(None)
- else:
- cur_vcr = vectorclock.VectorClockRev(cur_doc.rev)
- self._validate_source(replica_uid, replica_gen, replica_trans_id)
- if doc_vcr.is_newer(cur_vcr):
- rev = doc.rev
- self._prune_conflicts(doc, doc_vcr)
- if doc.rev != rev:
- # conflicts have been autoresolved
- state = 'superseded'
- else:
- state = 'inserted'
- self._put_doc(cur_doc, doc)
- elif doc.rev == cur_doc.rev:
- # magical convergence
- state = 'converged'
- elif cur_vcr.is_newer(doc_vcr):
- # Don't add this to seen_ids, because we have something newer,
- # so we should send it back, and we should not generate a
- # conflict
- state = 'superseded'
- elif cur_doc.same_content_as(doc):
- # the documents have been edited to the same thing at both ends
- doc_vcr.maximize(cur_vcr)
- doc_vcr.increment(self._replica_uid)
- doc.rev = doc_vcr.as_str()
- self._put_doc(cur_doc, doc)
- state = 'superseded'
- else:
- state = 'conflicted'
- if save_conflict:
- self._force_doc_sync_conflict(doc)
- if replica_uid is not None and replica_gen is not None:
- self._set_replica_gen_and_trans_id(
- replica_uid, replica_gen, replica_trans_id,
- number_of_docs=number_of_docs, doc_idx=doc_idx,
- sync_id=sync_id)
- # update info
- old_doc.rev = doc.rev
- if doc.is_tombstone():
- old_doc.is_tombstone()
- else:
- old_doc.content = doc.content
- old_doc.has_conflicts = doc.has_conflicts
+ if not isinstance(doc, CouchDocument):
+ doc = self._factory(doc.doc_id, doc.rev, doc.get_json())
+ self._save_source_info(replica_uid, replica_gen,
+ replica_trans_id, number_of_docs,
+ doc_idx, sync_id)
+ my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
+ if my_doc is not None:
+ my_doc.set_conflicts(
+ self.get_doc_conflicts(my_doc.doc_id, my_doc.couch_rev))
+ state, save_doc = _process_incoming_doc(
+ my_doc, doc, save_conflict, self.replica_uid)
+ if save_doc:
+ self._put_doc(my_doc, save_doc)
+ doc.update(save_doc)
return state, self._get_generation()
+ def _save_source_info(self, replica_uid, replica_gen, replica_trans_id,
+ number_of_docs, doc_idx, sync_id):
+ """
+ Validate and save source information.
+ """
+ self._validate_source(replica_uid, replica_gen, replica_trans_id)
+ self._set_replica_gen_and_trans_id(
+ replica_uid, replica_gen, replica_trans_id,
+ number_of_docs=number_of_docs, doc_idx=doc_idx,
+ sync_id=sync_id)
+
def get_docs(self, doc_ids, check_for_conflicts=True,
include_deleted=False):
"""
@@ -1495,6 +1389,13 @@ class CouchDatabase(CommonBackend):
continue
yield t._doc
+ def _prune_conflicts(self, doc, doc_vcr):
+ """
+ Overrides original method, but it is implemented elsewhere for
+ simplicity.
+ """
+ doc.prune_conflicts(doc_vcr, self._replica_uid)
+
def _new_resource(self, *path):
"""
Return a new resource for accessing a couch database.
@@ -1546,7 +1447,7 @@ class CouchServerState(ServerState):
:param couch_url: The URL for the couch database.
:type couch_url: str
"""
- self._couch_url = couch_url
+ self.couch_url = couch_url
def open_database(self, dbname):
"""
@@ -1559,7 +1460,7 @@ class CouchServerState(ServerState):
:rtype: CouchDatabase
"""
return CouchDatabase(
- self._couch_url,
+ self.couch_url,
dbname,
ensure_ddocs=False)
@@ -1594,21 +1495,52 @@ class CouchServerState(ServerState):
"""
raise Unauthorized()
- def _set_couch_url(self, url):
- """
- Set the couchdb URL
-
- :param url: CouchDB URL
- :type url: str
- """
- self._couch_url = url
-
- def _get_couch_url(self):
- """
- Return CouchDB URL
- :rtype: str
- """
- return self._couch_url
-
- couch_url = property(_get_couch_url, _set_couch_url, doc='CouchDB URL')
+def _process_incoming_doc(my_doc, other_doc, save_conflict, replica_uid):
+ """
+ Check document, save and return state.
+ """
+ # at this point, `doc` has arrived from the other syncing party, and
+ # we will decide what to do with it.
+ # First, we prepare the arriving doc to update couch database.
+ new_doc = CouchDocument(
+ other_doc.doc_id, other_doc.rev, other_doc.get_json())
+ if my_doc is None:
+ return 'inserted', new_doc
+ new_doc.couch_rev = my_doc.couch_rev
+ new_doc.set_conflicts(my_doc.get_conflicts())
+ # fetch conflicts because we will eventually manipulate them
+ # from now on, it works just like u1db sqlite backend
+ doc_vcr = vectorclock.VectorClockRev(new_doc.rev)
+ cur_vcr = vectorclock.VectorClockRev(my_doc.rev)
+ if doc_vcr.is_newer(cur_vcr):
+ rev = new_doc.rev
+ new_doc.prune_conflicts(doc_vcr, replica_uid)
+ if new_doc.rev != rev:
+ # conflicts have been autoresolved
+ return 'superseded', new_doc
+ else:
+ return'inserted', new_doc
+ elif new_doc.rev == my_doc.rev:
+ # magical convergence
+ return 'converged', None
+ elif cur_vcr.is_newer(doc_vcr):
+ # Don't add this to seen_ids, because we have something newer,
+ # so we should send it back, and we should not generate a
+ # conflict
+ other_doc.update(new_doc)
+ return 'superseded', None
+ elif my_doc.same_content_as(new_doc):
+ # the documents have been edited to the same thing at both ends
+ doc_vcr.maximize(cur_vcr)
+ doc_vcr.increment(replica_uid)
+ new_doc.rev = doc_vcr.as_str()
+ return 'superseded', new_doc
+ else:
+ if save_conflict:
+ new_doc.prune_conflicts(
+ vectorclock.VectorClockRev(new_doc.rev), replica_uid)
+ new_doc.add_conflict(my_doc)
+ return 'conflicted', new_doc
+ other_doc.update(new_doc)
+ return 'conflicted', None
diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py
index 468ad8d8..a08ffd16 100644
--- a/common/src/leap/soledad/common/tests/test_couch.py
+++ b/common/src/leap/soledad/common/tests/test_couch.py
@@ -25,6 +25,7 @@ import json
from urlparse import urljoin
from couchdb.client import Server
+from uuid import uuid4
from testscenarios import TestWithScenarios
@@ -42,7 +43,6 @@ from leap.soledad.common.tests.util import sync_via_synchronizer
from leap.soledad.common.tests.u1db_tests import test_backends
from leap.soledad.common.tests.u1db_tests import DatabaseBaseTests
-from leap.soledad.common.tests.u1db_tests import TestCaseWithServer
from u1db.backends.inmemory import InMemoryIndex
@@ -56,8 +56,8 @@ class TestCouchBackendImpl(CouchDBTestCase):
def test__allocate_doc_id(self):
db = couch.CouchDatabase.open_database(
urljoin(
- 'http://localhost:' + str(self.wrapper.port),
- 'u1db_tests'
+ 'http://localhost:' + str(self.couch_port),
+ ('test-%s' % uuid4().hex)
),
create=True,
ensure_ddocs=True)
@@ -66,6 +66,7 @@ class TestCouchBackendImpl(CouchDBTestCase):
self.assertEqual(34, len(doc_id1))
int(doc_id1[len('D-'):], 16)
self.assertNotEqual(doc_id1, db._allocate_doc_id())
+ self.delete_db(db._dbname)
# -----------------------------------------------------------------------------
@@ -73,25 +74,28 @@ class TestCouchBackendImpl(CouchDBTestCase):
# -----------------------------------------------------------------------------
def make_couch_database_for_test(test, replica_uid):
- port = str(test.wrapper.port)
- return couch.CouchDatabase.open_database(
- urljoin('http://localhost:' + port, replica_uid),
+ port = str(test.couch_port)
+ dbname = ('test-%s' % uuid4().hex)
+ db = couch.CouchDatabase.open_database(
+ urljoin('http://localhost:' + port, dbname),
create=True,
replica_uid=replica_uid or 'test',
ensure_ddocs=True)
+ test.addCleanup(test.delete_db, dbname)
+ return db
def copy_couch_database_for_test(test, db):
- port = str(test.wrapper.port)
+ port = str(test.couch_port)
couch_url = 'http://localhost:' + port
- new_dbname = db._replica_uid + '_copy'
+ new_dbname = db._dbname + '_copy'
new_db = couch.CouchDatabase.open_database(
urljoin(couch_url, new_dbname),
create=True,
replica_uid=db._replica_uid or 'test')
# copy all docs
session = couch.Session()
- old_couch_db = Server(couch_url, session=session)[db._replica_uid]
+ old_couch_db = Server(couch_url, session=session)[db._dbname]
new_couch_db = Server(couch_url, session=session)[new_dbname]
for doc_id in old_couch_db:
doc = old_couch_db.get(doc_id)
@@ -143,24 +147,6 @@ class CouchTests(
scenarios = COUCH_SCENARIOS
- def setUp(self):
- test_backends.AllDatabaseTests.setUp(self)
- # save db info because of test_close
- self._url = self.db._url
- self._dbname = self.db._dbname
-
- def tearDown(self):
- # if current test is `test_close` we have to use saved objects to
- # delete the database because the close() method will have removed the
- # references needed to do it using the CouchDatabase.
- if self.id().endswith('test_couch.CouchTests.test_close(couch)'):
- session = couch.Session()
- server = Server(url=self._url, session=session)
- del(server[self._dbname])
- else:
- self.db.delete_database()
- test_backends.AllDatabaseTests.tearDown(self)
-
class CouchDatabaseTests(
TestWithScenarios,
@@ -169,10 +155,6 @@ class CouchDatabaseTests(
scenarios = COUCH_SCENARIOS
- def tearDown(self):
- self.db.delete_database()
- test_backends.LocalDatabaseTests.tearDown(self)
-
class CouchValidateGenNTransIdTests(
TestWithScenarios,
@@ -181,10 +163,6 @@ class CouchValidateGenNTransIdTests(
scenarios = COUCH_SCENARIOS
- def tearDown(self):
- self.db.delete_database()
- test_backends.LocalDatabaseValidateGenNTransIdTests.tearDown(self)
-
class CouchValidateSourceGenTests(
TestWithScenarios,
@@ -193,10 +171,6 @@ class CouchValidateSourceGenTests(
scenarios = COUCH_SCENARIOS
- def tearDown(self):
- self.db.delete_database()
- test_backends.LocalDatabaseValidateSourceGenTests.tearDown(self)
-
class CouchWithConflictsTests(
TestWithScenarios,
@@ -205,10 +179,6 @@ class CouchWithConflictsTests(
scenarios = COUCH_SCENARIOS
- def tearDown(self):
- self.db.delete_database()
- test_backends.LocalDatabaseWithConflictsTests.tearDown(self)
-
# Notice: the CouchDB backend does not have indexing capabilities, so we do
# not test indexing now.
@@ -237,7 +207,6 @@ nested_doc = tests.nested_doc
class CouchDatabaseSyncTargetTests(
TestWithScenarios,
DatabaseBaseTests,
- TestCaseWithServer,
CouchDBTestCase):
# TODO: implement _set_trace_hook(_shallow) in CouchSyncTarget so
@@ -260,26 +229,13 @@ class CouchDatabaseSyncTargetTests(
def setUp(self):
CouchDBTestCase.setUp(self)
- # from DatabaseBaseTests.setUp
- self.db = self.create_database('test')
- # from TestCaseWithServer.setUp
- self.server = self.server_thread = self.port = None
# other stuff
self.db, self.st = self.create_db_and_target(self)
self.other_changes = []
def tearDown(self):
+ self.db.close()
CouchDBTestCase.tearDown(self)
- # from TestCaseWithServer.tearDown
- if self.server is not None:
- self.server.shutdown()
- self.server_thread.join()
- self.server.server_close()
- if self.port:
- self.port.stopListening()
- # from DatabaseBaseTests.tearDown
- if hasattr(self, 'db') and self.db is not None:
- self.db.close()
def receive_doc(self, doc, gen, trans_id):
self.other_changes.append(
@@ -724,17 +680,8 @@ class CouchDatabaseSyncTests(
self.db3, self.db1_copy, self.db2_copy
]:
if db is not None:
- db.delete_database()
+ self.delete_db(db._dbname)
db.close()
- for replica_uid, dbname in [
- ('test1_copy', 'source'),
- ('test2_copy', 'target'),
- ('test3', 'target')
- ]:
- db = self.create_database(replica_uid, dbname)
- db.delete_database()
- # cleanup connections to avoid leaking of file descriptors
- db.close()
DatabaseBaseTests.tearDown(self)
def assertLastExchangeLog(self, db, expected):
@@ -1203,7 +1150,7 @@ class CouchDatabaseSyncTests(
self.db1 = self.create_database('test1', 'both')
self.db2 = self.create_database('test2', 'both')
doc1 = self.db1.create_doc_from_json('{"a": 1}', doc_id='the-doc')
- db3 = self.create_database('test3', 'both')
+ self.db3 = self.create_database('test3', 'both')
self.sync(self.db2, self.db1)
self.assertEqual(
self.db1._get_generation_info(),
@@ -1211,20 +1158,20 @@ class CouchDatabaseSyncTests(
self.assertEqual(
self.db2._get_generation_info(),
self.db1._get_replica_gen_and_trans_id(self.db2._replica_uid))
- self.sync(db3, self.db1)
+ self.sync(self.db3, self.db1)
# update on 2
doc2 = self.make_document('the-doc', doc1.rev, '{"a": 2}')
self.db2.put_doc(doc2)
- self.sync(self.db2, db3)
- self.assertEqual(db3.get_doc('the-doc').rev, doc2.rev)
+ self.sync(self.db2, self.db3)
+ self.assertEqual(self.db3.get_doc('the-doc').rev, doc2.rev)
# update on 1
doc1.set_json('{"a": 3}')
self.db1.put_doc(doc1)
# conflicts
self.sync(self.db2, self.db1)
- self.sync(db3, self.db1)
+ self.sync(self.db3, self.db1)
self.assertTrue(self.db2.get_doc('the-doc').has_conflicts)
- self.assertTrue(db3.get_doc('the-doc').has_conflicts)
+ self.assertTrue(self.db3.get_doc('the-doc').has_conflicts)
# resolve
conflicts = self.db2.get_doc_conflicts('the-doc')
doc4 = self.make_document('the-doc', None, '{"a": 4}')
@@ -1233,38 +1180,38 @@ class CouchDatabaseSyncTests(
doc2 = self.db2.get_doc('the-doc')
self.assertEqual(doc4.get_json(), doc2.get_json())
self.assertFalse(doc2.has_conflicts)
- self.sync(self.db2, db3)
- doc3 = db3.get_doc('the-doc')
+ self.sync(self.db2, self.db3)
+ doc3 = self.db3.get_doc('the-doc')
self.assertEqual(doc4.get_json(), doc3.get_json())
self.assertFalse(doc3.has_conflicts)
def test_sync_supersedes_conflicts(self):
self.db1 = self.create_database('test1', 'both')
self.db2 = self.create_database('test2', 'target')
- db3 = self.create_database('test3', 'both')
+ self.db3 = self.create_database('test3', 'both')
doc1 = self.db1.create_doc_from_json('{"a": 1}', doc_id='the-doc')
self.db2.create_doc_from_json('{"b": 1}', doc_id='the-doc')
- db3.create_doc_from_json('{"c": 1}', doc_id='the-doc')
- self.sync(db3, self.db1)
+ self.db3.create_doc_from_json('{"c": 1}', doc_id='the-doc')
+ self.sync(self.db3, self.db1)
self.assertEqual(
self.db1._get_generation_info(),
- db3._get_replica_gen_and_trans_id(self.db1._replica_uid))
+ self.db3._get_replica_gen_and_trans_id(self.db1._replica_uid))
self.assertEqual(
- db3._get_generation_info(),
- self.db1._get_replica_gen_and_trans_id(db3._replica_uid))
- self.sync(db3, self.db2)
+ self.db3._get_generation_info(),
+ self.db1._get_replica_gen_and_trans_id(self.db3._replica_uid))
+ self.sync(self.db3, self.db2)
self.assertEqual(
self.db2._get_generation_info(),
- db3._get_replica_gen_and_trans_id(self.db2._replica_uid))
+ self.db3._get_replica_gen_and_trans_id(self.db2._replica_uid))
self.assertEqual(
- db3._get_generation_info(),
- self.db2._get_replica_gen_and_trans_id(db3._replica_uid))
- self.assertEqual(3, len(db3.get_doc_conflicts('the-doc')))
+ self.db3._get_generation_info(),
+ self.db2._get_replica_gen_and_trans_id(self.db3._replica_uid))
+ self.assertEqual(3, len(self.db3.get_doc_conflicts('the-doc')))
doc1.set_json('{"a": 2}')
self.db1.put_doc(doc1)
- self.sync(db3, self.db1)
+ self.sync(self.db3, self.db1)
# original doc1 should have been removed from conflicts
- self.assertEqual(3, len(db3.get_doc_conflicts('the-doc')))
+ self.assertEqual(3, len(self.db3.get_doc_conflicts('the-doc')))
def test_sync_stops_after_get_sync_info(self):
self.db1 = self.create_database('test1', 'source')
@@ -1283,79 +1230,78 @@ class CouchDatabaseSyncTests(
self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc1')
self.assertRaises(
u1db_errors.InvalidReplicaUID, self.sync, self.db1, self.db2)
- # remove the reference to db2 to avoid double deleting on tearDown
- self.db2.close()
- self.db2 = None
def test_sync_detects_rollback_in_source(self):
self.db1 = self.create_database('test1', 'source')
self.db2 = self.create_database('test2', 'target')
self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc1')
self.sync(self.db1, self.db2)
- db1_copy = self.copy_database(self.db1)
+ self.db1_copy = self.copy_database(self.db1)
self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc2')
self.sync(self.db1, self.db2)
self.assertRaises(
- u1db_errors.InvalidGeneration, self.sync, db1_copy, self.db2)
+ u1db_errors.InvalidGeneration, self.sync, self.db1_copy, self.db2)
def test_sync_detects_rollback_in_target(self):
self.db1 = self.create_database('test1', 'source')
self.db2 = self.create_database('test2', 'target')
self.db1.create_doc_from_json(tests.simple_doc, doc_id="divergent")
self.sync(self.db1, self.db2)
- db2_copy = self.copy_database(self.db2)
+ self.db2_copy = self.copy_database(self.db2)
self.db2.create_doc_from_json(tests.simple_doc, doc_id='doc2')
self.sync(self.db1, self.db2)
self.assertRaises(
- u1db_errors.InvalidGeneration, self.sync, self.db1, db2_copy)
+ u1db_errors.InvalidGeneration, self.sync, self.db1, self.db2_copy)
def test_sync_detects_diverged_source(self):
self.db1 = self.create_database('test1', 'source')
self.db2 = self.create_database('test2', 'target')
- db3 = self.copy_database(self.db1)
+ self.db3 = self.copy_database(self.db1)
self.db1.create_doc_from_json(tests.simple_doc, doc_id="divergent")
- db3.create_doc_from_json(tests.simple_doc, doc_id="divergent")
+ self.db3.create_doc_from_json(tests.simple_doc, doc_id="divergent")
self.sync(self.db1, self.db2)
self.assertRaises(
- u1db_errors.InvalidTransactionId, self.sync, db3, self.db2)
+ u1db_errors.InvalidTransactionId, self.sync, self.db3, self.db2)
def test_sync_detects_diverged_target(self):
self.db1 = self.create_database('test1', 'source')
self.db2 = self.create_database('test2', 'target')
- db3 = self.copy_database(self.db2)
- db3.create_doc_from_json(tests.nested_doc, doc_id="divergent")
+ self.db3 = self.copy_database(self.db2)
+ self.db3.create_doc_from_json(tests.nested_doc, doc_id="divergent")
self.db1.create_doc_from_json(tests.simple_doc, doc_id="divergent")
self.sync(self.db1, self.db2)
self.assertRaises(
- u1db_errors.InvalidTransactionId, self.sync, self.db1, db3)
+ u1db_errors.InvalidTransactionId, self.sync, self.db1, self.db3)
def test_sync_detects_rollback_and_divergence_in_source(self):
self.db1 = self.create_database('test1', 'source')
self.db2 = self.create_database('test2', 'target')
self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc1')
self.sync(self.db1, self.db2)
- db1_copy = self.copy_database(self.db1)
+ self.db1_copy = self.copy_database(self.db1)
self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc2')
self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc3')
self.sync(self.db1, self.db2)
- db1_copy.create_doc_from_json(tests.simple_doc, doc_id='doc2')
- db1_copy.create_doc_from_json(tests.simple_doc, doc_id='doc3')
+ self.db1_copy.create_doc_from_json(tests.simple_doc, doc_id='doc2')
+ self.db1_copy.create_doc_from_json(tests.simple_doc, doc_id='doc3')
self.assertRaises(
- u1db_errors.InvalidTransactionId, self.sync, db1_copy, self.db2)
+ u1db_errors.InvalidTransactionId, self.sync,
+ self.db1_copy, self.db2)
def test_sync_detects_rollback_and_divergence_in_target(self):
self.db1 = self.create_database('test1', 'source')
self.db2 = self.create_database('test2', 'target')
self.db1.create_doc_from_json(tests.simple_doc, doc_id="divergent")
self.sync(self.db1, self.db2)
- db2_copy = self.copy_database(self.db2)
+ self.db2_copy = self.copy_database(self.db2)
self.db2.create_doc_from_json(tests.simple_doc, doc_id='doc2')
self.db2.create_doc_from_json(tests.simple_doc, doc_id='doc3')
self.sync(self.db1, self.db2)
- db2_copy.create_doc_from_json(tests.simple_doc, doc_id='doc2')
- db2_copy.create_doc_from_json(tests.simple_doc, doc_id='doc3')
+ self.db2_copy.create_doc_from_json(tests.simple_doc, doc_id='doc2')
+ self.db2_copy.create_doc_from_json(tests.simple_doc, doc_id='doc3')
self.assertRaises(
- u1db_errors.InvalidTransactionId, self.sync, self.db1, db2_copy)
+ u1db_errors.InvalidTransactionId, self.sync,
+ self.db1, self.db2_copy)
def test_optional_sync_preserve_json(self):
self.db1 = self.create_database('test1', 'source')
@@ -1373,10 +1319,14 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
def setUp(self):
CouchDBTestCase.setUp(self)
+
+ def create_db(self, ensure=True, dbname=None):
+ if not dbname:
+ dbname = ('test-%s' % uuid4().hex)
self.db = couch.CouchDatabase.open_database(
- urljoin('http://127.0.0.1:%d' % self.wrapper.port, 'test'),
+ urljoin('http://127.0.0.1:%d' % self.couch_port, dbname),
create=True,
- ensure_ddocs=False) # note that we don't enforce ddocs here
+ ensure_ddocs=ensure)
def tearDown(self):
self.db.delete_database()
@@ -1388,6 +1338,7 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
Test that all methods that access design documents will raise if the
design docs are not present.
"""
+ self.create_db(ensure=False)
# _get_generation()
self.assertRaises(
errors.MissingDesignDocError,
@@ -1418,10 +1369,7 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
Test that all methods that access design documents list functions
will raise if the functions are not present.
"""
- self.db = couch.CouchDatabase.open_database(
- urljoin('http://127.0.0.1:%d' % self.wrapper.port, 'test'),
- create=True,
- ensure_ddocs=True)
+ self.create_db(ensure=True)
# erase views from _design/transactions
transactions = self.db._database['_design/transactions']
transactions['lists'] = {}
@@ -1448,10 +1396,7 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
Test that all methods that access design documents list functions
will raise if the functions are not present.
"""
- self.db = couch.CouchDatabase.open_database(
- urljoin('http://127.0.0.1:%d' % self.wrapper.port, 'test'),
- create=True,
- ensure_ddocs=True)
+ self.create_db(ensure=True)
# erase views from _design/transactions
transactions = self.db._database['_design/transactions']
del transactions['lists']
@@ -1478,10 +1423,7 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
Test that all methods that access design documents' named views will
raise if the views are not present.
"""
- self.db = couch.CouchDatabase.open_database(
- urljoin('http://127.0.0.1:%d' % self.wrapper.port, 'test'),
- create=True,
- ensure_ddocs=True)
+ self.create_db(ensure=True)
# erase views from _design/docs
docs = self.db._database['_design/docs']
del docs['views']
@@ -1520,10 +1462,7 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
Test that all methods that access design documents will raise if the
design docs are not present.
"""
- self.db = couch.CouchDatabase.open_database(
- urljoin('http://127.0.0.1:%d' % self.wrapper.port, 'test'),
- create=True,
- ensure_ddocs=True)
+ self.create_db(ensure=True)
# delete _design/docs
del self.db._database['_design/docs']
# delete _design/syncs
@@ -1554,3 +1493,16 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
self.assertRaises(
errors.MissingDesignDocDeletedError,
self.db._do_set_replica_gen_and_trans_id, 1, 2, 3)
+
+ def test_ensure_ddoc_independently(self):
+ """
+ Test that a missing ddocs other than _design/docs will be ensured
+ even if _design/docs is there.
+ """
+ self.create_db(ensure=True)
+ del self.db._database['_design/transactions']
+ self.assertRaises(
+ errors.MissingDesignDocDeletedError,
+ self.db._get_transaction_log)
+ self.create_db(ensure=True, dbname=self.db._dbname)
+ self.db._get_transaction_log()
diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
index c488822e..25f709ca 100644
--- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
+++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
@@ -23,6 +23,7 @@ import threading
from urlparse import urljoin
from twisted.internet import defer
+from uuid import uuid4
from leap.soledad.client import Soledad
from leap.soledad.common.couch import CouchDatabase, CouchServerState
@@ -55,7 +56,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
sync_target = soledad_sync_target
- def _soledad_instance(self, user='user-uuid', passphrase=u'123',
+ def _soledad_instance(self, user=None, passphrase=u'123',
prefix='',
secrets_path='secrets.json',
local_db_path='soledad.u1db', server_url='',
@@ -63,6 +64,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
"""
Instantiate Soledad.
"""
+ user = user or self.user
# this callback ensures we save a document which is sent to the shared
# db.
@@ -83,15 +85,15 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
return soledad
def make_app(self):
- self.request_state = CouchServerState(self._couch_url)
+ self.request_state = CouchServerState(self.couch_url)
return self.make_app_after_state(self.request_state)
def setUp(self):
TestCaseWithServer.setUp(self)
CouchDBTestCase.setUp(self)
- self._couch_url = 'http://localhost:' + str(self.wrapper.port)
+ self.user = ('user-%s' % uuid4().hex)
self.db = CouchDatabase.open_database(
- urljoin(self._couch_url, 'user-user-uuid'),
+ urljoin(self.couch_url, 'user-' + self.user),
create=True,
replica_uid='replica',
ensure_ddocs=True)
diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py
index 5ffa2a63..f512d6c1 100644
--- a/common/src/leap/soledad/common/tests/test_server.py
+++ b/common/src/leap/soledad/common/tests/test_server.py
@@ -50,7 +50,7 @@ from leap.soledad.server.auth import URLToAuthorization
def _couch_ensure_database(self, dbname):
db = CouchDatabase.open_database(
- self._couch_url + '/' + dbname,
+ self.couch_url + '/' + dbname,
create=True,
ensure_ddocs=True)
return db, db._replica_uid
@@ -325,7 +325,7 @@ class EncryptedSyncTestCase(
shared_db=self.get_default_shared_mock(_put_doc_side_effect))
def make_app(self):
- self.request_state = CouchServerState(self._couch_url)
+ self.request_state = CouchServerState(self.couch_url)
return self.make_app_with_state(self.request_state)
def setUp(self):
@@ -333,7 +333,6 @@ class EncryptedSyncTestCase(
# dependencies.
# XXX explain better
CouchDBTestCase.setUp(self)
- self._couch_url = 'http://localhost:' + str(self.wrapper.port)
self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
TestCaseWithServer.setUp(self)
@@ -368,7 +367,7 @@ class EncryptedSyncTestCase(
# ensure remote db exists before syncing
db = CouchDatabase.open_database(
- urljoin(self._couch_url, 'user-' + user),
+ urljoin(self.couch_url, 'user-' + user),
create=True,
ensure_ddocs=True)
@@ -494,27 +493,18 @@ class LockResourceTestCase(
# dependencies.
# XXX explain better
CouchDBTestCase.setUp(self)
- self._couch_url = 'http://localhost:' + str(self.wrapper.port)
self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
TestCaseWithServer.setUp(self)
# create the databases
- CouchDatabase.open_database(
- urljoin(self._couch_url, 'shared'),
- create=True,
- ensure_ddocs=True)
- CouchDatabase.open_database(
- urljoin(self._couch_url, 'tokens'),
+ db = CouchDatabase.open_database(
+ urljoin(self.couch_url, ('shared-%s' % (uuid4().hex))),
create=True,
ensure_ddocs=True)
- self._state = CouchServerState(self._couch_url)
+ self.addCleanup(db.delete_database)
+ self._state = CouchServerState(self.couch_url)
+ self._state.open_database = mock.Mock(return_value=db)
def tearDown(self):
- # delete remote database
- db = CouchDatabase.open_database(
- urljoin(self._couch_url, 'shared'),
- create=True,
- ensure_ddocs=True)
- db.delete_database()
CouchDBTestCase.tearDown(self)
TestCaseWithServer.tearDown(self)
diff --git a/common/src/leap/soledad/common/tests/test_soledad.py b/common/src/leap/soledad/common/tests/test_soledad.py
index bd356858..85d6734e 100644
--- a/common/src/leap/soledad/common/tests/test_soledad.py
+++ b/common/src/leap/soledad/common/tests/test_soledad.py
@@ -223,7 +223,7 @@ class SoledadSignalingTestCase(BaseSoledadTest):
def setUp(self):
# mock signaling
soledad.client.signal = Mock()
- soledad.client.secrets.events.emit = Mock()
+ soledad.client.secrets.events.emit_async = Mock()
# run parent's setUp
BaseSoledadTest.setUp(self)
@@ -245,57 +245,57 @@ class SoledadSignalingTestCase(BaseSoledadTest):
- downloading keys / done downloading keys.
- uploading keys / done uploading keys.
"""
- soledad.client.secrets.events.emit.reset_mock()
+ soledad.client.secrets.events.emit_async.reset_mock()
# get a fresh instance so it emits all bootstrap signals
sol = self._soledad_instance(
secrets_path='alternative_stage3.json',
local_db_path='alternative_stage3.u1db')
# reverse call order so we can verify in the order the signals were
# expected
- soledad.client.secrets.events.emit.mock_calls.reverse()
- soledad.client.secrets.events.emit.call_args = \
- soledad.client.secrets.events.emit.call_args_list[0]
- soledad.client.secrets.events.emit.call_args_list.reverse()
+ soledad.client.secrets.events.emit_async.mock_calls.reverse()
+ soledad.client.secrets.events.emit_async.call_args = \
+ soledad.client.secrets.events.emit_async.call_args_list[0]
+ soledad.client.secrets.events.emit_async.call_args_list.reverse()
# downloading keys signals
- soledad.client.secrets.events.emit.assert_called_with(
+ soledad.client.secrets.events.emit_async.assert_called_with(
catalog.SOLEDAD_DOWNLOADING_KEYS,
ADDRESS,
)
- self._pop_mock_call(soledad.client.secrets.events.emit)
- soledad.client.secrets.events.emit.assert_called_with(
+ self._pop_mock_call(soledad.client.secrets.events.emit_async)
+ soledad.client.secrets.events.emit_async.assert_called_with(
catalog.SOLEDAD_DONE_DOWNLOADING_KEYS,
ADDRESS,
)
# creating keys signals
- self._pop_mock_call(soledad.client.secrets.events.emit)
- soledad.client.secrets.events.emit.assert_called_with(
+ self._pop_mock_call(soledad.client.secrets.events.emit_async)
+ soledad.client.secrets.events.emit_async.assert_called_with(
catalog.SOLEDAD_CREATING_KEYS,
ADDRESS,
)
- self._pop_mock_call(soledad.client.secrets.events.emit)
- soledad.client.secrets.events.emit.assert_called_with(
+ self._pop_mock_call(soledad.client.secrets.events.emit_async)
+ soledad.client.secrets.events.emit_async.assert_called_with(
catalog.SOLEDAD_DONE_CREATING_KEYS,
ADDRESS,
)
# downloading once more (inside _put_keys_in_shared_db)
- self._pop_mock_call(soledad.client.secrets.events.emit)
- soledad.client.secrets.events.emit.assert_called_with(
+ self._pop_mock_call(soledad.client.secrets.events.emit_async)
+ soledad.client.secrets.events.emit_async.assert_called_with(
catalog.SOLEDAD_DOWNLOADING_KEYS,
ADDRESS,
)
- self._pop_mock_call(soledad.client.secrets.events.emit)
- soledad.client.secrets.events.emit.assert_called_with(
+ self._pop_mock_call(soledad.client.secrets.events.emit_async)
+ soledad.client.secrets.events.emit_async.assert_called_with(
catalog.SOLEDAD_DONE_DOWNLOADING_KEYS,
ADDRESS,
)
# uploading keys signals
- self._pop_mock_call(soledad.client.secrets.events.emit)
- soledad.client.secrets.events.emit.assert_called_with(
+ self._pop_mock_call(soledad.client.secrets.events.emit_async)
+ soledad.client.secrets.events.emit_async.assert_called_with(
catalog.SOLEDAD_UPLOADING_KEYS,
ADDRESS,
)
- self._pop_mock_call(soledad.client.secrets.events.emit)
- soledad.client.secrets.events.emit.assert_called_with(
+ self._pop_mock_call(soledad.client.secrets.events.emit_async)
+ soledad.client.secrets.events.emit_async.assert_called_with(
catalog.SOLEDAD_DONE_UPLOADING_KEYS,
ADDRESS,
)
@@ -316,7 +316,7 @@ class SoledadSignalingTestCase(BaseSoledadTest):
doc.content = sol.secrets._export_recovery_document()
sol.close()
# reset mock
- soledad.client.secrets.events.emit.reset_mock()
+ soledad.client.secrets.events.emit_async.reset_mock()
# get a fresh instance so it emits all bootstrap signals
shared_db = self.get_default_shared_mock(get_doc_return_value=doc)
sol = self._soledad_instance(
@@ -325,17 +325,17 @@ class SoledadSignalingTestCase(BaseSoledadTest):
shared_db_class=shared_db)
# reverse call order so we can verify in the order the signals were
# expected
- soledad.client.secrets.events.emit.mock_calls.reverse()
- soledad.client.secrets.events.emit.call_args = \
- soledad.client.secrets.events.emit.call_args_list[0]
- soledad.client.secrets.events.emit.call_args_list.reverse()
+ soledad.client.secrets.events.emit_async.mock_calls.reverse()
+ soledad.client.secrets.events.emit_async.call_args = \
+ soledad.client.secrets.events.emit_async.call_args_list[0]
+ soledad.client.secrets.events.emit_async.call_args_list.reverse()
# assert download keys signals
- soledad.client.secrets.events.emit.assert_called_with(
+ soledad.client.secrets.events.emit_async.assert_called_with(
catalog.SOLEDAD_DOWNLOADING_KEYS,
ADDRESS,
)
- self._pop_mock_call(soledad.client.secrets.events.emit)
- soledad.client.secrets.events.emit.assert_called_with(
+ self._pop_mock_call(soledad.client.secrets.events.emit_async)
+ soledad.client.secrets.events.emit_async.assert_called_with(
catalog.SOLEDAD_DONE_DOWNLOADING_KEYS,
ADDRESS,
)
@@ -369,7 +369,7 @@ class SoledadSignalingTestCase(BaseSoledadTest):
yield sol.sync()
# assert the signal has been emitted
- soledad.client.events.emit.assert_called_with(
+ soledad.client.events.emit_async.assert_called_with(
catalog.SOLEDAD_DONE_DATA_SYNC,
ADDRESS,
)
diff --git a/common/src/leap/soledad/common/tests/test_sqlcipher_sync.py b/common/src/leap/soledad/common/tests/test_sqlcipher_sync.py
index c57d6f61..439fc070 100644
--- a/common/src/leap/soledad/common/tests/test_sqlcipher_sync.py
+++ b/common/src/leap/soledad/common/tests/test_sqlcipher_sync.py
@@ -19,29 +19,26 @@ Test sqlcipher backend sync.
"""
-import json
+import os
from u1db import sync
from u1db import vectorclock
from u1db import errors
+from uuid import uuid4
from testscenarios import TestWithScenarios
-from urlparse import urljoin
-from twisted.internet import defer
-
-from leap.soledad.common import couch
from leap.soledad.common.crypto import ENC_SCHEME_KEY
from leap.soledad.client.http_target import SoledadHTTPSyncTarget
from leap.soledad.client.crypto import decrypt_doc_dict
-from leap.soledad.client.sqlcipher import SQLCipherDatabase
from leap.soledad.common.tests import u1db_tests as tests
from leap.soledad.common.tests.test_sqlcipher import SQLCIPHER_SCENARIOS
from leap.soledad.common.tests.util import make_soledad_app
+from leap.soledad.common.tests.test_sync_target import \
+ SoledadDatabaseSyncTargetTests
from leap.soledad.common.tests.util import soledad_sync_target
from leap.soledad.common.tests.util import BaseSoledadTest
-from leap.soledad.common.tests.util import SoledadWithCouchServerMixin
# -----------------------------------------------------------------------------
@@ -97,23 +94,6 @@ class SQLCipherDatabaseSyncTests(
self._use_tracking = {}
super(tests.DatabaseBaseTests, self).setUp()
- def tearDown(self):
- super(tests.DatabaseBaseTests, self).tearDown()
- if hasattr(self, 'db1') and isinstance(self.db1, SQLCipherDatabase):
- self.db1.close()
- if hasattr(self, 'db1_copy') \
- and isinstance(self.db1_copy, SQLCipherDatabase):
- self.db1_copy.close()
- if hasattr(self, 'db2') \
- and isinstance(self.db2, SQLCipherDatabase):
- self.db2.close()
- if hasattr(self, 'db2_copy') \
- and isinstance(self.db2_copy, SQLCipherDatabase):
- self.db2_copy.close()
- if hasattr(self, 'db3') \
- and isinstance(self.db3, SQLCipherDatabase):
- self.db3.close()
-
def create_database(self, replica_uid, sync_role=None):
if replica_uid == 'test' and sync_role is None:
# created up the chain by base class but unused
@@ -121,6 +101,7 @@ class SQLCipherDatabaseSyncTests(
db = self.create_database_for_role(replica_uid, sync_role)
if sync_role:
self._use_tracking[db] = (replica_uid, sync_role)
+ self.addCleanup(db.close)
return db
def create_database_for_role(self, replica_uid, sync_role):
@@ -729,38 +710,30 @@ class SQLCipherDatabaseSyncTests(
errors.InvalidTransactionId, self.sync, self.db1, self.db2_copy)
-def _make_local_db_and_token_http_target(test, path='test'):
+def make_local_db_and_soledad_target(
+ test, path='test',
+ source_replica_uid=uuid4().hex):
test.startTwistedServer()
- # ensure remote db exists before syncing
- db = couch.CouchDatabase.open_database(
- urljoin(test._couch_url, 'test'),
- create=True,
- replica_uid='test',
- ensure_ddocs=True)
-
- replica_uid = test._soledad._dbpool.replica_uid
+ replica_uid = os.path.basename(path)
+ db = test.request_state._create_database(replica_uid)
sync_db = test._soledad._sync_db
sync_enc_pool = test._soledad._sync_enc_pool
st = soledad_sync_target(
- test, path,
- source_replica_uid=replica_uid,
+ test, db._dbname,
+ source_replica_uid=source_replica_uid,
sync_db=sync_db,
sync_enc_pool=sync_enc_pool)
return db, st
target_scenarios = [
('leap', {
- 'create_db_and_target': _make_local_db_and_token_http_target,
+ 'create_db_and_target': make_local_db_and_soledad_target,
'make_app_with_state': make_soledad_app,
'do_sync': sync_via_synchronizer_and_soledad}),
]
-class SQLCipherSyncTargetTests(
- TestWithScenarios,
- tests.DatabaseBaseTests,
- tests.TestCaseWithServer,
- SoledadWithCouchServerMixin):
+class SQLCipherSyncTargetTests(SoledadDatabaseSyncTargetTests):
# TODO: implement _set_trace_hook(_shallow) in SoledadHTTPSyncTarget so
# skipped tests can be succesfully executed.
@@ -769,368 +742,3 @@ class SQLCipherSyncTargetTests(
target_scenarios))
whitebox = False
-
- def setUp(self):
- super(tests.DatabaseBaseTests, self).setUp()
- self.db, self.st = self.create_db_and_target(self)
- self.addCleanup(self.st.close)
- self.other_changes = []
-
- def tearDown(self):
- super(tests.DatabaseBaseTests, self).tearDown()
-
- def assertLastExchangeLog(self, db, expected):
- log = getattr(db, '_last_exchange_log', None)
- if log is None:
- return
- self.assertEqual(expected, log)
-
- def receive_doc(self, doc, gen, trans_id):
- self.other_changes.append(
- (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id))
-
- def make_app(self):
- self.request_state = couch.CouchServerState(self._couch_url)
- return self.make_app_with_state(self.request_state)
-
- def set_trace_hook(self, callback, shallow=False):
- setter = (self.st._set_trace_hook if not shallow else
- self.st._set_trace_hook_shallow)
- try:
- setter(callback)
- except NotImplementedError:
- self.skipTest("%s does not implement _set_trace_hook"
- % (self.st.__class__.__name__,))
-
- def test_get_sync_target(self):
- self.assertIsNot(None, self.st)
-
- @defer.inlineCallbacks
- def test_get_sync_info(self):
- sync_info = yield self.st.get_sync_info('other')
- self.assertEqual(
- ('test', 0, '', 0, ''), sync_info)
-
- @defer.inlineCallbacks
- def test_create_doc_updates_sync_info(self):
- sync_info = yield self.st.get_sync_info('other')
- self.assertEqual(
- ('test', 0, '', 0, ''), sync_info)
- self.db.create_doc_from_json(tests.simple_doc)
- sync_info = yield self.st.get_sync_info('other')
- self.assertEqual(1, sync_info[1])
-
- @defer.inlineCallbacks
- def test_record_sync_info(self):
- yield self.st.record_sync_info('replica', 10, 'T-transid')
- sync_info = yield self.st.get_sync_info('other')
- self.assertEqual(
- ('test', 0, '', 10, 'T-transid'), sync_info)
-
- @defer.inlineCallbacks
- def test_sync_exchange(self):
- """
- Modified to account for possibly receiving encrypted documents from
- sever-side.
- """
-
- docs_by_gen = [
- (self.make_document('doc-id', 'replica:1', tests.simple_doc), 10,
- 'T-sid')]
- new_gen, trans_id = yield self.st.sync_exchange(
- docs_by_gen, 'replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=self.receive_doc)
- self.assertGetEncryptedDoc(
- self.db, 'doc-id', 'replica:1', tests.simple_doc, False)
- self.assertTransactionLog(['doc-id'], self.db)
- last_trans_id = self.getLastTransId(self.db)
- self.assertEqual(([], 1, last_trans_id),
- (self.other_changes, new_gen, last_trans_id))
- sync_info = yield self.st.get_sync_info('replica')
- self.assertEqual(10, sync_info[3])
-
- @defer.inlineCallbacks
- def test_sync_exchange_push_many(self):
- """
- Modified to account for possibly receiving encrypted documents from
- sever-side.
- """
- docs_by_gen = [
- (self.make_document(
- 'doc-id', 'replica:1', tests.simple_doc), 10, 'T-1'),
- (self.make_document('doc-id2', 'replica:1', tests.nested_doc), 11,
- 'T-2')]
- new_gen, trans_id = yield self.st.sync_exchange(
- docs_by_gen, 'replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=self.receive_doc)
- self.assertGetEncryptedDoc(
- self.db, 'doc-id', 'replica:1', tests.simple_doc, False)
- self.assertGetEncryptedDoc(
- self.db, 'doc-id2', 'replica:1', tests.nested_doc, False)
- self.assertTransactionLog(['doc-id', 'doc-id2'], self.db)
- last_trans_id = self.getLastTransId(self.db)
- self.assertEqual(([], 2, last_trans_id),
- (self.other_changes, new_gen, trans_id))
- sync_info = yield self.st.get_sync_info('replica')
- self.assertEqual(11, sync_info[3])
-
- @defer.inlineCallbacks
- def test_sync_exchange_returns_many_new_docs(self):
- """
- Modified to account for JSON serialization differences.
- """
- doc = self.db.create_doc_from_json(tests.simple_doc)
- doc2 = self.db.create_doc_from_json(tests.nested_doc)
- self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)
- new_gen, _ = yield self.st.sync_exchange(
- [], 'other-replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=self.receive_doc)
- self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)
- self.assertEqual(2, new_gen)
- self.assertEqual(
- [(doc.doc_id, doc.rev, 1),
- (doc2.doc_id, doc2.rev, 2)],
- [c[:2] + c[3:4] for c in self.other_changes])
- self.assertEqual(
- json.dumps(tests.simple_doc),
- json.dumps(self.other_changes[0][2]))
- self.assertEqual(
- json.loads(tests.nested_doc),
- json.loads(self.other_changes[1][2]))
- if self.whitebox:
- self.assertEqual(
- self.db._last_exchange_log['return'],
- {'last_gen': 2, 'docs':
- [(doc.doc_id, doc.rev), (doc2.doc_id, doc2.rev)]})
-
- @defer.inlineCallbacks
- def test_sync_exchange_deleted(self):
- doc = self.db.create_doc_from_json('{}')
- edit_rev = 'replica:1|' + doc.rev
- docs_by_gen = [
- (self.make_document(doc.doc_id, edit_rev, None), 10, 'T-sid')]
- new_gen, trans_id = yield self.st.sync_exchange(
- docs_by_gen, 'replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=self.receive_doc)
- self.assertGetDocIncludeDeleted(
- self.db, doc.doc_id, edit_rev, None, False)
- self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
- last_trans_id = self.getLastTransId(self.db)
- self.assertEqual(([], 2, last_trans_id),
- (self.other_changes, new_gen, trans_id))
- sync_info = yield self.st.get_sync_info('replica')
- self.assertEqual(10, sync_info[3])
-
- @defer.inlineCallbacks
- def test_sync_exchange_refuses_conflicts(self):
- doc = self.db.create_doc_from_json(tests.simple_doc)
- self.assertTransactionLog([doc.doc_id], self.db)
- new_doc = '{"key": "altval"}'
- docs_by_gen = [
- (self.make_document(doc.doc_id, 'replica:1', new_doc), 10,
- 'T-sid')]
- new_gen, _ = yield self.st.sync_exchange(
- docs_by_gen, 'replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=self.receive_doc)
- self.assertTransactionLog([doc.doc_id], self.db)
- self.assertEqual(
- (doc.doc_id, doc.rev, tests.simple_doc, 1),
- self.other_changes[0][:-1])
- self.assertEqual(1, new_gen)
- if self.whitebox:
- self.assertEqual(self.db._last_exchange_log['return'],
- {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]})
-
- @defer.inlineCallbacks
- def test_sync_exchange_ignores_convergence(self):
- doc = self.db.create_doc_from_json(tests.simple_doc)
- self.assertTransactionLog([doc.doc_id], self.db)
- gen, txid = self.db._get_generation_info()
- docs_by_gen = [
- (self.make_document(
- doc.doc_id, doc.rev, tests.simple_doc), 10, 'T-sid')]
- new_gen, _ = yield self.st.sync_exchange(
- docs_by_gen, 'replica', last_known_generation=gen,
- last_known_trans_id=txid, insert_doc_cb=self.receive_doc)
- self.assertTransactionLog([doc.doc_id], self.db)
- self.assertEqual(([], 1), (self.other_changes, new_gen))
-
- @defer.inlineCallbacks
- def test_sync_exchange_returns_new_docs(self):
- doc = self.db.create_doc_from_json(tests.simple_doc)
- self.assertTransactionLog([doc.doc_id], self.db)
- new_gen, _ = yield self.st.sync_exchange(
- [], 'other-replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=self.receive_doc)
- self.assertTransactionLog([doc.doc_id], self.db)
- self.assertEqual(
- (doc.doc_id, doc.rev, tests.simple_doc, 1),
- self.other_changes[0][:-1])
- self.assertEqual(1, new_gen)
- if self.whitebox:
- self.assertEqual(self.db._last_exchange_log['return'],
- {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]})
-
- @defer.inlineCallbacks
- def test_sync_exchange_returns_deleted_docs(self):
- doc = self.db.create_doc_from_json(tests.simple_doc)
- self.db.delete_doc(doc)
- self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
- new_gen, _ = yield self.st.sync_exchange(
- [], 'other-replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=self.receive_doc)
- self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
- self.assertEqual(
- (doc.doc_id, doc.rev, None, 2), self.other_changes[0][:-1])
- self.assertEqual(2, new_gen)
- if self.whitebox:
- self.assertEqual(self.db._last_exchange_log['return'],
- {'last_gen': 2, 'docs': [(doc.doc_id, doc.rev)]})
-
- @defer.inlineCallbacks
- def test_sync_exchange_getting_newer_docs(self):
- doc = self.db.create_doc_from_json(tests.simple_doc)
- self.assertTransactionLog([doc.doc_id], self.db)
- new_doc = '{"key": "altval"}'
- docs_by_gen = [
- (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
- 'T-sid')]
- new_gen, _ = yield self.st.sync_exchange(
- docs_by_gen, 'other-replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=self.receive_doc)
- self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
- self.assertEqual(([], 2), (self.other_changes, new_gen))
-
- @defer.inlineCallbacks
- def test_sync_exchange_with_concurrent_updates_of_synced_doc(self):
- expected = []
-
- def before_whatschanged_cb(state):
- if state != 'before whats_changed':
- return
- cont = '{"key": "cuncurrent"}'
- conc_rev = self.db.put_doc(
- self.make_document(doc.doc_id, 'test:1|z:2', cont))
- expected.append((doc.doc_id, conc_rev, cont, 3))
-
- self.set_trace_hook(before_whatschanged_cb)
- doc = self.db.create_doc_from_json(tests.simple_doc)
- self.assertTransactionLog([doc.doc_id], self.db)
- new_doc = '{"key": "altval"}'
- docs_by_gen = [
- (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
- 'T-sid')]
- new_gen, _ = yield self.st.sync_exchange(
- docs_by_gen, 'other-replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=self.receive_doc)
- self.assertEqual(expected, [c[:-1] for c in self.other_changes])
- self.assertEqual(3, new_gen)
-
- @defer.inlineCallbacks
- def test_sync_exchange_with_concurrent_updates(self):
-
- def after_whatschanged_cb(state):
- if state != 'after whats_changed':
- return
- self.db.create_doc_from_json('{"new": "doc"}')
-
- self.set_trace_hook(after_whatschanged_cb)
- doc = self.db.create_doc_from_json(tests.simple_doc)
- self.assertTransactionLog([doc.doc_id], self.db)
- new_doc = '{"key": "altval"}'
- docs_by_gen = [
- (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
- 'T-sid')]
- new_gen, _ = yield self.st.sync_exchange(
- docs_by_gen, 'other-replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=self.receive_doc)
- self.assertEqual(([], 2), (self.other_changes, new_gen))
-
- @defer.inlineCallbacks
- def test_sync_exchange_converged_handling(self):
- doc = self.db.create_doc_from_json(tests.simple_doc)
- docs_by_gen = [
- (self.make_document('new', 'other:1', '{}'), 4, 'T-foo'),
- (self.make_document(doc.doc_id, doc.rev, doc.get_json()), 5,
- 'T-bar')]
- new_gen, _ = yield self.st.sync_exchange(
- docs_by_gen, 'other-replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=self.receive_doc)
- self.assertEqual(([], 2), (self.other_changes, new_gen))
-
- @defer.inlineCallbacks
- def test_sync_exchange_detect_incomplete_exchange(self):
- def before_get_docs_explode(state):
- if state != 'before get_docs':
- return
- raise errors.U1DBError("fail")
- self.set_trace_hook(before_get_docs_explode)
- # suppress traceback printing in the wsgiref server
- # self.patch(simple_server.ServerHandler,
- # 'log_exception', lambda h, exc_info: None)
- doc = self.db.create_doc_from_json(tests.simple_doc)
- self.assertTransactionLog([doc.doc_id], self.db)
- with self.assertRaises((errors.U1DBError, errors.BrokenSyncStream)):
- yield self.st.sync_exchange(
- [], 'other-replica',
- last_known_generation=0, last_known_trans_id=None,
- insert_doc_cb=self.receive_doc)
-
- @defer.inlineCallbacks
- def test_sync_exchange_doc_ids(self):
- sync_exchange_doc_ids = getattr(self.st, 'sync_exchange_doc_ids', None)
- if sync_exchange_doc_ids is None:
- self.skipTest("sync_exchange_doc_ids not implemented")
- db2 = self.create_database('test2')
- doc = db2.create_doc_from_json(tests.simple_doc)
- new_gen, trans_id = sync_exchange_doc_ids(
- db2, [(doc.doc_id, 10, 'T-sid')], 0, None,
- insert_doc_cb=self.receive_doc)
- self.assertGetDoc(self.db, doc.doc_id, doc.rev,
- tests.simple_doc, False)
- self.assertTransactionLog([doc.doc_id], self.db)
- last_trans_id = self.getLastTransId(self.db)
- self.assertEqual(([], 1, last_trans_id),
- (self.other_changes, new_gen, trans_id))
- self.assertEqual(10, self.st.get_sync_info(db2._replica_uid)[3])
-
- @defer.inlineCallbacks
- def test__set_trace_hook(self):
- called = []
-
- def cb(state):
- called.append(state)
-
- self.set_trace_hook(cb)
- yield self.st.sync_exchange([], 'replica', 0, None, self.receive_doc)
- yield self.st.record_sync_info('replica', 0, 'T-sid')
- self.assertEqual(['before whats_changed',
- 'after whats_changed',
- 'before get_docs',
- 'record_sync_info',
- ],
- called)
-
- @defer.inlineCallbacks
- def test__set_trace_hook_shallow(self):
- if (self.st._set_trace_hook_shallow == self.st._set_trace_hook or
- self.st._set_trace_hook_shallow.im_func ==
- SoledadHTTPSyncTarget._set_trace_hook_shallow.im_func):
- # shallow same as full
- expected = ['before whats_changed',
- 'after whats_changed',
- 'before get_docs',
- 'record_sync_info',
- ]
- else:
- expected = ['sync_exchange', 'record_sync_info']
-
- called = []
-
- def cb(state):
- called.append(state)
-
- self.set_trace_hook(cb, shallow=True)
- self.st.sync_exchange([], 'replica', 0, None, self.receive_doc)
- self.st.record_sync_info('replica', 0, 'T-sid')
- self.assertEqual(expected, called)
diff --git a/common/src/leap/soledad/common/tests/test_sync.py b/common/src/leap/soledad/common/tests/test_sync.py
index 14152370..1041367b 100644
--- a/common/src/leap/soledad/common/tests/test_sync.py
+++ b/common/src/leap/soledad/common/tests/test_sync.py
@@ -56,14 +56,13 @@ class InterruptableSyncTestCase(
sync_target = soledad_sync_target
def make_app(self):
- self.request_state = couch.CouchServerState(self._couch_url)
+ self.request_state = couch.CouchServerState(self.couch_url)
return self.make_app_with_state(self.request_state)
def setUp(self):
TestCaseWithServer.setUp(self)
CouchDBTestCase.setUp(self)
self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
- self._couch_url = 'http://localhost:' + str(self.wrapper.port)
def tearDown(self):
CouchDBTestCase.tearDown(self)
@@ -103,7 +102,7 @@ class InterruptableSyncTestCase(
# ensure remote db exists before syncing
db = couch.CouchDatabase.open_database(
- urljoin(self._couch_url, 'user-user-uuid'),
+ urljoin(self.couch_url, 'user-user-uuid'),
create=True,
ensure_ddocs=True)
@@ -148,8 +147,8 @@ class InterruptableSyncTestCase(
class TestSoledadDbSync(
TestWithScenarios,
- tests.TestCaseWithServer,
- SoledadWithCouchServerMixin):
+ SoledadWithCouchServerMixin,
+ tests.TestCaseWithServer):
"""
Test db.sync remote sync shortcut
@@ -166,10 +165,6 @@ class TestSoledadDbSync(
oauth = False
token = False
- def make_app(self):
- self.request_state = couch.CouchServerState(self._couch_url)
- return self.make_app_with_state(self.request_state)
-
def setUp(self):
"""
Need to explicitely invoke inicialization on all bases.
@@ -177,29 +172,22 @@ class TestSoledadDbSync(
SoledadWithCouchServerMixin.setUp(self)
self.startTwistedServer()
self.db = self.make_database_for_test(self, 'test1')
- self.db2 = couch.CouchDatabase.open_database(
- urljoin(
- 'http://localhost:' + str(self.wrapper.port),
- 'test'
- ),
- create=True,
- ensure_ddocs=True)
+ self.db2 = self.request_state._create_database(replica_uid='test')
def tearDown(self):
"""
Need to explicitely invoke destruction on all bases.
"""
- self.db2.delete_database()
SoledadWithCouchServerMixin.tearDown(self)
# tests.TestCaseWithServer.tearDown(self)
- def do_sync(self, target_name):
+ def do_sync(self):
"""
Perform sync using SoledadSynchronizer, SoledadSyncTarget
and Token auth.
"""
target = soledad_sync_target(
- self, target_name,
+ self, self.db2._dbname,
source_replica_uid=self._soledad._dbpool.replica_uid)
self.addCleanup(target.close)
return sync.SoledadSynchronizer(
@@ -217,7 +205,7 @@ class TestSoledadDbSync(
doc1 = self.db.create_doc_from_json(tests.simple_doc)
doc2 = self.db2.create_doc_from_json(tests.nested_doc)
- local_gen_before_sync = yield self.do_sync('test')
+ local_gen_before_sync = yield self.do_sync()
gen, _, changes = self.db.whats_changed(local_gen_before_sync)
self.assertEqual(1, len(changes))
self.assertEqual(doc2.doc_id, changes[0][0])
diff --git a/common/src/leap/soledad/common/tests/test_sync_deferred.py b/common/src/leap/soledad/common/tests/test_sync_deferred.py
index ffb8a4ae..90b00670 100644
--- a/common/src/leap/soledad/common/tests/test_sync_deferred.py
+++ b/common/src/leap/soledad/common/tests/test_sync_deferred.py
@@ -59,6 +59,7 @@ class BaseSoledadDeferredEncTest(SoledadWithCouchServerMixin):
def setUp(self):
SoledadWithCouchServerMixin.setUp(self)
+ self.startTwistedServer()
# config info
self.db1_file = os.path.join(self.tempdir, "db1.u1db")
os.unlink(self.db1_file)
@@ -85,13 +86,7 @@ class BaseSoledadDeferredEncTest(SoledadWithCouchServerMixin):
defer_encryption=True, sync_db_key=sync_db_key)
self.db1 = SQLCipherDatabase(self.opts)
- self.db2 = couch.CouchDatabase.open_database(
- urljoin(
- 'http://localhost:' + str(self.wrapper.port),
- 'test'
- ),
- create=True,
- ensure_ddocs=True)
+ self.db2 = self.request_state._create_database('test')
def tearDown(self):
# XXX should not access "private" attrs
@@ -109,8 +104,8 @@ class SyncTimeoutError(Exception):
class TestSoledadDbSyncDeferredEncDecr(
TestWithScenarios,
- tests.TestCaseWithServer,
- BaseSoledadDeferredEncTest):
+ BaseSoledadDeferredEncTest,
+ tests.TestCaseWithServer):
"""
Test db.sync remote sync shortcut.
@@ -128,17 +123,12 @@ class TestSoledadDbSyncDeferredEncDecr(
oauth = False
token = True
- def make_app(self):
- self.request_state = couch.CouchServerState(self._couch_url)
- return self.make_app_with_state(self.request_state)
-
def setUp(self):
"""
Need to explicitely invoke inicialization on all bases.
"""
BaseSoledadDeferredEncTest.setUp(self)
self.server = self.server_thread = None
- self.startTwistedServer()
self.syncer = None
def tearDown(self):
@@ -150,7 +140,7 @@ class TestSoledadDbSyncDeferredEncDecr(
dbsyncer.close()
BaseSoledadDeferredEncTest.tearDown(self)
- def do_sync(self, target_name):
+ def do_sync(self):
"""
Perform sync using SoledadSynchronizer, SoledadSyncTarget
and Token auth.
@@ -159,7 +149,7 @@ class TestSoledadDbSyncDeferredEncDecr(
sync_db = self._soledad._sync_db
sync_enc_pool = self._soledad._sync_enc_pool
target = soledad_sync_target(
- self, target_name,
+ self, self.db2._dbname,
source_replica_uid=replica_uid,
sync_db=sync_db,
sync_enc_pool=sync_enc_pool)
@@ -190,7 +180,7 @@ class TestSoledadDbSyncDeferredEncDecr(
"""
doc1 = self.db1.create_doc_from_json(tests.simple_doc)
doc2 = self.db2.create_doc_from_json(tests.nested_doc)
- local_gen_before_sync = yield self.do_sync('test')
+ local_gen_before_sync = yield self.do_sync()
gen, _, changes = self.db1.whats_changed(local_gen_before_sync)
self.assertEqual(1, len(changes))
diff --git a/common/src/leap/soledad/common/tests/test_sync_mutex.py b/common/src/leap/soledad/common/tests/test_sync_mutex.py
index a904a940..2e2123a7 100644
--- a/common/src/leap/soledad/common/tests/test_sync_mutex.py
+++ b/common/src/leap/soledad/common/tests/test_sync_mutex.py
@@ -84,14 +84,14 @@ class TestSyncMutex(
sync_target = soledad_sync_target
def make_app(self):
- self.request_state = couch.CouchServerState(self._couch_url)
+ self.request_state = couch.CouchServerState(self.couch_url)
return self.make_app_with_state(self.request_state)
def setUp(self):
TestCaseWithServer.setUp(self)
CouchDBTestCase.setUp(self)
self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
- self._couch_url = 'http://localhost:' + str(self.wrapper.port)
+ self.user = ('user-%s' % uuid.uuid4().hex)
def tearDown(self):
CouchDBTestCase.tearDown(self)
@@ -103,12 +103,12 @@ class TestSyncMutex(
# ensure remote db exists before syncing
db = couch.CouchDatabase.open_database(
- urljoin(self._couch_url, 'user-user-uuid'),
+ urljoin(self.couch_url, 'user-' + self.user),
create=True,
ensure_ddocs=True)
sol = self._soledad_instance(
- user='user-uuid', server_url=self.getURL())
+ user=self.user, server_url=self.getURL())
d1 = sol.sync()
d2 = sol.sync()
diff --git a/common/src/leap/soledad/common/tests/test_sync_target.py b/common/src/leap/soledad/common/tests/test_sync_target.py
index d855fb52..c0987e90 100644
--- a/common/src/leap/soledad/common/tests/test_sync_target.py
+++ b/common/src/leap/soledad/common/tests/test_sync_target.py
@@ -63,13 +63,12 @@ class TestSoledadParseReceivedDocResponse(SoledadWithCouchServerMixin):
def setUp(self):
SoledadWithCouchServerMixin.setUp(self)
- self._couch_url = 'http://localhost:' + str(self.wrapper.port)
creds = {'token': {
'uuid': 'user-uuid',
'token': 'auth-token',
}}
self.target = target.SoledadHTTPSyncTarget(
- self._couch_url,
+ self.couch_url,
uuid4().hex,
creds,
self._soledad._crypto,
@@ -151,11 +150,12 @@ def make_local_db_and_soledad_target(
test, path='test',
source_replica_uid=uuid4().hex):
test.startTwistedServer()
- db = test.request_state._create_database(os.path.basename(path))
+ replica_uid = os.path.basename(path)
+ db = test.request_state._create_database(replica_uid)
sync_db = test._soledad._sync_db
sync_enc_pool = test._soledad._sync_enc_pool
st = soledad_sync_target(
- test, path,
+ test, db._dbname,
source_replica_uid=source_replica_uid,
sync_db=sync_db,
sync_enc_pool=sync_enc_pool)
@@ -191,6 +191,8 @@ class TestSoledadSyncTarget(
self.startTwistedServer()
sync_db = self._soledad._sync_db
sync_enc_pool = self._soledad._sync_enc_pool
+ if path is None:
+ path = self.db2._dbname
target = self.sync_target(
self, path,
source_replica_uid=source_replica_uid,
@@ -204,11 +206,11 @@ class TestSoledadSyncTarget(
SoledadWithCouchServerMixin.setUp(self)
self.startTwistedServer()
self.db1 = make_sqlcipher_database_for_test(self, 'test1')
- self.db2 = self.request_state._create_database('test2')
+ self.db2 = self.request_state._create_database('test')
def tearDown(self):
# db2, _ = self.request_state.ensure_database('test2')
- self.db2.delete_database()
+ self.delete_db(self.db2._dbname)
self.db1.close()
SoledadWithCouchServerMixin.tearDown(self)
TestWithScenarios.tearDown(self)
@@ -220,8 +222,8 @@ class TestSoledadSyncTarget(
This test was adapted to decrypt remote content before assert.
"""
- db = self.request_state._create_database('test')
- remote_target = self.getSyncTarget('test')
+ db = self.db2
+ remote_target = self.getSyncTarget()
other_docs = []
def receive_doc(doc, gen, trans_id):
@@ -247,7 +249,7 @@ class TestSoledadSyncTarget(
def blackhole_getstderr(inst):
return cStringIO.StringIO()
- db = self.request_state._create_database('test')
+ db = self.db2
_put_doc_if_newer = db._put_doc_if_newer
trigger_ids = ['doc-here2']
@@ -267,7 +269,6 @@ class TestSoledadSyncTarget(
self.patch(
IndexedCouchDatabase, '_put_doc_if_newer', bomb_put_doc_if_newer)
remote_target = self.getSyncTarget(
- 'test',
source_replica_uid='replica')
other_changes = []
@@ -317,7 +318,7 @@ class TestSoledadSyncTarget(
This test was adapted to decrypt remote content before assert.
"""
- remote_target = self.getSyncTarget('test')
+ remote_target = self.getSyncTarget()
other_docs = []
replica_uid_box = []
@@ -333,7 +334,7 @@ class TestSoledadSyncTarget(
last_known_trans_id=None, insert_doc_cb=receive_doc,
ensure_callback=ensure_cb, defer_decryption=False)
self.assertEqual(1, new_gen)
- db = self.request_state.open_database('test')
+ db = self.db2
self.assertEqual(1, len(replica_uid_box))
self.assertEqual(db._replica_uid, replica_uid_box[0])
self.assertGetEncryptedDoc(
@@ -346,10 +347,9 @@ class TestSoledadSyncTarget(
@defer.inlineCallbacks
def test_get_sync_info(self):
- db = self.request_state._create_database('test')
+ db = self.db2
db._set_replica_gen_and_trans_id('other-id', 1, 'T-transid')
remote_target = self.getSyncTarget(
- 'test',
source_replica_uid='other-id')
sync_info = yield remote_target.get_sync_info('other-id')
self.assertEqual(
@@ -358,19 +358,17 @@ class TestSoledadSyncTarget(
@defer.inlineCallbacks
def test_record_sync_info(self):
- db = self.request_state._create_database('test')
remote_target = self.getSyncTarget(
- 'test',
source_replica_uid='other-id')
yield remote_target.record_sync_info('other-id', 2, 'T-transid')
- self.assertEqual(
- (2, 'T-transid'), db._get_replica_gen_and_trans_id('other-id'))
+ self.assertEqual((2, 'T-transid'),
+ self.db2._get_replica_gen_and_trans_id('other-id'))
@defer.inlineCallbacks
def test_sync_exchange_receive(self):
- db = self.request_state._create_database('test')
+ db = self.db2
doc = db.create_doc_from_json('{"value": "there"}')
- remote_target = self.getSyncTarget('test')
+ remote_target = self.getSyncTarget()
other_changes = []
def receive_doc(doc, gen, trans_id):
@@ -423,10 +421,10 @@ class SoledadDatabaseSyncTargetTests(
self.db, self.st = make_local_db_and_soledad_target(self)
def tearDown(self):
- tests.TestCaseWithServer.tearDown(self)
- SoledadWithCouchServerMixin.tearDown(self)
self.db.close()
self.st.close()
+ tests.TestCaseWithServer.tearDown(self)
+ SoledadWithCouchServerMixin.tearDown(self)
def set_trace_hook(self, callback, shallow=False):
setter = (self.st._set_trace_hook if not shallow else
@@ -818,10 +816,6 @@ class TestSoledadDbSync(
oauth = False
token = False
- def make_app(self):
- self.request_state = couch.CouchServerState(self._couch_url)
- return self.make_app_with_state(self.request_state)
-
def setUp(self):
"""
Need to explicitely invoke inicialization on all bases.
@@ -857,13 +851,7 @@ class TestSoledadDbSync(
defer_encryption=True, sync_db_key=sync_db_key)
self.db1 = SQLCipherDatabase(self.opts)
- self.db2 = couch.CouchDatabase.open_database(
- urljoin(
- 'http://localhost:' + str(self.wrapper.port),
- 'test'
- ),
- create=True,
- ensure_ddocs=True)
+ self.db2 = self.request_state._create_database(replica_uid='test')
def tearDown(self):
"""
@@ -890,7 +878,7 @@ class TestSoledadDbSync(
'uuid': 'user-uuid',
'token': 'auth-token',
}}
- target_url = self.getURL(target_name)
+ target_url = self.getURL(self.db2._dbname)
# get a u1db syncer
crypto = self._soledad._crypto
diff --git a/common/src/leap/soledad/common/tests/util.py b/common/src/leap/soledad/common/tests/util.py
index daa9c558..1c7adb91 100644
--- a/common/src/leap/soledad/common/tests/util.py
+++ b/common/src/leap/soledad/common/tests/util.py
@@ -27,10 +27,8 @@ import shutil
import random
import string
import u1db
-import subprocess
-import time
-import re
import traceback
+import couchdb
from uuid import uuid4
from mock import Mock
@@ -337,119 +335,6 @@ class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest):
self.assertEqual(exp_doc.content, doc.content)
-# -----------------------------------------------------------------------------
-# A wrapper for running couchdb locally.
-# -----------------------------------------------------------------------------
-
-# from: https://github.com/smcq/paisley/blob/master/paisley/test/util.py
-# TODO: include license of above project.
-class CouchDBWrapper(object):
-
- """
- Wrapper for external CouchDB instance which is started and stopped for
- testing.
- """
- BOOT_TIMEOUT_SECONDS = 5
- RETRY_LIMIT = 3
-
- def start(self):
- tries = 0
- while tries < self.RETRY_LIMIT and not hasattr(self, 'port'):
- try:
- self._try_start()
- return
- except Exception, e:
- print traceback.format_exc()
- self.stop()
- tries += 1
- raise Exception(
- "Check your couchdb: Tried to start 3 times and failed badly")
-
- def _try_start(self):
- """
- Start a CouchDB instance for a test.
- """
- self.tempdir = tempfile.mkdtemp(suffix='.couch.test')
-
- path = os.path.join(os.path.dirname(__file__),
- 'couchdb.ini.template')
- handle = open(path)
- conf = handle.read() % {
- 'tempdir': self.tempdir,
- }
- handle.close()
-
- shutil.copy('/etc/couchdb/default.ini', self.tempdir)
- defaultConfPath = os.path.join(self.tempdir, 'default.ini')
-
- confPath = os.path.join(self.tempdir, 'test.ini')
- handle = open(confPath, 'w')
- handle.write(conf)
- handle.close()
-
- # create the dirs from the template
- mkdir_p(os.path.join(self.tempdir, 'lib'))
- mkdir_p(os.path.join(self.tempdir, 'log'))
- args = ['/usr/bin/couchdb', '-n',
- '-a', defaultConfPath, '-a', confPath]
- null = open('/dev/null', 'w')
-
- self.process = subprocess.Popen(
- args, env=None, stdout=null.fileno(), stderr=null.fileno(),
- close_fds=True)
- boot_time = time.time()
- # find port
- logPath = os.path.join(self.tempdir, 'log', 'couch.log')
- while not os.path.exists(logPath):
- if self.process.poll() is not None:
- got_stdout, got_stderr = "", ""
- if self.process.stdout is not None:
- got_stdout = self.process.stdout.read()
-
- if self.process.stderr is not None:
- got_stderr = self.process.stderr.read()
- raise Exception("""
-couchdb exited with code %d.
-stdout:
-%s
-stderr:
-%s""" % (
- self.process.returncode, got_stdout, got_stderr))
- time.sleep(0.01)
- if (time.time() - boot_time) > self.BOOT_TIMEOUT_SECONDS:
- self.stop()
- raise Exception("Timeout starting couch")
- while os.stat(logPath).st_size == 0:
- time.sleep(0.01)
- if (time.time() - boot_time) > self.BOOT_TIMEOUT_SECONDS:
- self.stop()
- raise Exception("Timeout starting couch")
- PORT_RE = re.compile(
- 'Apache CouchDB has started on http://127.0.0.1:(?P<port>\d+)')
-
- handle = open(logPath)
- line = handle.read()
- handle.close()
- m = PORT_RE.search(line)
- if not m:
- self.stop()
- raise Exception("Cannot find port in line %s" % line)
- self.port = int(m.group('port'))
-
- def stop(self):
- """
- Terminate the CouchDB instance.
- """
- try:
- self.process.terminate()
- self.process.communicate()
- except:
- # just to clean up
- # if it can't, the process wasn't created anyway
- pass
- shutil.rmtree(self.tempdir)
-
-
class CouchDBTestCase(unittest.TestCase, MockedSharedDBTest):
"""
@@ -460,15 +345,16 @@ class CouchDBTestCase(unittest.TestCase, MockedSharedDBTest):
"""
Make sure we have a CouchDB instance for a test.
"""
- self.wrapper = CouchDBWrapper()
- self.wrapper.start()
- # self.db = self.wrapper.db
+ self.couch_port = 5984
+ self.couch_url = 'http://localhost:%d' % self.couch_port
+ self.couch_server = couchdb.Server(self.couch_url)
- def tearDown(self):
- """
- Stop CouchDB instance for test.
- """
- self.wrapper.stop()
+ def delete_db(self, name):
+ try:
+ self.couch_server.delete(name)
+ except:
+ # ignore if already missing
+ pass
class CouchServerStateForTests(CouchServerState):
@@ -484,15 +370,25 @@ class CouchServerStateForTests(CouchServerState):
which is less pleasant than allowing the db to be automatically created.
"""
- def _create_database(self, dbname):
- return CouchDatabase.open_database(
- urljoin(self._couch_url, dbname),
+ def __init__(self, *args, **kwargs):
+ self.dbs = []
+ super(CouchServerStateForTests, self).__init__(*args, **kwargs)
+
+ def _create_database(self, replica_uid=None, dbname=None):
+ """
+ Create db and append to a list, allowing test to close it later
+ """
+ dbname = dbname or ('test-%s' % uuid4().hex)
+ db = CouchDatabase.open_database(
+ urljoin(self.couch_url, dbname),
True,
- replica_uid=dbname,
+ replica_uid=replica_uid or 'test',
ensure_ddocs=True)
+ self.dbs.append(db)
+ return db
def ensure_database(self, dbname):
- db = self._create_database(dbname)
+ db = self._create_database(dbname=dbname)
return db, db.replica_uid
@@ -506,23 +402,20 @@ class SoledadWithCouchServerMixin(
main_test_class = getattr(self, 'main_test_class', None)
if main_test_class is not None:
main_test_class.setUp(self)
- self._couch_url = 'http://localhost:%d' % self.wrapper.port
def tearDown(self):
main_test_class = getattr(self, 'main_test_class', None)
if main_test_class is not None:
main_test_class.tearDown(self)
# delete the test database
- try:
- db = CouchDatabase(self._couch_url, 'test')
- db.delete_database()
- except DatabaseDoesNotExist:
- pass
BaseSoledadTest.tearDown(self)
CouchDBTestCase.tearDown(self)
def make_app(self):
- couch_url = urljoin(
- 'http://localhost:' + str(self.wrapper.port), 'tests')
- self.request_state = CouchServerStateForTests(couch_url)
+ self.request_state = CouchServerStateForTests(self.couch_url)
+ self.addCleanup(self.delete_dbs)
return self.make_app_with_state(self.request_state)
+
+ def delete_dbs(self):
+ for db in self.request_state.dbs:
+ self.delete_db(db._dbname)
diff --git a/docs/sphinx/sync.rst b/docs/sphinx/sync.rst
new file mode 100644
index 00000000..f243befb
--- /dev/null
+++ b/docs/sphinx/sync.rst
@@ -0,0 +1,32 @@
+Soledad sync process
+====================
+
+Phases of sync:
+
+(1) client acquires knowledge about server state. http GET
+(2) client sends its documents to the server. http POSTs, or a single POST.
+(3) client downloads documents from the server.
+(4) client records its new state on the server.
+
+Originally in u1db:
+    (1) is a GET,
+    (2) and (3) are one POST (send in body, receive in response),
+    (4) is a PUT.
+
+In soledad:
+
+(1) is a GET.
+(2) is either 1 or a series of sequential POSTS.
+  (2.1) encrypt asynchronously
+  (2.2) store in temp sync db
+  (2.3) upload sequentially ***THIS IS SLOW***
+(3) is a series of concurrent POSTS, insert sequentially on local client db.
+  (3.1) download concurrently
+  (3.2) store in temp sync db
+  (3.3) decrypt asynchronously
+  (3.4) insert sequentially in local client db
+(4) is a PUT.
+
+This difference between u1db and soledad was made in order to be able to gracefully interrupt the sync in the middle of the upload or the download.
+
+it is essential that all the uploads and downloads are sequential: documents must be added in order. the download happens in parallel, but then locally they are added sequentially to the local db.
diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py
index 7a03f6fb..1b795016 100644
--- a/server/src/leap/soledad/server/__init__.py
+++ b/server/src/leap/soledad/server/__init__.py
@@ -238,6 +238,7 @@ class HTTPInvocationByMethodWithBody(
if content_type == 'application/x-soledad-sync-put':
meth_put = self._lookup('%s_put' % method)
meth_end = self._lookup('%s_end' % method)
+ entries = []
while True:
line = body_getline()
entry = line.strip()
@@ -246,9 +247,11 @@ class HTTPInvocationByMethodWithBody(
if not entry or not comma: # empty or no prec comma
raise http_app.BadRequest
entry, comma = utils.check_and_strip_comma(entry)
- meth_put({}, entry)
+ entries.append(entry)
if comma or body_getline(): # extra comma or data
raise http_app.BadRequest
+ for entry in entries:
+ meth_put({}, entry)
return meth_end()
# handle outgoing documents
elif content_type == 'application/x-soledad-sync-get':