diff options
author | Kali Kaneko <kali@leap.se> | 2015-09-24 15:23:34 -0400 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2015-09-24 15:23:34 -0400 |
commit | 67d341b062640ace095fae835107ec677e9d7cae (patch) | |
tree | 0eaa17436ea5990dcae0737119050fa0db8f471a | |
parent | 363d960c3feddb93a0f660075d9b4b33f3713882 (diff) | |
parent | 4be6f05d91891122e83f74d21c83c5f8fcd3a618 (diff) |
Merge tag '0.7.3' into debian/experimental
Tag leap.soledad version 0.7.3
28 files changed, 1322 insertions, 1752 deletions
@@ -1,3 +1,19 @@ +0.7.3 Sep 22, 2015: +Client: + o Bugfix: refactor code loss. Closes #7412. + o Bugfix: Set active secret before saving local file. + o Split http_target into 4 modules, separating those responsibilities. + o Refactor details of making an HTTP request body and headers out of the + send/fetch logic. This also makes it easier to enable batching. + +Server: + o Fix a bug where BadRequest could be raised after everything was persisted. + +Common: + o Refactor couch.py to separate persistence from logic while saving uploaded + documents. Also simplify logic while checking for conflicts. + + 0.7.2 Aug 26, 2015: Client: o Remove MAC from secrets file. Closes #6980. @@ -6,7 +22,7 @@ Client: o Improve how we send information on SOLEDAD_SYNC_SEND_STATUS and in SOLEDAD_SYNC_RECEIVE_STATUS. Related to Feature #7353. o Fix hanging sync by properly waiting db initialization on sync decrypter - pool. Closes #7686. + pool. Closes #7386. o Avoid double decryption of documents. o Fix the order of the events emited for incoming documents. o bugfix: move sync db and encpool creation to api. @@ -13,18 +13,24 @@ repository: **leap.soledad.common** common pieces. -.. image:: https://pypip.in/v/leap.soledad.common/badge.png - :target: https://crate.io/packages/leap.soledad.common +.. image:: https://badge.fury.io/py/leap.soledad.common.svg + :target: http://badge.fury.io/py/leap.soledad.common +.. image:: https://img.shields.io/pypi/dm/leap.soledad.common.svg + :target: http://badge.fury.io/py/leap.soledad.common **leap.soledad.client** where the soledad client lives. -.. image:: https://pypip.in/v/leap.soledad.client/badge.png - :target: https://crate.io/packages/leap.soledad.client +.. image:: https://badge.fury.io/py/leap.soledad.client.svg + :target: http://badge.fury.io/py/leap.soledad.client +.. image:: https://img.shields.io/pypi/dm/leap.soledad.client.svg + :target: http://badge.fury.io/py/leap.soledad.client **leap.soledad.server** oh surprise! bits needed for the soledad server. -.. image:: https://pypip.in/v/leap.soledad.server/badge.png - :target: https://crate.io/packages/leap.soledad.server +.. image:: https://badge.fury.io/py/leap.soledad.server.svg + :target: http://badge.fury.io/py/leap.soledad.server +.. image:: https://img.shields.io/pypi/dm/leap.soledad.server.svg + :target: http://badge.fury.io/py/leap.soledad.server Compatibility diff --git a/client/pkg/requirements-leap.pip b/client/pkg/requirements-leap.pip index c5fbcd5f..52d1263b 100644 --- a/client/pkg/requirements-leap.pip +++ b/client/pkg/requirements-leap.pip @@ -1,2 +1,2 @@ -leap.common>=0.4.1 -leap.soledad.common>=0.6.5 +leap.common>=0.4.3 +leap.soledad.common>=0.7.0 diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index 237159bd..77822247 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ b/client/src/leap/soledad/client/adbapi.py @@ -285,7 +285,8 @@ class U1DBConnectionPool(adbapi.ConnectionPool): A final close, only called by the shutdown trigger. """ self.shutdownID = None - self.threadpool.stop() + if self.threadpool.started: + self.threadpool.stop() self.running = False for conn in self.connections.values(): self._close(conn) diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index a6a98551..a558addd 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -718,7 +718,7 @@ class Soledad(object): return failure def _emit_done_data_sync(passthrough): - soledad_events.emit( + soledad_events.emit_async( soledad_events.SOLEDAD_DONE_DATA_SYNC, self.uuid) return passthrough diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 2ad98767..6d3c11b9 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -74,6 +74,8 @@ class SyncEncryptDecryptPool(object): self._started = True def stop(self): + if not self._started: + return self._started = False self._destroy_pool() # maybe cancel the next delayed call diff --git a/client/src/leap/soledad/client/events.py b/client/src/leap/soledad/client/events.py index b1379521..058be59c 100644 --- a/client/src/leap/soledad/client/events.py +++ b/client/src/leap/soledad/client/events.py @@ -20,7 +20,7 @@ Signaling functions. """ -from leap.common.events import emit +from leap.common.events import emit_async from leap.common.events import catalog @@ -40,7 +40,7 @@ SOLEDAD_SYNC_RECEIVE_STATUS = catalog.SOLEDAD_SYNC_RECEIVE_STATUS __all__ = [ "catalog", - "emit", + "emit_async", "SOLEDAD_CREATING_KEYS", "SOLEDAD_DONE_CREATING_KEYS", "SOLEDAD_DOWNLOADING_KEYS", diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py deleted file mode 100644 index a6ef2b0d..00000000 --- a/client/src/leap/soledad/client/http_target.py +++ /dev/null @@ -1,711 +0,0 @@ -# -*- coding: utf-8 -*- -# http_target.py -# Copyright (C) 2015 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -""" -A U1DB backend for encrypting data before sending to server and decrypting -after receiving. -""" - - -import json -import base64 -import logging -import warnings - -from uuid import uuid4 - -from twisted.internet import defer -from twisted.web.error import Error -from twisted.web.client import _ReadBodyProtocol -from twisted.web.client import PartialDownloadError -from twisted.web._newclient import ResponseDone -from twisted.web._newclient import PotentialDataLoss - -from u1db import errors -from u1db import SyncTarget -from u1db.remote import utils -from u1db.remote import http_errors - -from leap.common.http import HTTPClient - -from leap.soledad.common.document import SoledadDocument -from leap.soledad.common.errors import InvalidAuthTokenError - -from leap.soledad.client.crypto import is_symmetrically_encrypted -from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS -from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS -from leap.soledad.client.events import emit -from leap.soledad.client.encdecpool import SyncDecrypterPool - - -logger = logging.getLogger(__name__) - - -# we want to make sure that HTTP errors will raise appropriate u1db errors, -# that is, fire errbacks with the appropriate failures, in the context of -# twisted. Because of that, we redefine the http body reader used by the HTTP -# client below. - -class ReadBodyProtocol(_ReadBodyProtocol): - - def __init__(self, response, deferred): - """ - Initialize the protocol, additionally storing the response headers. - """ - _ReadBodyProtocol.__init__( - self, response.code, response.phrase, deferred) - self.headers = response.headers - - # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks - def _error(self, respdic): - descr = respdic.get("error") - exc_cls = errors.wire_description_to_exc.get(descr) - if exc_cls is not None: - message = respdic.get("message") - self.deferred.errback(exc_cls(message)) - # ---8<--- end of snippet from u1db.remote.http_client - - def connectionLost(self, reason): - """ - Deliver the accumulated response bytes to the waiting L{Deferred}, if - the response body has been completely received without error. - """ - if reason.check(ResponseDone): - - body = b''.join(self.dataBuffer) - - # ---8<--- snippet from u1db.remote.http_client - if self.status in (200, 201): - self.deferred.callback(body) - elif self.status in http_errors.ERROR_STATUSES: - try: - respdic = json.loads(body) - except ValueError: - self.deferred.errback( - errors.HTTPError(self.status, body, self.headers)) - else: - self._error(respdic) - # special cases - elif self.status == 503: - self.deferred.errback(errors.Unavailable(body, self.headers)) - else: - self.deferred.errback( - errors.HTTPError(self.status, body, self.headers)) - # ---8<--- end of snippet from u1db.remote.http_client - - elif reason.check(PotentialDataLoss): - self.deferred.errback( - PartialDownloadError(self.status, self.message, - b''.join(self.dataBuffer))) - else: - self.deferred.errback(reason) - - -def readBody(response): - """ - Get the body of an L{IResponse} and return it as a byte string. - - This is a helper function for clients that don't want to incrementally - receive the body of an HTTP response. - - @param response: The HTTP response for which the body will be read. - @type response: L{IResponse} provider - - @return: A L{Deferred} which will fire with the body of the response. - Cancelling it will close the connection to the server immediately. - """ - def cancel(deferred): - """ - Cancel a L{readBody} call, close the connection to the HTTP server - immediately, if it is still open. - - @param deferred: The cancelled L{defer.Deferred}. - """ - abort = getAbort() - if abort is not None: - abort() - - d = defer.Deferred(cancel) - protocol = ReadBodyProtocol(response, d) - - def getAbort(): - return getattr(protocol.transport, 'abortConnection', None) - - response.deliverBody(protocol) - - if protocol.transport is not None and getAbort() is None: - warnings.warn( - 'Using readBody with a transport that does not have an ' - 'abortConnection method', - category=DeprecationWarning, - stacklevel=2) - - return d - - -class SoledadHTTPSyncTarget(SyncTarget): - - """ - A SyncTarget that encrypts data before sending and decrypts data after - receiving. - - Normally encryption will have been written to the sync database upon - document modification. The sync database is also used to write temporarily - the parsed documents that the remote send us, before being decrypted and - written to the main database. - """ - - def __init__(self, url, source_replica_uid, creds, crypto, cert_file, - sync_db=None, sync_enc_pool=None): - """ - Initialize the sync target. - - :param url: The server sync url. - :type url: str - :param source_replica_uid: The source replica uid which we use when - deferring decryption. - :type source_replica_uid: str - :param creds: A dictionary containing the uuid and token. - :type creds: creds - :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt - document contents when syncing. - :type crypto: soledad.crypto.SoledadCrypto - :param cert_file: Path to the certificate of the ca used to validate - the SSL certificate used by the remote soledad - server. - :type cert_file: str - :param sync_db: Optional. handler for the db with the symmetric - encryption of the syncing documents. If - None, encryption will be done in-place, - instead of retreiving it from the dedicated - database. - :type sync_db: Sqlite handler - :param sync_enc_pool: The encryption pool to use to defer encryption. - If None is passed the encryption will not be - deferred. - :type sync_enc_pool: leap.soledad.client.encdecpool.SyncEncrypterPool - """ - if url.endswith("/"): - url = url[:-1] - self._url = str(url) + "/sync-from/" + str(source_replica_uid) - self.source_replica_uid = source_replica_uid - self._auth_header = None - self.set_creds(creds) - self._crypto = crypto - self._sync_db = sync_db - self._sync_enc_pool = sync_enc_pool - self._insert_doc_cb = None - # asynchronous encryption/decryption attributes - self._decryption_callback = None - self._sync_decr_pool = None - self._http = HTTPClient(cert_file) - - def close(self): - self._http.close() - - def set_creds(self, creds): - """ - Update credentials. - - :param creds: A dictionary containing the uuid and token. - :type creds: dict - """ - uuid = creds['token']['uuid'] - token = creds['token']['token'] - auth = '%s:%s' % (uuid, token) - b64_token = base64.b64encode(auth) - self._auth_header = {'Authorization': ['Token %s' % b64_token]} - - @property - def _defer_encryption(self): - return self._sync_enc_pool is not None - - # - # SyncTarget API - # - - @defer.inlineCallbacks - def get_sync_info(self, source_replica_uid): - """ - Return information about known state of remote database. - - Return the replica_uid and the current database generation of the - remote database, and its last-seen database generation for the client - replica. - - :param source_replica_uid: The client-size replica uid. - :type source_replica_uid: str - - :return: A deferred which fires with (target_replica_uid, - target_replica_generation, target_trans_id, - source_replica_last_known_generation, - source_replica_last_known_transaction_id) - :rtype: twisted.internet.defer.Deferred - """ - raw = yield self._http_request(self._url, headers=self._auth_header) - res = json.loads(raw) - defer.returnValue(( - res['target_replica_uid'], - res['target_replica_generation'], - res['target_replica_transaction_id'], - res['source_replica_generation'], - res['source_transaction_id'] - )) - - def record_sync_info( - self, source_replica_uid, source_replica_generation, - source_replica_transaction_id): - """ - Record tip information for another replica. - - After sync_exchange has been processed, the caller will have - received new content from this replica. This call allows the - source replica instigating the sync to inform us what their - generation became after applying the documents we returned. - - This is used to allow future sync operations to not need to repeat data - that we just talked about. It also means that if this is called at the - wrong time, there can be database records that will never be - synchronized. - - :param source_replica_uid: The identifier for the source replica. - :type source_replica_uid: str - :param source_replica_generation: The database generation for the - source replica. - :type source_replica_generation: int - :param source_replica_transaction_id: The transaction id associated - with the source replica - generation. - :type source_replica_transaction_id: str - - :return: A deferred which fires with the result of the query. - :rtype: twisted.internet.defer.Deferred - """ - data = json.dumps({ - 'generation': source_replica_generation, - 'transaction_id': source_replica_transaction_id - }) - headers = self._auth_header.copy() - headers.update({'content-type': ['application/json']}) - return self._http_request( - self._url, - method='PUT', - headers=headers, - body=data) - - @defer.inlineCallbacks - def sync_exchange(self, docs_by_generation, source_replica_uid, - last_known_generation, last_known_trans_id, - insert_doc_cb, ensure_callback=None, - defer_decryption=True, sync_id=None): - """ - Find out which documents the remote database does not know about, - encrypt and send them. After that, receive documents from the remote - database. - - :param docs_by_generations: A list of (doc_id, generation, trans_id) - of local documents that were changed since - the last local generation the remote - replica knows about. - :type docs_by_generations: list of tuples - - :param source_replica_uid: The uid of the source replica. - :type source_replica_uid: str - - :param last_known_generation: Target's last known generation. - :type last_known_generation: int - - :param last_known_trans_id: Target's last known transaction id. - :type last_known_trans_id: str - - :param insert_doc_cb: A callback for inserting received documents from - target. If not overriden, this will call u1db - insert_doc_from_target in synchronizer, which - implements the TAKE OTHER semantics. - :type insert_doc_cb: function - - :param ensure_callback: A callback that ensures we know the target - replica uid if the target replica was just - created. - :type ensure_callback: function - - :param defer_decryption: Whether to defer the decryption process using - the intermediate database. If False, - decryption will be done inline. - :type defer_decryption: bool - - :return: A deferred which fires with the new generation and - transaction id of the target replica. - :rtype: twisted.internet.defer.Deferred - """ - - self._ensure_callback = ensure_callback - - if sync_id is None: - sync_id = str(uuid4()) - self.source_replica_uid = source_replica_uid - - # save a reference to the callback so we can use it after decrypting - self._insert_doc_cb = insert_doc_cb - - gen_after_send, trans_id_after_send = yield self._send_docs( - docs_by_generation, - last_known_generation, - last_known_trans_id, - sync_id) - - cur_target_gen, cur_target_trans_id = yield self._receive_docs( - last_known_generation, last_known_trans_id, - ensure_callback, sync_id, - defer_decryption=defer_decryption) - - # update gen and trans id info in case we just sent and did not - # receive docs. - if gen_after_send is not None and gen_after_send > cur_target_gen: - cur_target_gen = gen_after_send - cur_target_trans_id = trans_id_after_send - - defer.returnValue([cur_target_gen, cur_target_trans_id]) - - # - # methods to send docs - # - - def _prepare(self, comma, entries, **dic): - entry = comma + '\r\n' + json.dumps(dic) - entries.append(entry) - return len(entry) - - @defer.inlineCallbacks - def _send_docs(self, docs_by_generation, last_known_generation, - last_known_trans_id, sync_id): - - if not docs_by_generation: - defer.returnValue([None, None]) - - headers = self._auth_header.copy() - headers.update({'content-type': ['application/x-soledad-sync-put']}) - # add remote replica metadata to the request - first_entries = ['['] - self._prepare( - '', first_entries, - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - sync_id=sync_id, - ensure=self._ensure_callback is not None) - idx = 0 - total = len(docs_by_generation) - for doc, gen, trans_id in docs_by_generation: - idx += 1 - result = yield self._send_one_doc( - headers, first_entries, doc, - gen, trans_id, total, idx) - if self._defer_encryption: - self._sync_enc_pool.delete_encrypted_doc( - doc.doc_id, doc.rev) - - msg = "%d/%d" % (idx, total) - content = {'sent': idx, 'total': total} - emit(SOLEDAD_SYNC_SEND_STATUS, content) - logger.debug("Sync send status: %s" % msg) - - response_dict = json.loads(result)[0] - gen_after_send = response_dict['new_generation'] - trans_id_after_send = response_dict['new_transaction_id'] - defer.returnValue([gen_after_send, trans_id_after_send]) - - @defer.inlineCallbacks - def _send_one_doc(self, headers, first_entries, doc, gen, trans_id, - number_of_docs, doc_idx): - entries = first_entries[:] - # add the document to the request - content = yield self._encrypt_doc(doc) - self._prepare( - ',', entries, - id=doc.doc_id, rev=doc.rev, content=content, gen=gen, - trans_id=trans_id, number_of_docs=number_of_docs, - doc_idx=doc_idx) - entries.append('\r\n]') - data = ''.join(entries) - result = yield self._http_request( - self._url, - method='POST', - headers=headers, - body=data) - defer.returnValue(result) - - def _encrypt_doc(self, doc): - d = None - if doc.is_tombstone(): - d = defer.succeed(None) - elif not self._defer_encryption: - # fallback case, for tests - d = defer.succeed(self._crypto.encrypt_doc(doc)) - else: - - def _maybe_encrypt_doc_inline(doc_json): - if doc_json is None: - # the document is not marked as tombstone, but we got - # nothing from the sync db. As it is not encrypted - # yet, we force inline encryption. - return self._crypto.encrypt_doc(doc) - return doc_json - - d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev) - d.addCallback(_maybe_encrypt_doc_inline) - return d - - # - # methods to receive doc - # - - @defer.inlineCallbacks - def _receive_docs(self, last_known_generation, last_known_trans_id, - ensure_callback, sync_id, defer_decryption): - - self._queue_for_decrypt = defer_decryption \ - and self._sync_db is not None - - new_generation = last_known_generation - new_transaction_id = last_known_trans_id - - if self._queue_for_decrypt: - logger.debug( - "Soledad sync: will queue received docs for decrypting.") - - if defer_decryption: - self._setup_sync_decr_pool() - - headers = self._auth_header.copy() - headers.update({'content-type': ['application/x-soledad-sync-get']}) - - # --------------------------------------------------------------------- - # maybe receive the first document - # --------------------------------------------------------------------- - - # we fetch the first document before fetching the rest because we need - # to know the total number of documents to be received, and this - # information comes as metadata to each request. - - doc = yield self._receive_one_doc( - headers, last_known_generation, last_known_trans_id, - sync_id, 0) - self._received_docs = 0 - number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1) - - # update the target gen and trans_id in case a document was received - if ngen: - new_generation = ngen - new_transaction_id = ntrans - - if defer_decryption: - self._sync_decr_pool.start(number_of_changes) - - # --------------------------------------------------------------------- - # maybe receive the rest of the documents - # --------------------------------------------------------------------- - - # launch many asynchronous fetches and inserts of received documents - # in the temporary sync db. Will wait for all results before - # continuing. - - received = 1 - deferreds = [] - while received < number_of_changes: - d = self._receive_one_doc( - headers, last_known_generation, - last_known_trans_id, sync_id, received) - d.addCallback( - self._insert_received_doc, - received + 1, # the index of the current received doc - number_of_changes) - deferreds.append(d) - received += 1 - results = yield defer.gatherResults(deferreds) - - # get generation and transaction id of target after insertions - if deferreds: - _, new_generation, new_transaction_id = results.pop() - - # --------------------------------------------------------------------- - # wait for async decryption to finish - # --------------------------------------------------------------------- - - if defer_decryption: - yield self._sync_decr_pool.deferred - self._sync_decr_pool.stop() - - defer.returnValue([new_generation, new_transaction_id]) - - def _receive_one_doc(self, headers, last_known_generation, - last_known_trans_id, sync_id, received): - entries = ['['] - # add remote replica metadata to the request - self._prepare( - '', entries, - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - sync_id=sync_id, - ensure=self._ensure_callback is not None) - # inform server of how many documents have already been received - self._prepare( - ',', entries, received=received) - entries.append('\r\n]') - # send headers - return self._http_request( - self._url, - method='POST', - headers=headers, - body=''.join(entries)) - - def _insert_received_doc(self, response, idx, total): - """ - Insert a received document into the local replica. - - :param response: The body and headers of the response. - :type response: tuple(str, dict) - :param idx: The index count of the current operation. - :type idx: int - :param total: The total number of operations. - :type total: int - """ - new_generation, new_transaction_id, number_of_changes, doc_id, \ - rev, content, gen, trans_id = \ - self._parse_received_doc_response(response) - if doc_id is not None: - # decrypt incoming document and insert into local database - # ------------------------------------------------------------- - # symmetric decryption of document's contents - # ------------------------------------------------------------- - # If arriving content was symmetrically encrypted, we decrypt it. - # We do it inline if defer_decryption flag is False or no sync_db - # was defined, otherwise we defer it writing it to the received - # docs table. - doc = SoledadDocument(doc_id, rev, content) - if is_symmetrically_encrypted(doc): - if self._queue_for_decrypt: - self._sync_decr_pool.insert_encrypted_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id, - idx) - else: - # defer_decryption is False or no-sync-db fallback - doc.set_json(self._crypto.decrypt_doc(doc)) - self._insert_doc_cb(doc, gen, trans_id) - else: - # not symmetrically encrypted doc, insert it directly - # or save it in the decrypted stage. - if self._queue_for_decrypt: - self._sync_decr_pool.insert_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id, - idx) - else: - self._insert_doc_cb(doc, gen, trans_id) - # ------------------------------------------------------------- - # end of symmetric decryption - # ------------------------------------------------------------- - self._received_docs += 1 - msg = "%d/%d" % (self._received_docs, total) - content = {'received': self._received_docs, 'total': total} - emit(SOLEDAD_SYNC_RECEIVE_STATUS, content) - logger.debug("Sync receive status: %s" % msg) - return number_of_changes, new_generation, new_transaction_id - - def _parse_received_doc_response(self, response): - """ - Parse the response from the server containing the received document. - - :param response: The body and headers of the response. - :type response: tuple(str, dict) - - :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev, - content, gen, trans_id) - :rtype: tuple - """ - # decode incoming stream - parts = response.splitlines() - if not parts or parts[0] != '[' or parts[-1] != ']': - raise errors.BrokenSyncStream - data = parts[1:-1] - # decode metadata - try: - line, comma = utils.check_and_strip_comma(data[0]) - metadata = None - except (IndexError): - raise errors.BrokenSyncStream - try: - metadata = json.loads(line) - new_generation = metadata['new_generation'] - new_transaction_id = metadata['new_transaction_id'] - number_of_changes = metadata['number_of_changes'] - except (ValueError, KeyError): - raise errors.BrokenSyncStream - # make sure we have replica_uid from fresh new dbs - if self._ensure_callback and 'replica_uid' in metadata: - self._ensure_callback(metadata['replica_uid']) - # parse incoming document info - doc_id = None - rev = None - content = None - gen = None - trans_id = None - if number_of_changes > 0: - try: - entry = json.loads(data[1]) - doc_id = entry['id'] - rev = entry['rev'] - content = entry['content'] - gen = entry['gen'] - trans_id = entry['trans_id'] - except (IndexError, KeyError): - raise errors.BrokenSyncStream - return new_generation, new_transaction_id, number_of_changes, \ - doc_id, rev, content, gen, trans_id - - def _setup_sync_decr_pool(self): - """ - Set up the SyncDecrypterPool for deferred decryption. - """ - if self._sync_decr_pool is None and self._sync_db is not None: - # initialize syncing queue decryption pool - self._sync_decr_pool = SyncDecrypterPool( - self._crypto, - self._sync_db, - insert_doc_cb=self._insert_doc_cb, - source_replica_uid=self.source_replica_uid) - - def _http_request(self, url, method='GET', body=None, headers={}): - d = self._http.request(url, method, body, headers, readBody) - d.addErrback(_unauth_to_invalid_token_error) - return d - - -def _unauth_to_invalid_token_error(failure): - """ - An errback to translate unauthorized errors to our own invalid token - class. - - :param failure: The original failure. - :type failure: twisted.python.failure.Failure - - :return: Either the original failure or an invalid auth token error. - :rtype: twisted.python.failure.Failure - """ - failure.trap(Error) - if failure.getErrorMessage() == "401 Unauthorized": - raise InvalidAuthTokenError - return failure diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py new file mode 100644 index 00000000..7a5cea9f --- /dev/null +++ b/client/src/leap/soledad/client/http_target/__init__.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +A U1DB backend for encrypting data before sending to server and decrypting +after receiving. +""" + + +import logging + +from leap.common.http import HTTPClient +from leap.soledad.client.http_target.send import HTTPDocSender +from leap.soledad.client.http_target.api import SyncTargetAPI +from leap.soledad.client.http_target.fetch import HTTPDocFetcher + + +logger = logging.getLogger(__name__) + + +class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher): + + """ + A SyncTarget that encrypts data before sending and decrypts data after + receiving. + + Normally encryption will have been written to the sync database upon + document modification. The sync database is also used to write temporarily + the parsed documents that the remote send us, before being decrypted and + written to the main database. + """ + def __init__(self, url, source_replica_uid, creds, crypto, cert_file, + sync_db=None, sync_enc_pool=None): + """ + Initialize the sync target. + + :param url: The server sync url. + :type url: str + :param source_replica_uid: The source replica uid which we use when + deferring decryption. + :type source_replica_uid: str + :param creds: A dictionary containing the uuid and token. + :type creds: creds + :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt + document contents when syncing. + :type crypto: soledad.crypto.SoledadCrypto + :param cert_file: Path to the certificate of the ca used to validate + the SSL certificate used by the remote soledad + server. + :type cert_file: str + :param sync_db: Optional. handler for the db with the symmetric + encryption of the syncing documents. If + None, encryption will be done in-place, + instead of retreiving it from the dedicated + database. + :type sync_db: Sqlite handler + :param sync_enc_pool: The encryption pool to use to defer encryption. + If None is passed the encryption will not be + deferred. + :type sync_enc_pool: leap.soledad.client.encdecpool.SyncEncrypterPool + """ + if url.endswith("/"): + url = url[:-1] + self._url = str(url) + "/sync-from/" + str(source_replica_uid) + self.source_replica_uid = source_replica_uid + self._auth_header = None + self.set_creds(creds) + self._crypto = crypto + self._sync_db = sync_db + self._sync_enc_pool = sync_enc_pool + self._insert_doc_cb = None + # asynchronous encryption/decryption attributes + self._decryption_callback = None + self._sync_decr_pool = None + self._http = HTTPClient(cert_file) diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py new file mode 100644 index 00000000..dcc762f6 --- /dev/null +++ b/client/src/leap/soledad/client/http_target/api.py @@ -0,0 +1,229 @@ +# -*- coding: utf-8 -*- +# api.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import json +import base64 + +from uuid import uuid4 +from u1db import SyncTarget + +from twisted.web.error import Error +from twisted.internet import defer + +from leap.soledad.common.errors import InvalidAuthTokenError +from leap.soledad.client.http_target.support import readBody + + +class SyncTargetAPI(SyncTarget): + """ + Declares public methods and implements u1db.SyncTarget. + """ + + @defer.inlineCallbacks + def close(self): + if self._sync_enc_pool: + self._sync_enc_pool.stop() + if self._sync_decr_pool: + self._sync_decr_pool.stop() + yield self._http.close() + + def set_creds(self, creds): + """ + Update credentials. + + :param creds: A dictionary containing the uuid and token. + :type creds: dict + """ + uuid = creds['token']['uuid'] + token = creds['token']['token'] + auth = '%s:%s' % (uuid, token) + b64_token = base64.b64encode(auth) + self._auth_header = {'Authorization': ['Token %s' % b64_token]} + + @property + def _base_header(self): + return self._auth_header.copy() if self._auth_header else {} + + @property + def _defer_encryption(self): + return self._sync_enc_pool is not None + + def _http_request(self, url, method='GET', body=None, headers=None, + content_type=None): + headers = headers or self._base_header + if content_type: + headers.update({'content-type': [content_type]}) + d = self._http.request(url, method, body, headers, readBody) + d.addErrback(_unauth_to_invalid_token_error) + return d + + @defer.inlineCallbacks + def get_sync_info(self, source_replica_uid): + """ + Return information about known state of remote database. + + Return the replica_uid and the current database generation of the + remote database, and its last-seen database generation for the client + replica. + + :param source_replica_uid: The client-size replica uid. + :type source_replica_uid: str + + :return: A deferred which fires with (target_replica_uid, + target_replica_generation, target_trans_id, + source_replica_last_known_generation, + source_replica_last_known_transaction_id) + :rtype: twisted.internet.defer.Deferred + """ + raw = yield self._http_request(self._url) + res = json.loads(raw) + defer.returnValue(( + res['target_replica_uid'], + res['target_replica_generation'], + res['target_replica_transaction_id'], + res['source_replica_generation'], + res['source_transaction_id'] + )) + + def record_sync_info( + self, source_replica_uid, source_replica_generation, + source_replica_transaction_id): + """ + Record tip information for another replica. + + After sync_exchange has been processed, the caller will have + received new content from this replica. This call allows the + source replica instigating the sync to inform us what their + generation became after applying the documents we returned. + + This is used to allow future sync operations to not need to repeat data + that we just talked about. It also means that if this is called at the + wrong time, there can be database records that will never be + synchronized. + + :param source_replica_uid: The identifier for the source replica. + :type source_replica_uid: str + :param source_replica_generation: The database generation for the + source replica. + :type source_replica_generation: int + :param source_replica_transaction_id: The transaction id associated + with the source replica + generation. + :type source_replica_transaction_id: str + + :return: A deferred which fires with the result of the query. + :rtype: twisted.internet.defer.Deferred + """ + data = json.dumps({ + 'generation': source_replica_generation, + 'transaction_id': source_replica_transaction_id + }) + return self._http_request( + self._url, + method='PUT', + body=data, + content_type='application/json') + + @defer.inlineCallbacks + def sync_exchange(self, docs_by_generation, source_replica_uid, + last_known_generation, last_known_trans_id, + insert_doc_cb, ensure_callback=None, + defer_decryption=True, sync_id=None): + """ + Find out which documents the remote database does not know about, + encrypt and send them. After that, receive documents from the remote + database. + + :param docs_by_generations: A list of (doc_id, generation, trans_id) + of local documents that were changed since + the last local generation the remote + replica knows about. + :type docs_by_generations: list of tuples + + :param source_replica_uid: The uid of the source replica. + :type source_replica_uid: str + + :param last_known_generation: Target's last known generation. + :type last_known_generation: int + + :param last_known_trans_id: Target's last known transaction id. + :type last_known_trans_id: str + + :param insert_doc_cb: A callback for inserting received documents from + target. If not overriden, this will call u1db + insert_doc_from_target in synchronizer, which + implements the TAKE OTHER semantics. + :type insert_doc_cb: function + + :param ensure_callback: A callback that ensures we know the target + replica uid if the target replica was just + created. + :type ensure_callback: function + + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool + + :return: A deferred which fires with the new generation and + transaction id of the target replica. + :rtype: twisted.internet.defer.Deferred + """ + + self._ensure_callback = ensure_callback + + if sync_id is None: + sync_id = str(uuid4()) + self.source_replica_uid = source_replica_uid + + # save a reference to the callback so we can use it after decrypting + self._insert_doc_cb = insert_doc_cb + + gen_after_send, trans_id_after_send = yield self._send_docs( + docs_by_generation, + last_known_generation, + last_known_trans_id, + sync_id) + + cur_target_gen, cur_target_trans_id = yield self._receive_docs( + last_known_generation, last_known_trans_id, + ensure_callback, sync_id, + defer_decryption=defer_decryption) + + # update gen and trans id info in case we just sent and did not + # receive docs. + if gen_after_send is not None and gen_after_send > cur_target_gen: + cur_target_gen = gen_after_send + cur_target_trans_id = trans_id_after_send + + defer.returnValue([cur_target_gen, cur_target_trans_id]) + + +def _unauth_to_invalid_token_error(failure): + """ + An errback to translate unauthorized errors to our own invalid token + class. + + :param failure: The original failure. + :type failure: twisted.python.failure.Failure + + :return: Either the original failure or an invalid auth token error. + :rtype: twisted.python.failure.Failure + """ + failure.trap(Error) + if failure.getErrorMessage() == "401 Unauthorized": + raise InvalidAuthTokenError + return failure diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py new file mode 100644 index 00000000..65e576d9 --- /dev/null +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -0,0 +1,252 @@ +# -*- coding: utf-8 -*- +# fetch.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import logging +import json +from u1db import errors +from u1db.remote import utils +from twisted.internet import defer +from leap.soledad.common.document import SoledadDocument +from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS +from leap.soledad.client.events import emit_async +from leap.soledad.client.crypto import is_symmetrically_encrypted +from leap.soledad.client.encdecpool import SyncDecrypterPool +from leap.soledad.client.http_target.support import RequestBody + +logger = logging.getLogger(__name__) + + +class HTTPDocFetcher(object): + """ + Handles Document fetching from Soledad server, using HTTP as transport. + Steps: + * Prepares metadata by asking server for one document + * Fetch the total on response and prepare to ask all remaining + * (async) Documents will come encrypted. + So we parse, decrypt and insert locally as they arrive. + """ + + @defer.inlineCallbacks + def _receive_docs(self, last_known_generation, last_known_trans_id, + ensure_callback, sync_id, defer_decryption): + + self._queue_for_decrypt = defer_decryption \ + and self._sync_db is not None + + new_generation = last_known_generation + new_transaction_id = last_known_trans_id + + if self._queue_for_decrypt: + logger.debug( + "Soledad sync: will queue received docs for decrypting.") + + if defer_decryption: + self._setup_sync_decr_pool() + + # --------------------------------------------------------------------- + # maybe receive the first document + # --------------------------------------------------------------------- + + # we fetch the first document before fetching the rest because we need + # to know the total number of documents to be received, and this + # information comes as metadata to each request. + + doc = yield self._receive_one_doc( + last_known_generation, last_known_trans_id, + sync_id, 0) + self._received_docs = 0 + number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1) + + if ngen: + new_generation = ngen + new_transaction_id = ntrans + + if defer_decryption: + self._sync_decr_pool.start(number_of_changes) + + # --------------------------------------------------------------------- + # maybe receive the rest of the documents + # --------------------------------------------------------------------- + + # launch many asynchronous fetches and inserts of received documents + # in the temporary sync db. Will wait for all results before + # continuing. + + received = 1 + deferreds = [] + while received < number_of_changes: + d = self._receive_one_doc( + last_known_generation, + last_known_trans_id, sync_id, received) + d.addCallback( + self._insert_received_doc, + received + 1, # the index of the current received doc + number_of_changes) + deferreds.append(d) + received += 1 + results = yield defer.gatherResults(deferreds) + + # get generation and transaction id of target after insertions + if deferreds: + _, new_generation, new_transaction_id = results.pop() + + # --------------------------------------------------------------------- + # wait for async decryption to finish + # --------------------------------------------------------------------- + + if defer_decryption: + yield self._sync_decr_pool.deferred + self._sync_decr_pool.stop() + + defer.returnValue([new_generation, new_transaction_id]) + + def _receive_one_doc(self, last_known_generation, + last_known_trans_id, sync_id, received): + # add remote replica metadata to the request + body = RequestBody( + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + sync_id=sync_id, + ensure=self._ensure_callback is not None) + # inform server of how many documents have already been received + body.insert_info(received=received) + # send headers + return self._http_request( + self._url, + method='POST', + body=str(body), + content_type='application/x-soledad-sync-get') + + def _insert_received_doc(self, response, idx, total): + """ + Insert a received document into the local replica. + + :param response: The body and headers of the response. + :type response: tuple(str, dict) + :param idx: The index count of the current operation. + :type idx: int + :param total: The total number of operations. + :type total: int + """ + new_generation, new_transaction_id, number_of_changes, doc_id, \ + rev, content, gen, trans_id = \ + self._parse_received_doc_response(response) + if doc_id is not None: + # decrypt incoming document and insert into local database + # ------------------------------------------------------------- + # symmetric decryption of document's contents + # ------------------------------------------------------------- + # If arriving content was symmetrically encrypted, we decrypt it. + # We do it inline if defer_decryption flag is False or no sync_db + # was defined, otherwise we defer it writing it to the received + # docs table. + doc = SoledadDocument(doc_id, rev, content) + if is_symmetrically_encrypted(doc): + if self._queue_for_decrypt: + self._sync_decr_pool.insert_encrypted_received_doc( + doc.doc_id, doc.rev, doc.content, gen, trans_id, + idx) + else: + # defer_decryption is False or no-sync-db fallback + doc.set_json(self._crypto.decrypt_doc(doc)) + self._insert_doc_cb(doc, gen, trans_id) + else: + # not symmetrically encrypted doc, insert it directly + # or save it in the decrypted stage. + if self._queue_for_decrypt: + self._sync_decr_pool.insert_received_doc( + doc.doc_id, doc.rev, doc.content, gen, trans_id, + idx) + else: + self._insert_doc_cb(doc, gen, trans_id) + # ------------------------------------------------------------- + # end of symmetric decryption + # ------------------------------------------------------------- + self._received_docs += 1 + _emit_receive_status(self._received_docs, total) + return number_of_changes, new_generation, new_transaction_id + + def _parse_received_doc_response(self, response): + """ + Parse the response from the server containing the received document. + + :param response: The body and headers of the response. + :type response: tuple(str, dict) + + :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev, + content, gen, trans_id) + :rtype: tuple + """ + # decode incoming stream + parts = response.splitlines() + if not parts or parts[0] != '[' or parts[-1] != ']': + raise errors.BrokenSyncStream + data = parts[1:-1] + # decode metadata + try: + line, comma = utils.check_and_strip_comma(data[0]) + metadata = None + except (IndexError): + raise errors.BrokenSyncStream + try: + metadata = json.loads(line) + new_generation = metadata['new_generation'] + new_transaction_id = metadata['new_transaction_id'] + number_of_changes = metadata['number_of_changes'] + except (ValueError, KeyError): + raise errors.BrokenSyncStream + # make sure we have replica_uid from fresh new dbs + if self._ensure_callback and 'replica_uid' in metadata: + self._ensure_callback(metadata['replica_uid']) + # parse incoming document info + doc_id = None + rev = None + content = None + gen = None + trans_id = None + if number_of_changes > 0: + try: + entry = json.loads(data[1]) + doc_id = entry['id'] + rev = entry['rev'] + content = entry['content'] + gen = entry['gen'] + trans_id = entry['trans_id'] + except (IndexError, KeyError): + raise errors.BrokenSyncStream + return new_generation, new_transaction_id, number_of_changes, \ + doc_id, rev, content, gen, trans_id + + def _setup_sync_decr_pool(self): + """ + Set up the SyncDecrypterPool for deferred decryption. + """ + if self._sync_decr_pool is None and self._sync_db is not None: + # initialize syncing queue decryption pool + self._sync_decr_pool = SyncDecrypterPool( + self._crypto, + self._sync_db, + insert_doc_cb=self._insert_doc_cb, + source_replica_uid=self.source_replica_uid) + + +def _emit_receive_status(received_docs, total): + content = {'received': received_docs, 'total': total} + emit_async(SOLEDAD_SYNC_RECEIVE_STATUS, content) + + if received_docs % 20 == 0: + msg = "%d/%d" % (received_docs, total) + logger.debug("Sync receive status: %s" % msg) diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py new file mode 100644 index 00000000..80483f0d --- /dev/null +++ b/client/src/leap/soledad/client/http_target/send.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# send.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import json +import logging +from twisted.internet import defer +from leap.soledad.client.events import emit_async +from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS +from leap.soledad.client.http_target.support import RequestBody +logger = logging.getLogger(__name__) + + +class HTTPDocSender(object): + """ + Handles Document uploading from Soledad server, using HTTP as transport. + They need to be encrypted and metadata prepared before sending. + """ + + @defer.inlineCallbacks + def _send_docs(self, docs_by_generation, last_known_generation, + last_known_trans_id, sync_id): + + if not docs_by_generation: + defer.returnValue([None, None]) + + # add remote replica metadata to the request + body = RequestBody( + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + sync_id=sync_id, + ensure=self._ensure_callback is not None) + total = len(docs_by_generation) + for idx, entry in enumerate(docs_by_generation, 1): + yield self._prepare_one_doc(entry, body, idx, total) + result = yield self._http_request( + self._url, + method='POST', + body=body.pop(1), + content_type='application/x-soledad-sync-put') + if self._defer_encryption: + self._delete_sent(idx, docs_by_generation) + _emit_send_status(idx, total) + response_dict = json.loads(result)[0] + gen_after_send = response_dict['new_generation'] + trans_id_after_send = response_dict['new_transaction_id'] + defer.returnValue([gen_after_send, trans_id_after_send]) + + def _delete_sent(self, idx, docs_by_generation): + doc = docs_by_generation[idx - 1][0] + self._sync_enc_pool.delete_encrypted_doc( + doc.doc_id, doc.rev) + + @defer.inlineCallbacks + def _prepare_one_doc(self, entry, body, idx, total): + doc, gen, trans_id = entry + content = yield self._encrypt_doc(doc) + body.insert_info( + id=doc.doc_id, rev=doc.rev, content=content, gen=gen, + trans_id=trans_id, number_of_docs=total, + doc_idx=idx) + + def _encrypt_doc(self, doc): + d = None + if doc.is_tombstone(): + d = defer.succeed(None) + elif not self._defer_encryption: + # fallback case, for tests + d = defer.succeed(self._crypto.encrypt_doc(doc)) + else: + + def _maybe_encrypt_doc_inline(doc_json): + if doc_json is None: + # the document is not marked as tombstone, but we got + # nothing from the sync db. As it is not encrypted + # yet, we force inline encryption. + return self._crypto.encrypt_doc(doc) + return doc_json + + d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev) + d.addCallback(_maybe_encrypt_doc_inline) + return d + + +def _emit_send_status(idx, total): + content = {'sent': idx, 'total': total} + emit_async(SOLEDAD_SYNC_SEND_STATUS, content) + + msg = "%d/%d" % (idx, total) + logger.debug("Sync send status: %s" % msg) diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py new file mode 100644 index 00000000..44cd7089 --- /dev/null +++ b/client/src/leap/soledad/client/http_target/support.py @@ -0,0 +1,203 @@ +# -*- coding: utf-8 -*- +# support.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import warnings +import json +from u1db import errors +from u1db.remote import http_errors +from twisted.internet import defer +from twisted.web.client import _ReadBodyProtocol +from twisted.web.client import PartialDownloadError +from twisted.web._newclient import ResponseDone +from twisted.web._newclient import PotentialDataLoss + + +# we want to make sure that HTTP errors will raise appropriate u1db errors, +# that is, fire errbacks with the appropriate failures, in the context of +# twisted. Because of that, we redefine the http body reader used by the HTTP +# client below. + +class ReadBodyProtocol(_ReadBodyProtocol): + """ + From original Twisted implementation, focused on adding our error + handling and ensuring that the proper u1db error is raised. + """ + + def __init__(self, response, deferred): + """ + Initialize the protocol, additionally storing the response headers. + """ + _ReadBodyProtocol.__init__( + self, response.code, response.phrase, deferred) + self.headers = response.headers + + # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks + def _error(self, respdic): + descr = respdic.get("error") + exc_cls = errors.wire_description_to_exc.get(descr) + if exc_cls is not None: + message = respdic.get("message") + self.deferred.errback(exc_cls(message)) + # ---8<--- end of snippet from u1db.remote.http_client + + def connectionLost(self, reason): + """ + Deliver the accumulated response bytes to the waiting L{Deferred}, if + the response body has been completely received without error. + """ + if reason.check(ResponseDone): + + body = b''.join(self.dataBuffer) + + # ---8<--- snippet from u1db.remote.http_client + if self.status in (200, 201): + self.deferred.callback(body) + elif self.status in http_errors.ERROR_STATUSES: + try: + respdic = json.loads(body) + except ValueError: + self.deferred.errback( + errors.HTTPError(self.status, body, self.headers)) + else: + self._error(respdic) + # special cases + elif self.status == 503: + self.deferred.errback(errors.Unavailable(body, self.headers)) + else: + self.deferred.errback( + errors.HTTPError(self.status, body, self.headers)) + # ---8<--- end of snippet from u1db.remote.http_client + + elif reason.check(PotentialDataLoss): + self.deferred.errback( + PartialDownloadError(self.status, self.message, + b''.join(self.dataBuffer))) + else: + self.deferred.errback(reason) + + +def readBody(response): + """ + Get the body of an L{IResponse} and return it as a byte string. + + This is a helper function for clients that don't want to incrementally + receive the body of an HTTP response. + + @param response: The HTTP response for which the body will be read. + @type response: L{IResponse} provider + + @return: A L{Deferred} which will fire with the body of the response. + Cancelling it will close the connection to the server immediately. + """ + def cancel(deferred): + """ + Cancel a L{readBody} call, close the connection to the HTTP server + immediately, if it is still open. + + @param deferred: The cancelled L{defer.Deferred}. + """ + abort = getAbort() + if abort is not None: + abort() + + d = defer.Deferred(cancel) + protocol = ReadBodyProtocol(response, d) + + def getAbort(): + return getattr(protocol.transport, 'abortConnection', None) + + response.deliverBody(protocol) + + if protocol.transport is not None and getAbort() is None: + warnings.warn( + 'Using readBody with a transport that does not have an ' + 'abortConnection method', + category=DeprecationWarning, + stacklevel=2) + + return d + + +class RequestBody(object): + """ + This class is a helper to generate send and fetch requests. + The expected format is something like: + [ + {headers}, + {entry1}, + {...}, + {entryN}, + ] + """ + + def __init__(self, **header_dict): + """ + Creates a new RequestBody holding header information. + + :param header_dict: A dictionary with the headers. + :type header_dict: dict + """ + self.headers = header_dict + self.entries = [] + + def insert_info(self, **entry_dict): + """ + Dumps an entry into JSON format and add it to entries list. + + :param entry_dict: Entry as a dictionary + :type entry_dict: dict + + :return: length of the entry after JSON dumps + :rtype: int + """ + entry = json.dumps(entry_dict) + self.entries.append(entry) + return len(entry) + + def pop(self, number=1): + """ + Removes an amount of entries and returns it formatted and ready + to be sent. + + :param number: number of entries to pop and format + :type number: int + + :return: formatted body ready to be sent + :rtype: str + """ + entries = [self.entries.pop(0) for i in xrange(number)] + return self.entries_to_str(entries) + + def __str__(self): + return self.entries_to_str(self.entries) + + def __len__(self): + return len(self.entries) + + def entries_to_str(self, entries=None): + """ + Format a list of entries into the body format expected + by the server. + + :param entries: entries to format + :type entries: list + + :return: formatted body ready to be sent + :rtype: str + """ + data = '[\r\n' + json.dumps(self.headers) + data += ''.join(',\r\n' + entry for entry in entries) + return data + '\r\n]' diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py index ee3aacdb..c3c3dff5 100644 --- a/client/src/leap/soledad/client/secrets.py +++ b/client/src/leap/soledad/client/secrets.py @@ -261,6 +261,16 @@ class SoledadSecrets(object): logger.info("Could not find a secret in local storage.") return False + def _maybe_set_active_secret(self, active_secret): + """ + If no secret_id is already set, choose the passed active secret, or + just choose first secret available if none. + """ + if not self._secret_id: + if not active_secret: + active_secret = self._secrets.items()[0][0] + self.set_secret_id(active_secret) + def _load_secrets(self): """ Load storage secrets from local file. @@ -270,12 +280,7 @@ class SoledadSecrets(object): with open(self._secrets_path, 'r') as f: content = json.loads(f.read()) _, active_secret = self._import_recovery_document(content) - # choose first secret if no secret_id was given - if self._secret_id is None: - if active_secret is None: - self.set_secret_id(self._secrets.items()[0][0]) - else: - self.set_secret_id(active_secret) + self._maybe_set_active_secret(active_secret) # enlarge secret if needed enlarged = False if len(self._secrets[self._secret_id]) < self.GEN_SECRET_LENGTH: @@ -306,12 +311,8 @@ class SoledadSecrets(object): 'Found cryptographic secrets in shared recovery ' 'database.') _, active_secret = self._import_recovery_document(doc.content) + self._maybe_set_active_secret(active_secret) self._store_secrets() # save new secrets in local file - if self._secret_id is None: - if active_secret is None: - self.set_secret_id(self._secrets.items()[0][0]) - else: - self.set_secret_id(active_secret) else: # STAGE 3 - there are no secrets in server also, so # generate a secret and store it in remote db. @@ -432,13 +433,13 @@ class SoledadSecrets(object): :return: a document with encrypted key material in its contents :rtype: document.SoledadDocument """ - events.emit(events.SOLEDAD_DOWNLOADING_KEYS, self._uuid) + events.emit_async(events.SOLEDAD_DOWNLOADING_KEYS, self._uuid) db = self._shared_db if not db: logger.warning('No shared db found') return doc = db.get_doc(self._shared_db_doc_id()) - events.emit(events.SOLEDAD_DONE_DOWNLOADING_KEYS, self._uuid) + events.emit_async(events.SOLEDAD_DONE_DOWNLOADING_KEYS, self._uuid) return doc def _put_secrets_in_shared_db(self): @@ -461,13 +462,13 @@ class SoledadSecrets(object): # fill doc with encrypted secrets doc.content = self._export_recovery_document() # upload secrets to server - events.emit(events.SOLEDAD_UPLOADING_KEYS, self._uuid) + events.emit_async(events.SOLEDAD_UPLOADING_KEYS, self._uuid) db = self._shared_db if not db: logger.warning('No shared db found') return db.put_doc(doc) - events.emit(events.SOLEDAD_DONE_UPLOADING_KEYS, self._uuid) + events.emit_async(events.SOLEDAD_DONE_UPLOADING_KEYS, self._uuid) # # Management of secret for symmetric encryption. @@ -587,13 +588,13 @@ class SoledadSecrets(object): :return: The id of the generated secret. :rtype: str """ - events.emit(events.SOLEDAD_CREATING_KEYS, self._uuid) + events.emit_async(events.SOLEDAD_CREATING_KEYS, self._uuid) # generate random secret secret = os.urandom(self.GEN_SECRET_LENGTH) secret_id = sha256(secret).hexdigest() self._secrets[secret_id] = secret self._store_secrets() - events.emit(events.SOLEDAD_DONE_CREATING_KEYS, self._uuid) + events.emit_async(events.SOLEDAD_DONE_CREATING_KEYS, self._uuid) return secret_id def _store_secrets(self): diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 2151884a..22ddc87d 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -559,6 +559,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase): """ Close the syncer and syncdb orderly """ + super(SQLCipherU1DBSync, self).close() # close all open syncers for url in self._syncers.keys(): _, syncer = self._syncers[url] diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 6c28e0be..1c762036 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -87,8 +87,7 @@ class CouchDocument(SoledadDocument): atomic and consistent update of the database. """ - def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False, - syncable=True): + def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False): """ Container for handling a document that is stored in couch backend. @@ -100,27 +99,10 @@ class CouchDocument(SoledadDocument): :type json: str :param has_conflicts: Boolean indicating if this document has conflicts :type has_conflicts: bool - :param syncable: Should this document be synced with remote replicas? - :type syncable: bool """ SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts) - self._couch_rev = None - self._conflicts = None - self._transactions = None - - def _ensure_fetch_conflicts(self, get_conflicts_fun): - """ - Ensure conflict data has been fetched from the server. - - :param get_conflicts_fun: A function which, given the document id and - the couch revision, return the conflicted - versions of the current document. - :type get_conflicts_fun: function - """ - if self._conflicts is None: - self._conflicts = get_conflicts_fun(self.doc_id, - couch_rev=self.couch_rev) - self.has_conflicts = len(self._conflicts) > 0 + self.couch_rev = None + self.transactions = None def get_conflicts(self): """ @@ -149,7 +131,7 @@ class CouchDocument(SoledadDocument): :type doc: CouchDocument """ if self._conflicts is None: - raise Exception("Run self._ensure_fetch_conflicts first!") + raise Exception("Fetch conflicts first!") self._conflicts.append(doc) self.has_conflicts = len(self._conflicts) > 0 @@ -161,27 +143,48 @@ class CouchDocument(SoledadDocument): :type conflict_revs: [str] """ if self._conflicts is None: - raise Exception("Run self._ensure_fetch_conflicts first!") + raise Exception("Fetch conflicts first!") self._conflicts = filter( lambda doc: doc.rev not in conflict_revs, self._conflicts) self.has_conflicts = len(self._conflicts) > 0 - def _get_couch_rev(self): - return self._couch_rev - - def _set_couch_rev(self, rev): - self._couch_rev = rev - - couch_rev = property(_get_couch_rev, _set_couch_rev) - - def _get_transactions(self): - return self._transactions + def update(self, new_doc): + # update info + self.rev = new_doc.rev + if new_doc.is_tombstone(): + self.is_tombstone() + else: + self.content = new_doc.content + self.has_conflicts = new_doc.has_conflicts - def _set_transactions(self, rev): - self._transactions = rev + def prune_conflicts(self, doc_vcr, autoresolved_increment): + """ + Prune conflicts that are older then the current document's revision, or + whose content match to the current document's content. + Originally in u1db.CommonBackend - transactions = property(_get_transactions, _set_transactions) + :param doc: The document to have conflicts pruned. + :type doc: CouchDocument + :param doc_vcr: A vector clock representing the current document's + revision. + :type doc_vcr: u1db.vectorclock.VectorClock + """ + if self.has_conflicts: + autoresolved = False + c_revs_to_prune = [] + for c_doc in self._conflicts: + c_vcr = vectorclock.VectorClockRev(c_doc.rev) + if doc_vcr.is_newer(c_vcr): + c_revs_to_prune.append(c_doc.rev) + elif self.same_content_as(c_doc): + c_revs_to_prune.append(c_doc.rev) + doc_vcr.maximize(c_vcr) + autoresolved = True + if autoresolved: + doc_vcr.increment(autoresolved_increment) + self.rev = doc_vcr.as_str() + self.delete_conflicts(c_revs_to_prune) # monkey-patch the u1db http app to use CouchDocument @@ -482,13 +485,10 @@ class CouchDatabase(CommonBackend): Ensure that the design documents used by the backend exist on the couch database. """ - # we check for existence of one of the files, and put all of them if - # that one does not exist - try: - self._database['_design/docs'] - return - except ResourceNotFound: - for ddoc_name in ['docs', 'syncs', 'transactions']: + for ddoc_name in ['docs', 'syncs', 'transactions']: + try: + self._database.info(ddoc_name) + except ResourceNotFound: ddoc = json.loads( binascii.a2b_base64( getattr(ddocs, ddoc_name))) @@ -750,7 +750,6 @@ class CouchDatabase(CommonBackend): if check_for_conflicts \ and '_attachments' in result \ and 'u1db_conflicts' in result['_attachments']: - doc.has_conflicts = True doc.set_conflicts( self._build_conflicts( doc.doc_id, @@ -1044,7 +1043,7 @@ class CouchDatabase(CommonBackend): conflicts.append(doc) return conflicts - def _get_conflicts(self, doc_id, couch_rev=None): + def get_doc_conflicts(self, doc_id, couch_rev=None): """ Get the conflicted versions of a document. @@ -1059,32 +1058,21 @@ class CouchDatabase(CommonBackend): """ # request conflicts attachment from server params = {} + conflicts = [] if couch_rev is not None: params['rev'] = couch_rev # restric document's couch revision + else: + # TODO: move into resource logic! + first_entry = self._get_doc(doc_id, check_for_conflicts=True) + conflicts.append(first_entry) resource = self._database.resource(doc_id, 'u1db_conflicts') try: response = resource.get_json(**params) - return self._build_conflicts( + return conflicts + self._build_conflicts( doc_id, json.loads(response[2].read())) except ResourceNotFound: return [] - def get_doc_conflicts(self, doc_id): - """ - Get the list of conflicts for the given document. - - The order of the conflicts is such that the first entry is the value - that would be returned by "get_doc". - - :return: A list of the document entries that are conflicted. - :rtype: [CouchDocument] - """ - conflict_docs = self._get_conflicts(doc_id) - if len(conflict_docs) == 0: - return [] - this_doc = self._get_doc(doc_id, check_for_conflicts=True) - return [this_doc] + conflict_docs - def _get_replica_gen_and_trans_id(self, other_replica_uid): """ Return the last known generation and transaction id for the other db @@ -1140,9 +1128,11 @@ class CouchDatabase(CommonBackend): :param sync_id: The id of the current sync session. :type sync_id: str """ - self._do_set_replica_gen_and_trans_id( - other_replica_uid, other_generation, other_transaction_id, - number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id) + if other_replica_uid is not None and other_generation is not None: + self._do_set_replica_gen_and_trans_id( + other_replica_uid, other_generation, other_transaction_id, + number_of_docs=number_of_docs, doc_idx=doc_idx, + sync_id=sync_id) def _do_set_replica_gen_and_trans_id( self, other_replica_uid, other_generation, other_transaction_id, @@ -1206,70 +1196,6 @@ class CouchDatabase(CommonBackend): except ResourceNotFound as e: raise_missing_design_doc_error(e, ddoc_path) - def _add_conflict(self, doc, my_doc_rev, my_content): - """ - Add a conflict to the document. - - Note that this method does not actually update the backend; rather, it - updates the CouchDocument object which will provide the conflict data - when the atomic document update is made. - - :param doc: The document to have conflicts added to. - :type doc: CouchDocument - :param my_doc_rev: The revision of the conflicted document. - :type my_doc_rev: str - :param my_content: The content of the conflicted document as a JSON - serialized string. - :type my_content: str - """ - doc._ensure_fetch_conflicts(self._get_conflicts) - doc.add_conflict( - self._factory(doc_id=doc.doc_id, rev=my_doc_rev, - json=my_content)) - - def _delete_conflicts(self, doc, conflict_revs): - """ - Delete the conflicted revisions from the list of conflicts of C{doc}. - - Note that this method does not actually update the backend; rather, it - updates the CouchDocument object which will provide the conflict data - when the atomic document update is made. - - :param doc: The document to have conflicts deleted. - :type doc: CouchDocument - :param conflict_revs: A list of the revisions to be deleted. - :param conflict_revs: [str] - """ - doc._ensure_fetch_conflicts(self._get_conflicts) - doc.delete_conflicts(conflict_revs) - - def _prune_conflicts(self, doc, doc_vcr): - """ - Prune conflicts that are older then the current document's revision, or - whose content match to the current document's content. - - :param doc: The document to have conflicts pruned. - :type doc: CouchDocument - :param doc_vcr: A vector clock representing the current document's - revision. - :type doc_vcr: u1db.vectorclock.VectorClock - """ - if doc.has_conflicts is True: - autoresolved = False - c_revs_to_prune = [] - for c_doc in doc.get_conflicts(): - c_vcr = vectorclock.VectorClockRev(c_doc.rev) - if doc_vcr.is_newer(c_vcr): - c_revs_to_prune.append(c_doc.rev) - elif doc.same_content_as(c_doc): - c_revs_to_prune.append(c_doc.rev) - doc_vcr.maximize(c_vcr) - autoresolved = True - if autoresolved: - doc_vcr.increment(self._replica_uid) - doc.rev = doc_vcr.as_str() - self._delete_conflicts(doc, c_revs_to_prune) - def _force_doc_sync_conflict(self, doc): """ Add a conflict and force a document put. @@ -1278,9 +1204,9 @@ class CouchDatabase(CommonBackend): :type doc: CouchDocument """ my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) - self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev)) - self._add_conflict(doc, my_doc.rev, my_doc.get_json()) - doc.has_conflicts = True + doc.prune_conflicts( + vectorclock.VectorClockRev(doc.rev), self._replica_uid) + doc.add_conflict(my_doc) self._put_doc(my_doc, doc) def resolve_doc(self, doc, conflicted_doc_revs): @@ -1325,14 +1251,14 @@ class CouchDatabase(CommonBackend): # the newer doc version will supersede the one in the database, so # we copy conflicts before updating the backend. doc.set_conflicts(cur_doc.get_conflicts()) # copy conflicts over. - self._delete_conflicts(doc, superseded_revs) + doc.delete_conflicts(superseded_revs) self._put_doc(cur_doc, doc) else: # the newer doc version does not supersede the one in the # database, so we will add a conflict to the database and copy # those over to the document the user has in her hands. - self._add_conflict(cur_doc, new_rev, doc.get_json()) - self._delete_conflicts(cur_doc, superseded_revs) + cur_doc.add_conflict(doc) + cur_doc.delete_conflicts(superseded_revs) self._put_doc(cur_doc, cur_doc) # just update conflicts # backend has been updated with current conflicts, now copy them # to the current document. @@ -1392,65 +1318,33 @@ class CouchDatabase(CommonBackend): 'converged', at_gen is the insertion/current generation. :rtype: (str, int) """ - cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) - # at this point, `doc` has arrived from the other syncing party, and - # we will decide what to do with it. - # First, we prepare the arriving doc to update couch database. - old_doc = doc - doc = self._factory(doc.doc_id, doc.rev, doc.get_json()) - if cur_doc is not None: - doc.couch_rev = cur_doc.couch_rev - # fetch conflicts because we will eventually manipulate them - doc._ensure_fetch_conflicts(self._get_conflicts) - # from now on, it works just like u1db sqlite backend - doc_vcr = vectorclock.VectorClockRev(doc.rev) - if cur_doc is None: - cur_vcr = vectorclock.VectorClockRev(None) - else: - cur_vcr = vectorclock.VectorClockRev(cur_doc.rev) - self._validate_source(replica_uid, replica_gen, replica_trans_id) - if doc_vcr.is_newer(cur_vcr): - rev = doc.rev - self._prune_conflicts(doc, doc_vcr) - if doc.rev != rev: - # conflicts have been autoresolved - state = 'superseded' - else: - state = 'inserted' - self._put_doc(cur_doc, doc) - elif doc.rev == cur_doc.rev: - # magical convergence - state = 'converged' - elif cur_vcr.is_newer(doc_vcr): - # Don't add this to seen_ids, because we have something newer, - # so we should send it back, and we should not generate a - # conflict - state = 'superseded' - elif cur_doc.same_content_as(doc): - # the documents have been edited to the same thing at both ends - doc_vcr.maximize(cur_vcr) - doc_vcr.increment(self._replica_uid) - doc.rev = doc_vcr.as_str() - self._put_doc(cur_doc, doc) - state = 'superseded' - else: - state = 'conflicted' - if save_conflict: - self._force_doc_sync_conflict(doc) - if replica_uid is not None and replica_gen is not None: - self._set_replica_gen_and_trans_id( - replica_uid, replica_gen, replica_trans_id, - number_of_docs=number_of_docs, doc_idx=doc_idx, - sync_id=sync_id) - # update info - old_doc.rev = doc.rev - if doc.is_tombstone(): - old_doc.is_tombstone() - else: - old_doc.content = doc.content - old_doc.has_conflicts = doc.has_conflicts + if not isinstance(doc, CouchDocument): + doc = self._factory(doc.doc_id, doc.rev, doc.get_json()) + self._save_source_info(replica_uid, replica_gen, + replica_trans_id, number_of_docs, + doc_idx, sync_id) + my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if my_doc is not None: + my_doc.set_conflicts( + self.get_doc_conflicts(my_doc.doc_id, my_doc.couch_rev)) + state, save_doc = _process_incoming_doc( + my_doc, doc, save_conflict, self.replica_uid) + if save_doc: + self._put_doc(my_doc, save_doc) + doc.update(save_doc) return state, self._get_generation() + def _save_source_info(self, replica_uid, replica_gen, replica_trans_id, + number_of_docs, doc_idx, sync_id): + """ + Validate and save source information. + """ + self._validate_source(replica_uid, replica_gen, replica_trans_id) + self._set_replica_gen_and_trans_id( + replica_uid, replica_gen, replica_trans_id, + number_of_docs=number_of_docs, doc_idx=doc_idx, + sync_id=sync_id) + def get_docs(self, doc_ids, check_for_conflicts=True, include_deleted=False): """ @@ -1495,6 +1389,13 @@ class CouchDatabase(CommonBackend): continue yield t._doc + def _prune_conflicts(self, doc, doc_vcr): + """ + Overrides original method, but it is implemented elsewhere for + simplicity. + """ + doc.prune_conflicts(doc_vcr, self._replica_uid) + def _new_resource(self, *path): """ Return a new resource for accessing a couch database. @@ -1546,7 +1447,7 @@ class CouchServerState(ServerState): :param couch_url: The URL for the couch database. :type couch_url: str """ - self._couch_url = couch_url + self.couch_url = couch_url def open_database(self, dbname): """ @@ -1559,7 +1460,7 @@ class CouchServerState(ServerState): :rtype: CouchDatabase """ return CouchDatabase( - self._couch_url, + self.couch_url, dbname, ensure_ddocs=False) @@ -1594,21 +1495,52 @@ class CouchServerState(ServerState): """ raise Unauthorized() - def _set_couch_url(self, url): - """ - Set the couchdb URL - - :param url: CouchDB URL - :type url: str - """ - self._couch_url = url - - def _get_couch_url(self): - """ - Return CouchDB URL - :rtype: str - """ - return self._couch_url - - couch_url = property(_get_couch_url, _set_couch_url, doc='CouchDB URL') +def _process_incoming_doc(my_doc, other_doc, save_conflict, replica_uid): + """ + Check document, save and return state. + """ + # at this point, `doc` has arrived from the other syncing party, and + # we will decide what to do with it. + # First, we prepare the arriving doc to update couch database. + new_doc = CouchDocument( + other_doc.doc_id, other_doc.rev, other_doc.get_json()) + if my_doc is None: + return 'inserted', new_doc + new_doc.couch_rev = my_doc.couch_rev + new_doc.set_conflicts(my_doc.get_conflicts()) + # fetch conflicts because we will eventually manipulate them + # from now on, it works just like u1db sqlite backend + doc_vcr = vectorclock.VectorClockRev(new_doc.rev) + cur_vcr = vectorclock.VectorClockRev(my_doc.rev) + if doc_vcr.is_newer(cur_vcr): + rev = new_doc.rev + new_doc.prune_conflicts(doc_vcr, replica_uid) + if new_doc.rev != rev: + # conflicts have been autoresolved + return 'superseded', new_doc + else: + return'inserted', new_doc + elif new_doc.rev == my_doc.rev: + # magical convergence + return 'converged', None + elif cur_vcr.is_newer(doc_vcr): + # Don't add this to seen_ids, because we have something newer, + # so we should send it back, and we should not generate a + # conflict + other_doc.update(new_doc) + return 'superseded', None + elif my_doc.same_content_as(new_doc): + # the documents have been edited to the same thing at both ends + doc_vcr.maximize(cur_vcr) + doc_vcr.increment(replica_uid) + new_doc.rev = doc_vcr.as_str() + return 'superseded', new_doc + else: + if save_conflict: + new_doc.prune_conflicts( + vectorclock.VectorClockRev(new_doc.rev), replica_uid) + new_doc.add_conflict(my_doc) + return 'conflicted', new_doc + other_doc.update(new_doc) + return 'conflicted', None diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index 468ad8d8..a08ffd16 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -25,6 +25,7 @@ import json from urlparse import urljoin from couchdb.client import Server +from uuid import uuid4 from testscenarios import TestWithScenarios @@ -42,7 +43,6 @@ from leap.soledad.common.tests.util import sync_via_synchronizer from leap.soledad.common.tests.u1db_tests import test_backends from leap.soledad.common.tests.u1db_tests import DatabaseBaseTests -from leap.soledad.common.tests.u1db_tests import TestCaseWithServer from u1db.backends.inmemory import InMemoryIndex @@ -56,8 +56,8 @@ class TestCouchBackendImpl(CouchDBTestCase): def test__allocate_doc_id(self): db = couch.CouchDatabase.open_database( urljoin( - 'http://localhost:' + str(self.wrapper.port), - 'u1db_tests' + 'http://localhost:' + str(self.couch_port), + ('test-%s' % uuid4().hex) ), create=True, ensure_ddocs=True) @@ -66,6 +66,7 @@ class TestCouchBackendImpl(CouchDBTestCase): self.assertEqual(34, len(doc_id1)) int(doc_id1[len('D-'):], 16) self.assertNotEqual(doc_id1, db._allocate_doc_id()) + self.delete_db(db._dbname) # ----------------------------------------------------------------------------- @@ -73,25 +74,28 @@ class TestCouchBackendImpl(CouchDBTestCase): # ----------------------------------------------------------------------------- def make_couch_database_for_test(test, replica_uid): - port = str(test.wrapper.port) - return couch.CouchDatabase.open_database( - urljoin('http://localhost:' + port, replica_uid), + port = str(test.couch_port) + dbname = ('test-%s' % uuid4().hex) + db = couch.CouchDatabase.open_database( + urljoin('http://localhost:' + port, dbname), create=True, replica_uid=replica_uid or 'test', ensure_ddocs=True) + test.addCleanup(test.delete_db, dbname) + return db def copy_couch_database_for_test(test, db): - port = str(test.wrapper.port) + port = str(test.couch_port) couch_url = 'http://localhost:' + port - new_dbname = db._replica_uid + '_copy' + new_dbname = db._dbname + '_copy' new_db = couch.CouchDatabase.open_database( urljoin(couch_url, new_dbname), create=True, replica_uid=db._replica_uid or 'test') # copy all docs session = couch.Session() - old_couch_db = Server(couch_url, session=session)[db._replica_uid] + old_couch_db = Server(couch_url, session=session)[db._dbname] new_couch_db = Server(couch_url, session=session)[new_dbname] for doc_id in old_couch_db: doc = old_couch_db.get(doc_id) @@ -143,24 +147,6 @@ class CouchTests( scenarios = COUCH_SCENARIOS - def setUp(self): - test_backends.AllDatabaseTests.setUp(self) - # save db info because of test_close - self._url = self.db._url - self._dbname = self.db._dbname - - def tearDown(self): - # if current test is `test_close` we have to use saved objects to - # delete the database because the close() method will have removed the - # references needed to do it using the CouchDatabase. - if self.id().endswith('test_couch.CouchTests.test_close(couch)'): - session = couch.Session() - server = Server(url=self._url, session=session) - del(server[self._dbname]) - else: - self.db.delete_database() - test_backends.AllDatabaseTests.tearDown(self) - class CouchDatabaseTests( TestWithScenarios, @@ -169,10 +155,6 @@ class CouchDatabaseTests( scenarios = COUCH_SCENARIOS - def tearDown(self): - self.db.delete_database() - test_backends.LocalDatabaseTests.tearDown(self) - class CouchValidateGenNTransIdTests( TestWithScenarios, @@ -181,10 +163,6 @@ class CouchValidateGenNTransIdTests( scenarios = COUCH_SCENARIOS - def tearDown(self): - self.db.delete_database() - test_backends.LocalDatabaseValidateGenNTransIdTests.tearDown(self) - class CouchValidateSourceGenTests( TestWithScenarios, @@ -193,10 +171,6 @@ class CouchValidateSourceGenTests( scenarios = COUCH_SCENARIOS - def tearDown(self): - self.db.delete_database() - test_backends.LocalDatabaseValidateSourceGenTests.tearDown(self) - class CouchWithConflictsTests( TestWithScenarios, @@ -205,10 +179,6 @@ class CouchWithConflictsTests( scenarios = COUCH_SCENARIOS - def tearDown(self): - self.db.delete_database() - test_backends.LocalDatabaseWithConflictsTests.tearDown(self) - # Notice: the CouchDB backend does not have indexing capabilities, so we do # not test indexing now. @@ -237,7 +207,6 @@ nested_doc = tests.nested_doc class CouchDatabaseSyncTargetTests( TestWithScenarios, DatabaseBaseTests, - TestCaseWithServer, CouchDBTestCase): # TODO: implement _set_trace_hook(_shallow) in CouchSyncTarget so @@ -260,26 +229,13 @@ class CouchDatabaseSyncTargetTests( def setUp(self): CouchDBTestCase.setUp(self) - # from DatabaseBaseTests.setUp - self.db = self.create_database('test') - # from TestCaseWithServer.setUp - self.server = self.server_thread = self.port = None # other stuff self.db, self.st = self.create_db_and_target(self) self.other_changes = [] def tearDown(self): + self.db.close() CouchDBTestCase.tearDown(self) - # from TestCaseWithServer.tearDown - if self.server is not None: - self.server.shutdown() - self.server_thread.join() - self.server.server_close() - if self.port: - self.port.stopListening() - # from DatabaseBaseTests.tearDown - if hasattr(self, 'db') and self.db is not None: - self.db.close() def receive_doc(self, doc, gen, trans_id): self.other_changes.append( @@ -724,17 +680,8 @@ class CouchDatabaseSyncTests( self.db3, self.db1_copy, self.db2_copy ]: if db is not None: - db.delete_database() + self.delete_db(db._dbname) db.close() - for replica_uid, dbname in [ - ('test1_copy', 'source'), - ('test2_copy', 'target'), - ('test3', 'target') - ]: - db = self.create_database(replica_uid, dbname) - db.delete_database() - # cleanup connections to avoid leaking of file descriptors - db.close() DatabaseBaseTests.tearDown(self) def assertLastExchangeLog(self, db, expected): @@ -1203,7 +1150,7 @@ class CouchDatabaseSyncTests( self.db1 = self.create_database('test1', 'both') self.db2 = self.create_database('test2', 'both') doc1 = self.db1.create_doc_from_json('{"a": 1}', doc_id='the-doc') - db3 = self.create_database('test3', 'both') + self.db3 = self.create_database('test3', 'both') self.sync(self.db2, self.db1) self.assertEqual( self.db1._get_generation_info(), @@ -1211,20 +1158,20 @@ class CouchDatabaseSyncTests( self.assertEqual( self.db2._get_generation_info(), self.db1._get_replica_gen_and_trans_id(self.db2._replica_uid)) - self.sync(db3, self.db1) + self.sync(self.db3, self.db1) # update on 2 doc2 = self.make_document('the-doc', doc1.rev, '{"a": 2}') self.db2.put_doc(doc2) - self.sync(self.db2, db3) - self.assertEqual(db3.get_doc('the-doc').rev, doc2.rev) + self.sync(self.db2, self.db3) + self.assertEqual(self.db3.get_doc('the-doc').rev, doc2.rev) # update on 1 doc1.set_json('{"a": 3}') self.db1.put_doc(doc1) # conflicts self.sync(self.db2, self.db1) - self.sync(db3, self.db1) + self.sync(self.db3, self.db1) self.assertTrue(self.db2.get_doc('the-doc').has_conflicts) - self.assertTrue(db3.get_doc('the-doc').has_conflicts) + self.assertTrue(self.db3.get_doc('the-doc').has_conflicts) # resolve conflicts = self.db2.get_doc_conflicts('the-doc') doc4 = self.make_document('the-doc', None, '{"a": 4}') @@ -1233,38 +1180,38 @@ class CouchDatabaseSyncTests( doc2 = self.db2.get_doc('the-doc') self.assertEqual(doc4.get_json(), doc2.get_json()) self.assertFalse(doc2.has_conflicts) - self.sync(self.db2, db3) - doc3 = db3.get_doc('the-doc') + self.sync(self.db2, self.db3) + doc3 = self.db3.get_doc('the-doc') self.assertEqual(doc4.get_json(), doc3.get_json()) self.assertFalse(doc3.has_conflicts) def test_sync_supersedes_conflicts(self): self.db1 = self.create_database('test1', 'both') self.db2 = self.create_database('test2', 'target') - db3 = self.create_database('test3', 'both') + self.db3 = self.create_database('test3', 'both') doc1 = self.db1.create_doc_from_json('{"a": 1}', doc_id='the-doc') self.db2.create_doc_from_json('{"b": 1}', doc_id='the-doc') - db3.create_doc_from_json('{"c": 1}', doc_id='the-doc') - self.sync(db3, self.db1) + self.db3.create_doc_from_json('{"c": 1}', doc_id='the-doc') + self.sync(self.db3, self.db1) self.assertEqual( self.db1._get_generation_info(), - db3._get_replica_gen_and_trans_id(self.db1._replica_uid)) + self.db3._get_replica_gen_and_trans_id(self.db1._replica_uid)) self.assertEqual( - db3._get_generation_info(), - self.db1._get_replica_gen_and_trans_id(db3._replica_uid)) - self.sync(db3, self.db2) + self.db3._get_generation_info(), + self.db1._get_replica_gen_and_trans_id(self.db3._replica_uid)) + self.sync(self.db3, self.db2) self.assertEqual( self.db2._get_generation_info(), - db3._get_replica_gen_and_trans_id(self.db2._replica_uid)) + self.db3._get_replica_gen_and_trans_id(self.db2._replica_uid)) self.assertEqual( - db3._get_generation_info(), - self.db2._get_replica_gen_and_trans_id(db3._replica_uid)) - self.assertEqual(3, len(db3.get_doc_conflicts('the-doc'))) + self.db3._get_generation_info(), + self.db2._get_replica_gen_and_trans_id(self.db3._replica_uid)) + self.assertEqual(3, len(self.db3.get_doc_conflicts('the-doc'))) doc1.set_json('{"a": 2}') self.db1.put_doc(doc1) - self.sync(db3, self.db1) + self.sync(self.db3, self.db1) # original doc1 should have been removed from conflicts - self.assertEqual(3, len(db3.get_doc_conflicts('the-doc'))) + self.assertEqual(3, len(self.db3.get_doc_conflicts('the-doc'))) def test_sync_stops_after_get_sync_info(self): self.db1 = self.create_database('test1', 'source') @@ -1283,79 +1230,78 @@ class CouchDatabaseSyncTests( self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc1') self.assertRaises( u1db_errors.InvalidReplicaUID, self.sync, self.db1, self.db2) - # remove the reference to db2 to avoid double deleting on tearDown - self.db2.close() - self.db2 = None def test_sync_detects_rollback_in_source(self): self.db1 = self.create_database('test1', 'source') self.db2 = self.create_database('test2', 'target') self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc1') self.sync(self.db1, self.db2) - db1_copy = self.copy_database(self.db1) + self.db1_copy = self.copy_database(self.db1) self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc2') self.sync(self.db1, self.db2) self.assertRaises( - u1db_errors.InvalidGeneration, self.sync, db1_copy, self.db2) + u1db_errors.InvalidGeneration, self.sync, self.db1_copy, self.db2) def test_sync_detects_rollback_in_target(self): self.db1 = self.create_database('test1', 'source') self.db2 = self.create_database('test2', 'target') self.db1.create_doc_from_json(tests.simple_doc, doc_id="divergent") self.sync(self.db1, self.db2) - db2_copy = self.copy_database(self.db2) + self.db2_copy = self.copy_database(self.db2) self.db2.create_doc_from_json(tests.simple_doc, doc_id='doc2') self.sync(self.db1, self.db2) self.assertRaises( - u1db_errors.InvalidGeneration, self.sync, self.db1, db2_copy) + u1db_errors.InvalidGeneration, self.sync, self.db1, self.db2_copy) def test_sync_detects_diverged_source(self): self.db1 = self.create_database('test1', 'source') self.db2 = self.create_database('test2', 'target') - db3 = self.copy_database(self.db1) + self.db3 = self.copy_database(self.db1) self.db1.create_doc_from_json(tests.simple_doc, doc_id="divergent") - db3.create_doc_from_json(tests.simple_doc, doc_id="divergent") + self.db3.create_doc_from_json(tests.simple_doc, doc_id="divergent") self.sync(self.db1, self.db2) self.assertRaises( - u1db_errors.InvalidTransactionId, self.sync, db3, self.db2) + u1db_errors.InvalidTransactionId, self.sync, self.db3, self.db2) def test_sync_detects_diverged_target(self): self.db1 = self.create_database('test1', 'source') self.db2 = self.create_database('test2', 'target') - db3 = self.copy_database(self.db2) - db3.create_doc_from_json(tests.nested_doc, doc_id="divergent") + self.db3 = self.copy_database(self.db2) + self.db3.create_doc_from_json(tests.nested_doc, doc_id="divergent") self.db1.create_doc_from_json(tests.simple_doc, doc_id="divergent") self.sync(self.db1, self.db2) self.assertRaises( - u1db_errors.InvalidTransactionId, self.sync, self.db1, db3) + u1db_errors.InvalidTransactionId, self.sync, self.db1, self.db3) def test_sync_detects_rollback_and_divergence_in_source(self): self.db1 = self.create_database('test1', 'source') self.db2 = self.create_database('test2', 'target') self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc1') self.sync(self.db1, self.db2) - db1_copy = self.copy_database(self.db1) + self.db1_copy = self.copy_database(self.db1) self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc2') self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc3') self.sync(self.db1, self.db2) - db1_copy.create_doc_from_json(tests.simple_doc, doc_id='doc2') - db1_copy.create_doc_from_json(tests.simple_doc, doc_id='doc3') + self.db1_copy.create_doc_from_json(tests.simple_doc, doc_id='doc2') + self.db1_copy.create_doc_from_json(tests.simple_doc, doc_id='doc3') self.assertRaises( - u1db_errors.InvalidTransactionId, self.sync, db1_copy, self.db2) + u1db_errors.InvalidTransactionId, self.sync, + self.db1_copy, self.db2) def test_sync_detects_rollback_and_divergence_in_target(self): self.db1 = self.create_database('test1', 'source') self.db2 = self.create_database('test2', 'target') self.db1.create_doc_from_json(tests.simple_doc, doc_id="divergent") self.sync(self.db1, self.db2) - db2_copy = self.copy_database(self.db2) + self.db2_copy = self.copy_database(self.db2) self.db2.create_doc_from_json(tests.simple_doc, doc_id='doc2') self.db2.create_doc_from_json(tests.simple_doc, doc_id='doc3') self.sync(self.db1, self.db2) - db2_copy.create_doc_from_json(tests.simple_doc, doc_id='doc2') - db2_copy.create_doc_from_json(tests.simple_doc, doc_id='doc3') + self.db2_copy.create_doc_from_json(tests.simple_doc, doc_id='doc2') + self.db2_copy.create_doc_from_json(tests.simple_doc, doc_id='doc3') self.assertRaises( - u1db_errors.InvalidTransactionId, self.sync, self.db1, db2_copy) + u1db_errors.InvalidTransactionId, self.sync, + self.db1, self.db2_copy) def test_optional_sync_preserve_json(self): self.db1 = self.create_database('test1', 'source') @@ -1373,10 +1319,14 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): def setUp(self): CouchDBTestCase.setUp(self) + + def create_db(self, ensure=True, dbname=None): + if not dbname: + dbname = ('test-%s' % uuid4().hex) self.db = couch.CouchDatabase.open_database( - urljoin('http://127.0.0.1:%d' % self.wrapper.port, 'test'), + urljoin('http://127.0.0.1:%d' % self.couch_port, dbname), create=True, - ensure_ddocs=False) # note that we don't enforce ddocs here + ensure_ddocs=ensure) def tearDown(self): self.db.delete_database() @@ -1388,6 +1338,7 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): Test that all methods that access design documents will raise if the design docs are not present. """ + self.create_db(ensure=False) # _get_generation() self.assertRaises( errors.MissingDesignDocError, @@ -1418,10 +1369,7 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): Test that all methods that access design documents list functions will raise if the functions are not present. """ - self.db = couch.CouchDatabase.open_database( - urljoin('http://127.0.0.1:%d' % self.wrapper.port, 'test'), - create=True, - ensure_ddocs=True) + self.create_db(ensure=True) # erase views from _design/transactions transactions = self.db._database['_design/transactions'] transactions['lists'] = {} @@ -1448,10 +1396,7 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): Test that all methods that access design documents list functions will raise if the functions are not present. """ - self.db = couch.CouchDatabase.open_database( - urljoin('http://127.0.0.1:%d' % self.wrapper.port, 'test'), - create=True, - ensure_ddocs=True) + self.create_db(ensure=True) # erase views from _design/transactions transactions = self.db._database['_design/transactions'] del transactions['lists'] @@ -1478,10 +1423,7 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): Test that all methods that access design documents' named views will raise if the views are not present. """ - self.db = couch.CouchDatabase.open_database( - urljoin('http://127.0.0.1:%d' % self.wrapper.port, 'test'), - create=True, - ensure_ddocs=True) + self.create_db(ensure=True) # erase views from _design/docs docs = self.db._database['_design/docs'] del docs['views'] @@ -1520,10 +1462,7 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): Test that all methods that access design documents will raise if the design docs are not present. """ - self.db = couch.CouchDatabase.open_database( - urljoin('http://127.0.0.1:%d' % self.wrapper.port, 'test'), - create=True, - ensure_ddocs=True) + self.create_db(ensure=True) # delete _design/docs del self.db._database['_design/docs'] # delete _design/syncs @@ -1554,3 +1493,16 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): self.assertRaises( errors.MissingDesignDocDeletedError, self.db._do_set_replica_gen_and_trans_id, 1, 2, 3) + + def test_ensure_ddoc_independently(self): + """ + Test that a missing ddocs other than _design/docs will be ensured + even if _design/docs is there. + """ + self.create_db(ensure=True) + del self.db._database['_design/transactions'] + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db._get_transaction_log) + self.create_db(ensure=True, dbname=self.db._dbname) + self.db._get_transaction_log() diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py index c488822e..25f709ca 100644 --- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py +++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py @@ -23,6 +23,7 @@ import threading from urlparse import urljoin from twisted.internet import defer +from uuid import uuid4 from leap.soledad.client import Soledad from leap.soledad.common.couch import CouchDatabase, CouchServerState @@ -55,7 +56,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): sync_target = soledad_sync_target - def _soledad_instance(self, user='user-uuid', passphrase=u'123', + def _soledad_instance(self, user=None, passphrase=u'123', prefix='', secrets_path='secrets.json', local_db_path='soledad.u1db', server_url='', @@ -63,6 +64,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): """ Instantiate Soledad. """ + user = user or self.user # this callback ensures we save a document which is sent to the shared # db. @@ -83,15 +85,15 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): return soledad def make_app(self): - self.request_state = CouchServerState(self._couch_url) + self.request_state = CouchServerState(self.couch_url) return self.make_app_after_state(self.request_state) def setUp(self): TestCaseWithServer.setUp(self) CouchDBTestCase.setUp(self) - self._couch_url = 'http://localhost:' + str(self.wrapper.port) + self.user = ('user-%s' % uuid4().hex) self.db = CouchDatabase.open_database( - urljoin(self._couch_url, 'user-user-uuid'), + urljoin(self.couch_url, 'user-' + self.user), create=True, replica_uid='replica', ensure_ddocs=True) diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py index 5ffa2a63..f512d6c1 100644 --- a/common/src/leap/soledad/common/tests/test_server.py +++ b/common/src/leap/soledad/common/tests/test_server.py @@ -50,7 +50,7 @@ from leap.soledad.server.auth import URLToAuthorization def _couch_ensure_database(self, dbname): db = CouchDatabase.open_database( - self._couch_url + '/' + dbname, + self.couch_url + '/' + dbname, create=True, ensure_ddocs=True) return db, db._replica_uid @@ -325,7 +325,7 @@ class EncryptedSyncTestCase( shared_db=self.get_default_shared_mock(_put_doc_side_effect)) def make_app(self): - self.request_state = CouchServerState(self._couch_url) + self.request_state = CouchServerState(self.couch_url) return self.make_app_with_state(self.request_state) def setUp(self): @@ -333,7 +333,6 @@ class EncryptedSyncTestCase( # dependencies. # XXX explain better CouchDBTestCase.setUp(self) - self._couch_url = 'http://localhost:' + str(self.wrapper.port) self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") TestCaseWithServer.setUp(self) @@ -368,7 +367,7 @@ class EncryptedSyncTestCase( # ensure remote db exists before syncing db = CouchDatabase.open_database( - urljoin(self._couch_url, 'user-' + user), + urljoin(self.couch_url, 'user-' + user), create=True, ensure_ddocs=True) @@ -494,27 +493,18 @@ class LockResourceTestCase( # dependencies. # XXX explain better CouchDBTestCase.setUp(self) - self._couch_url = 'http://localhost:' + str(self.wrapper.port) self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") TestCaseWithServer.setUp(self) # create the databases - CouchDatabase.open_database( - urljoin(self._couch_url, 'shared'), - create=True, - ensure_ddocs=True) - CouchDatabase.open_database( - urljoin(self._couch_url, 'tokens'), + db = CouchDatabase.open_database( + urljoin(self.couch_url, ('shared-%s' % (uuid4().hex))), create=True, ensure_ddocs=True) - self._state = CouchServerState(self._couch_url) + self.addCleanup(db.delete_database) + self._state = CouchServerState(self.couch_url) + self._state.open_database = mock.Mock(return_value=db) def tearDown(self): - # delete remote database - db = CouchDatabase.open_database( - urljoin(self._couch_url, 'shared'), - create=True, - ensure_ddocs=True) - db.delete_database() CouchDBTestCase.tearDown(self) TestCaseWithServer.tearDown(self) diff --git a/common/src/leap/soledad/common/tests/test_soledad.py b/common/src/leap/soledad/common/tests/test_soledad.py index bd356858..85d6734e 100644 --- a/common/src/leap/soledad/common/tests/test_soledad.py +++ b/common/src/leap/soledad/common/tests/test_soledad.py @@ -223,7 +223,7 @@ class SoledadSignalingTestCase(BaseSoledadTest): def setUp(self): # mock signaling soledad.client.signal = Mock() - soledad.client.secrets.events.emit = Mock() + soledad.client.secrets.events.emit_async = Mock() # run parent's setUp BaseSoledadTest.setUp(self) @@ -245,57 +245,57 @@ class SoledadSignalingTestCase(BaseSoledadTest): - downloading keys / done downloading keys. - uploading keys / done uploading keys. """ - soledad.client.secrets.events.emit.reset_mock() + soledad.client.secrets.events.emit_async.reset_mock() # get a fresh instance so it emits all bootstrap signals sol = self._soledad_instance( secrets_path='alternative_stage3.json', local_db_path='alternative_stage3.u1db') # reverse call order so we can verify in the order the signals were # expected - soledad.client.secrets.events.emit.mock_calls.reverse() - soledad.client.secrets.events.emit.call_args = \ - soledad.client.secrets.events.emit.call_args_list[0] - soledad.client.secrets.events.emit.call_args_list.reverse() + soledad.client.secrets.events.emit_async.mock_calls.reverse() + soledad.client.secrets.events.emit_async.call_args = \ + soledad.client.secrets.events.emit_async.call_args_list[0] + soledad.client.secrets.events.emit_async.call_args_list.reverse() # downloading keys signals - soledad.client.secrets.events.emit.assert_called_with( + soledad.client.secrets.events.emit_async.assert_called_with( catalog.SOLEDAD_DOWNLOADING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.client.secrets.events.emit) - soledad.client.secrets.events.emit.assert_called_with( + self._pop_mock_call(soledad.client.secrets.events.emit_async) + soledad.client.secrets.events.emit_async.assert_called_with( catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, ADDRESS, ) # creating keys signals - self._pop_mock_call(soledad.client.secrets.events.emit) - soledad.client.secrets.events.emit.assert_called_with( + self._pop_mock_call(soledad.client.secrets.events.emit_async) + soledad.client.secrets.events.emit_async.assert_called_with( catalog.SOLEDAD_CREATING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.client.secrets.events.emit) - soledad.client.secrets.events.emit.assert_called_with( + self._pop_mock_call(soledad.client.secrets.events.emit_async) + soledad.client.secrets.events.emit_async.assert_called_with( catalog.SOLEDAD_DONE_CREATING_KEYS, ADDRESS, ) # downloading once more (inside _put_keys_in_shared_db) - self._pop_mock_call(soledad.client.secrets.events.emit) - soledad.client.secrets.events.emit.assert_called_with( + self._pop_mock_call(soledad.client.secrets.events.emit_async) + soledad.client.secrets.events.emit_async.assert_called_with( catalog.SOLEDAD_DOWNLOADING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.client.secrets.events.emit) - soledad.client.secrets.events.emit.assert_called_with( + self._pop_mock_call(soledad.client.secrets.events.emit_async) + soledad.client.secrets.events.emit_async.assert_called_with( catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, ADDRESS, ) # uploading keys signals - self._pop_mock_call(soledad.client.secrets.events.emit) - soledad.client.secrets.events.emit.assert_called_with( + self._pop_mock_call(soledad.client.secrets.events.emit_async) + soledad.client.secrets.events.emit_async.assert_called_with( catalog.SOLEDAD_UPLOADING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.client.secrets.events.emit) - soledad.client.secrets.events.emit.assert_called_with( + self._pop_mock_call(soledad.client.secrets.events.emit_async) + soledad.client.secrets.events.emit_async.assert_called_with( catalog.SOLEDAD_DONE_UPLOADING_KEYS, ADDRESS, ) @@ -316,7 +316,7 @@ class SoledadSignalingTestCase(BaseSoledadTest): doc.content = sol.secrets._export_recovery_document() sol.close() # reset mock - soledad.client.secrets.events.emit.reset_mock() + soledad.client.secrets.events.emit_async.reset_mock() # get a fresh instance so it emits all bootstrap signals shared_db = self.get_default_shared_mock(get_doc_return_value=doc) sol = self._soledad_instance( @@ -325,17 +325,17 @@ class SoledadSignalingTestCase(BaseSoledadTest): shared_db_class=shared_db) # reverse call order so we can verify in the order the signals were # expected - soledad.client.secrets.events.emit.mock_calls.reverse() - soledad.client.secrets.events.emit.call_args = \ - soledad.client.secrets.events.emit.call_args_list[0] - soledad.client.secrets.events.emit.call_args_list.reverse() + soledad.client.secrets.events.emit_async.mock_calls.reverse() + soledad.client.secrets.events.emit_async.call_args = \ + soledad.client.secrets.events.emit_async.call_args_list[0] + soledad.client.secrets.events.emit_async.call_args_list.reverse() # assert download keys signals - soledad.client.secrets.events.emit.assert_called_with( + soledad.client.secrets.events.emit_async.assert_called_with( catalog.SOLEDAD_DOWNLOADING_KEYS, ADDRESS, ) - self._pop_mock_call(soledad.client.secrets.events.emit) - soledad.client.secrets.events.emit.assert_called_with( + self._pop_mock_call(soledad.client.secrets.events.emit_async) + soledad.client.secrets.events.emit_async.assert_called_with( catalog.SOLEDAD_DONE_DOWNLOADING_KEYS, ADDRESS, ) @@ -369,7 +369,7 @@ class SoledadSignalingTestCase(BaseSoledadTest): yield sol.sync() # assert the signal has been emitted - soledad.client.events.emit.assert_called_with( + soledad.client.events.emit_async.assert_called_with( catalog.SOLEDAD_DONE_DATA_SYNC, ADDRESS, ) diff --git a/common/src/leap/soledad/common/tests/test_sqlcipher_sync.py b/common/src/leap/soledad/common/tests/test_sqlcipher_sync.py index c57d6f61..439fc070 100644 --- a/common/src/leap/soledad/common/tests/test_sqlcipher_sync.py +++ b/common/src/leap/soledad/common/tests/test_sqlcipher_sync.py @@ -19,29 +19,26 @@ Test sqlcipher backend sync. """ -import json +import os from u1db import sync from u1db import vectorclock from u1db import errors +from uuid import uuid4 from testscenarios import TestWithScenarios -from urlparse import urljoin -from twisted.internet import defer - -from leap.soledad.common import couch from leap.soledad.common.crypto import ENC_SCHEME_KEY from leap.soledad.client.http_target import SoledadHTTPSyncTarget from leap.soledad.client.crypto import decrypt_doc_dict -from leap.soledad.client.sqlcipher import SQLCipherDatabase from leap.soledad.common.tests import u1db_tests as tests from leap.soledad.common.tests.test_sqlcipher import SQLCIPHER_SCENARIOS from leap.soledad.common.tests.util import make_soledad_app +from leap.soledad.common.tests.test_sync_target import \ + SoledadDatabaseSyncTargetTests from leap.soledad.common.tests.util import soledad_sync_target from leap.soledad.common.tests.util import BaseSoledadTest -from leap.soledad.common.tests.util import SoledadWithCouchServerMixin # ----------------------------------------------------------------------------- @@ -97,23 +94,6 @@ class SQLCipherDatabaseSyncTests( self._use_tracking = {} super(tests.DatabaseBaseTests, self).setUp() - def tearDown(self): - super(tests.DatabaseBaseTests, self).tearDown() - if hasattr(self, 'db1') and isinstance(self.db1, SQLCipherDatabase): - self.db1.close() - if hasattr(self, 'db1_copy') \ - and isinstance(self.db1_copy, SQLCipherDatabase): - self.db1_copy.close() - if hasattr(self, 'db2') \ - and isinstance(self.db2, SQLCipherDatabase): - self.db2.close() - if hasattr(self, 'db2_copy') \ - and isinstance(self.db2_copy, SQLCipherDatabase): - self.db2_copy.close() - if hasattr(self, 'db3') \ - and isinstance(self.db3, SQLCipherDatabase): - self.db3.close() - def create_database(self, replica_uid, sync_role=None): if replica_uid == 'test' and sync_role is None: # created up the chain by base class but unused @@ -121,6 +101,7 @@ class SQLCipherDatabaseSyncTests( db = self.create_database_for_role(replica_uid, sync_role) if sync_role: self._use_tracking[db] = (replica_uid, sync_role) + self.addCleanup(db.close) return db def create_database_for_role(self, replica_uid, sync_role): @@ -729,38 +710,30 @@ class SQLCipherDatabaseSyncTests( errors.InvalidTransactionId, self.sync, self.db1, self.db2_copy) -def _make_local_db_and_token_http_target(test, path='test'): +def make_local_db_and_soledad_target( + test, path='test', + source_replica_uid=uuid4().hex): test.startTwistedServer() - # ensure remote db exists before syncing - db = couch.CouchDatabase.open_database( - urljoin(test._couch_url, 'test'), - create=True, - replica_uid='test', - ensure_ddocs=True) - - replica_uid = test._soledad._dbpool.replica_uid + replica_uid = os.path.basename(path) + db = test.request_state._create_database(replica_uid) sync_db = test._soledad._sync_db sync_enc_pool = test._soledad._sync_enc_pool st = soledad_sync_target( - test, path, - source_replica_uid=replica_uid, + test, db._dbname, + source_replica_uid=source_replica_uid, sync_db=sync_db, sync_enc_pool=sync_enc_pool) return db, st target_scenarios = [ ('leap', { - 'create_db_and_target': _make_local_db_and_token_http_target, + 'create_db_and_target': make_local_db_and_soledad_target, 'make_app_with_state': make_soledad_app, 'do_sync': sync_via_synchronizer_and_soledad}), ] -class SQLCipherSyncTargetTests( - TestWithScenarios, - tests.DatabaseBaseTests, - tests.TestCaseWithServer, - SoledadWithCouchServerMixin): +class SQLCipherSyncTargetTests(SoledadDatabaseSyncTargetTests): # TODO: implement _set_trace_hook(_shallow) in SoledadHTTPSyncTarget so # skipped tests can be succesfully executed. @@ -769,368 +742,3 @@ class SQLCipherSyncTargetTests( target_scenarios)) whitebox = False - - def setUp(self): - super(tests.DatabaseBaseTests, self).setUp() - self.db, self.st = self.create_db_and_target(self) - self.addCleanup(self.st.close) - self.other_changes = [] - - def tearDown(self): - super(tests.DatabaseBaseTests, self).tearDown() - - def assertLastExchangeLog(self, db, expected): - log = getattr(db, '_last_exchange_log', None) - if log is None: - return - self.assertEqual(expected, log) - - def receive_doc(self, doc, gen, trans_id): - self.other_changes.append( - (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id)) - - def make_app(self): - self.request_state = couch.CouchServerState(self._couch_url) - return self.make_app_with_state(self.request_state) - - def set_trace_hook(self, callback, shallow=False): - setter = (self.st._set_trace_hook if not shallow else - self.st._set_trace_hook_shallow) - try: - setter(callback) - except NotImplementedError: - self.skipTest("%s does not implement _set_trace_hook" - % (self.st.__class__.__name__,)) - - def test_get_sync_target(self): - self.assertIsNot(None, self.st) - - @defer.inlineCallbacks - def test_get_sync_info(self): - sync_info = yield self.st.get_sync_info('other') - self.assertEqual( - ('test', 0, '', 0, ''), sync_info) - - @defer.inlineCallbacks - def test_create_doc_updates_sync_info(self): - sync_info = yield self.st.get_sync_info('other') - self.assertEqual( - ('test', 0, '', 0, ''), sync_info) - self.db.create_doc_from_json(tests.simple_doc) - sync_info = yield self.st.get_sync_info('other') - self.assertEqual(1, sync_info[1]) - - @defer.inlineCallbacks - def test_record_sync_info(self): - yield self.st.record_sync_info('replica', 10, 'T-transid') - sync_info = yield self.st.get_sync_info('other') - self.assertEqual( - ('test', 0, '', 10, 'T-transid'), sync_info) - - @defer.inlineCallbacks - def test_sync_exchange(self): - """ - Modified to account for possibly receiving encrypted documents from - sever-side. - """ - - docs_by_gen = [ - (self.make_document('doc-id', 'replica:1', tests.simple_doc), 10, - 'T-sid')] - new_gen, trans_id = yield self.st.sync_exchange( - docs_by_gen, 'replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=self.receive_doc) - self.assertGetEncryptedDoc( - self.db, 'doc-id', 'replica:1', tests.simple_doc, False) - self.assertTransactionLog(['doc-id'], self.db) - last_trans_id = self.getLastTransId(self.db) - self.assertEqual(([], 1, last_trans_id), - (self.other_changes, new_gen, last_trans_id)) - sync_info = yield self.st.get_sync_info('replica') - self.assertEqual(10, sync_info[3]) - - @defer.inlineCallbacks - def test_sync_exchange_push_many(self): - """ - Modified to account for possibly receiving encrypted documents from - sever-side. - """ - docs_by_gen = [ - (self.make_document( - 'doc-id', 'replica:1', tests.simple_doc), 10, 'T-1'), - (self.make_document('doc-id2', 'replica:1', tests.nested_doc), 11, - 'T-2')] - new_gen, trans_id = yield self.st.sync_exchange( - docs_by_gen, 'replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=self.receive_doc) - self.assertGetEncryptedDoc( - self.db, 'doc-id', 'replica:1', tests.simple_doc, False) - self.assertGetEncryptedDoc( - self.db, 'doc-id2', 'replica:1', tests.nested_doc, False) - self.assertTransactionLog(['doc-id', 'doc-id2'], self.db) - last_trans_id = self.getLastTransId(self.db) - self.assertEqual(([], 2, last_trans_id), - (self.other_changes, new_gen, trans_id)) - sync_info = yield self.st.get_sync_info('replica') - self.assertEqual(11, sync_info[3]) - - @defer.inlineCallbacks - def test_sync_exchange_returns_many_new_docs(self): - """ - Modified to account for JSON serialization differences. - """ - doc = self.db.create_doc_from_json(tests.simple_doc) - doc2 = self.db.create_doc_from_json(tests.nested_doc) - self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db) - new_gen, _ = yield self.st.sync_exchange( - [], 'other-replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=self.receive_doc) - self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db) - self.assertEqual(2, new_gen) - self.assertEqual( - [(doc.doc_id, doc.rev, 1), - (doc2.doc_id, doc2.rev, 2)], - [c[:2] + c[3:4] for c in self.other_changes]) - self.assertEqual( - json.dumps(tests.simple_doc), - json.dumps(self.other_changes[0][2])) - self.assertEqual( - json.loads(tests.nested_doc), - json.loads(self.other_changes[1][2])) - if self.whitebox: - self.assertEqual( - self.db._last_exchange_log['return'], - {'last_gen': 2, 'docs': - [(doc.doc_id, doc.rev), (doc2.doc_id, doc2.rev)]}) - - @defer.inlineCallbacks - def test_sync_exchange_deleted(self): - doc = self.db.create_doc_from_json('{}') - edit_rev = 'replica:1|' + doc.rev - docs_by_gen = [ - (self.make_document(doc.doc_id, edit_rev, None), 10, 'T-sid')] - new_gen, trans_id = yield self.st.sync_exchange( - docs_by_gen, 'replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=self.receive_doc) - self.assertGetDocIncludeDeleted( - self.db, doc.doc_id, edit_rev, None, False) - self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db) - last_trans_id = self.getLastTransId(self.db) - self.assertEqual(([], 2, last_trans_id), - (self.other_changes, new_gen, trans_id)) - sync_info = yield self.st.get_sync_info('replica') - self.assertEqual(10, sync_info[3]) - - @defer.inlineCallbacks - def test_sync_exchange_refuses_conflicts(self): - doc = self.db.create_doc_from_json(tests.simple_doc) - self.assertTransactionLog([doc.doc_id], self.db) - new_doc = '{"key": "altval"}' - docs_by_gen = [ - (self.make_document(doc.doc_id, 'replica:1', new_doc), 10, - 'T-sid')] - new_gen, _ = yield self.st.sync_exchange( - docs_by_gen, 'replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=self.receive_doc) - self.assertTransactionLog([doc.doc_id], self.db) - self.assertEqual( - (doc.doc_id, doc.rev, tests.simple_doc, 1), - self.other_changes[0][:-1]) - self.assertEqual(1, new_gen) - if self.whitebox: - self.assertEqual(self.db._last_exchange_log['return'], - {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]}) - - @defer.inlineCallbacks - def test_sync_exchange_ignores_convergence(self): - doc = self.db.create_doc_from_json(tests.simple_doc) - self.assertTransactionLog([doc.doc_id], self.db) - gen, txid = self.db._get_generation_info() - docs_by_gen = [ - (self.make_document( - doc.doc_id, doc.rev, tests.simple_doc), 10, 'T-sid')] - new_gen, _ = yield self.st.sync_exchange( - docs_by_gen, 'replica', last_known_generation=gen, - last_known_trans_id=txid, insert_doc_cb=self.receive_doc) - self.assertTransactionLog([doc.doc_id], self.db) - self.assertEqual(([], 1), (self.other_changes, new_gen)) - - @defer.inlineCallbacks - def test_sync_exchange_returns_new_docs(self): - doc = self.db.create_doc_from_json(tests.simple_doc) - self.assertTransactionLog([doc.doc_id], self.db) - new_gen, _ = yield self.st.sync_exchange( - [], 'other-replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=self.receive_doc) - self.assertTransactionLog([doc.doc_id], self.db) - self.assertEqual( - (doc.doc_id, doc.rev, tests.simple_doc, 1), - self.other_changes[0][:-1]) - self.assertEqual(1, new_gen) - if self.whitebox: - self.assertEqual(self.db._last_exchange_log['return'], - {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]}) - - @defer.inlineCallbacks - def test_sync_exchange_returns_deleted_docs(self): - doc = self.db.create_doc_from_json(tests.simple_doc) - self.db.delete_doc(doc) - self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db) - new_gen, _ = yield self.st.sync_exchange( - [], 'other-replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=self.receive_doc) - self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db) - self.assertEqual( - (doc.doc_id, doc.rev, None, 2), self.other_changes[0][:-1]) - self.assertEqual(2, new_gen) - if self.whitebox: - self.assertEqual(self.db._last_exchange_log['return'], - {'last_gen': 2, 'docs': [(doc.doc_id, doc.rev)]}) - - @defer.inlineCallbacks - def test_sync_exchange_getting_newer_docs(self): - doc = self.db.create_doc_from_json(tests.simple_doc) - self.assertTransactionLog([doc.doc_id], self.db) - new_doc = '{"key": "altval"}' - docs_by_gen = [ - (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10, - 'T-sid')] - new_gen, _ = yield self.st.sync_exchange( - docs_by_gen, 'other-replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=self.receive_doc) - self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db) - self.assertEqual(([], 2), (self.other_changes, new_gen)) - - @defer.inlineCallbacks - def test_sync_exchange_with_concurrent_updates_of_synced_doc(self): - expected = [] - - def before_whatschanged_cb(state): - if state != 'before whats_changed': - return - cont = '{"key": "cuncurrent"}' - conc_rev = self.db.put_doc( - self.make_document(doc.doc_id, 'test:1|z:2', cont)) - expected.append((doc.doc_id, conc_rev, cont, 3)) - - self.set_trace_hook(before_whatschanged_cb) - doc = self.db.create_doc_from_json(tests.simple_doc) - self.assertTransactionLog([doc.doc_id], self.db) - new_doc = '{"key": "altval"}' - docs_by_gen = [ - (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10, - 'T-sid')] - new_gen, _ = yield self.st.sync_exchange( - docs_by_gen, 'other-replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=self.receive_doc) - self.assertEqual(expected, [c[:-1] for c in self.other_changes]) - self.assertEqual(3, new_gen) - - @defer.inlineCallbacks - def test_sync_exchange_with_concurrent_updates(self): - - def after_whatschanged_cb(state): - if state != 'after whats_changed': - return - self.db.create_doc_from_json('{"new": "doc"}') - - self.set_trace_hook(after_whatschanged_cb) - doc = self.db.create_doc_from_json(tests.simple_doc) - self.assertTransactionLog([doc.doc_id], self.db) - new_doc = '{"key": "altval"}' - docs_by_gen = [ - (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10, - 'T-sid')] - new_gen, _ = yield self.st.sync_exchange( - docs_by_gen, 'other-replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=self.receive_doc) - self.assertEqual(([], 2), (self.other_changes, new_gen)) - - @defer.inlineCallbacks - def test_sync_exchange_converged_handling(self): - doc = self.db.create_doc_from_json(tests.simple_doc) - docs_by_gen = [ - (self.make_document('new', 'other:1', '{}'), 4, 'T-foo'), - (self.make_document(doc.doc_id, doc.rev, doc.get_json()), 5, - 'T-bar')] - new_gen, _ = yield self.st.sync_exchange( - docs_by_gen, 'other-replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=self.receive_doc) - self.assertEqual(([], 2), (self.other_changes, new_gen)) - - @defer.inlineCallbacks - def test_sync_exchange_detect_incomplete_exchange(self): - def before_get_docs_explode(state): - if state != 'before get_docs': - return - raise errors.U1DBError("fail") - self.set_trace_hook(before_get_docs_explode) - # suppress traceback printing in the wsgiref server - # self.patch(simple_server.ServerHandler, - # 'log_exception', lambda h, exc_info: None) - doc = self.db.create_doc_from_json(tests.simple_doc) - self.assertTransactionLog([doc.doc_id], self.db) - with self.assertRaises((errors.U1DBError, errors.BrokenSyncStream)): - yield self.st.sync_exchange( - [], 'other-replica', - last_known_generation=0, last_known_trans_id=None, - insert_doc_cb=self.receive_doc) - - @defer.inlineCallbacks - def test_sync_exchange_doc_ids(self): - sync_exchange_doc_ids = getattr(self.st, 'sync_exchange_doc_ids', None) - if sync_exchange_doc_ids is None: - self.skipTest("sync_exchange_doc_ids not implemented") - db2 = self.create_database('test2') - doc = db2.create_doc_from_json(tests.simple_doc) - new_gen, trans_id = sync_exchange_doc_ids( - db2, [(doc.doc_id, 10, 'T-sid')], 0, None, - insert_doc_cb=self.receive_doc) - self.assertGetDoc(self.db, doc.doc_id, doc.rev, - tests.simple_doc, False) - self.assertTransactionLog([doc.doc_id], self.db) - last_trans_id = self.getLastTransId(self.db) - self.assertEqual(([], 1, last_trans_id), - (self.other_changes, new_gen, trans_id)) - self.assertEqual(10, self.st.get_sync_info(db2._replica_uid)[3]) - - @defer.inlineCallbacks - def test__set_trace_hook(self): - called = [] - - def cb(state): - called.append(state) - - self.set_trace_hook(cb) - yield self.st.sync_exchange([], 'replica', 0, None, self.receive_doc) - yield self.st.record_sync_info('replica', 0, 'T-sid') - self.assertEqual(['before whats_changed', - 'after whats_changed', - 'before get_docs', - 'record_sync_info', - ], - called) - - @defer.inlineCallbacks - def test__set_trace_hook_shallow(self): - if (self.st._set_trace_hook_shallow == self.st._set_trace_hook or - self.st._set_trace_hook_shallow.im_func == - SoledadHTTPSyncTarget._set_trace_hook_shallow.im_func): - # shallow same as full - expected = ['before whats_changed', - 'after whats_changed', - 'before get_docs', - 'record_sync_info', - ] - else: - expected = ['sync_exchange', 'record_sync_info'] - - called = [] - - def cb(state): - called.append(state) - - self.set_trace_hook(cb, shallow=True) - self.st.sync_exchange([], 'replica', 0, None, self.receive_doc) - self.st.record_sync_info('replica', 0, 'T-sid') - self.assertEqual(expected, called) diff --git a/common/src/leap/soledad/common/tests/test_sync.py b/common/src/leap/soledad/common/tests/test_sync.py index 14152370..1041367b 100644 --- a/common/src/leap/soledad/common/tests/test_sync.py +++ b/common/src/leap/soledad/common/tests/test_sync.py @@ -56,14 +56,13 @@ class InterruptableSyncTestCase( sync_target = soledad_sync_target def make_app(self): - self.request_state = couch.CouchServerState(self._couch_url) + self.request_state = couch.CouchServerState(self.couch_url) return self.make_app_with_state(self.request_state) def setUp(self): TestCaseWithServer.setUp(self) CouchDBTestCase.setUp(self) self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") - self._couch_url = 'http://localhost:' + str(self.wrapper.port) def tearDown(self): CouchDBTestCase.tearDown(self) @@ -103,7 +102,7 @@ class InterruptableSyncTestCase( # ensure remote db exists before syncing db = couch.CouchDatabase.open_database( - urljoin(self._couch_url, 'user-user-uuid'), + urljoin(self.couch_url, 'user-user-uuid'), create=True, ensure_ddocs=True) @@ -148,8 +147,8 @@ class InterruptableSyncTestCase( class TestSoledadDbSync( TestWithScenarios, - tests.TestCaseWithServer, - SoledadWithCouchServerMixin): + SoledadWithCouchServerMixin, + tests.TestCaseWithServer): """ Test db.sync remote sync shortcut @@ -166,10 +165,6 @@ class TestSoledadDbSync( oauth = False token = False - def make_app(self): - self.request_state = couch.CouchServerState(self._couch_url) - return self.make_app_with_state(self.request_state) - def setUp(self): """ Need to explicitely invoke inicialization on all bases. @@ -177,29 +172,22 @@ class TestSoledadDbSync( SoledadWithCouchServerMixin.setUp(self) self.startTwistedServer() self.db = self.make_database_for_test(self, 'test1') - self.db2 = couch.CouchDatabase.open_database( - urljoin( - 'http://localhost:' + str(self.wrapper.port), - 'test' - ), - create=True, - ensure_ddocs=True) + self.db2 = self.request_state._create_database(replica_uid='test') def tearDown(self): """ Need to explicitely invoke destruction on all bases. """ - self.db2.delete_database() SoledadWithCouchServerMixin.tearDown(self) # tests.TestCaseWithServer.tearDown(self) - def do_sync(self, target_name): + def do_sync(self): """ Perform sync using SoledadSynchronizer, SoledadSyncTarget and Token auth. """ target = soledad_sync_target( - self, target_name, + self, self.db2._dbname, source_replica_uid=self._soledad._dbpool.replica_uid) self.addCleanup(target.close) return sync.SoledadSynchronizer( @@ -217,7 +205,7 @@ class TestSoledadDbSync( doc1 = self.db.create_doc_from_json(tests.simple_doc) doc2 = self.db2.create_doc_from_json(tests.nested_doc) - local_gen_before_sync = yield self.do_sync('test') + local_gen_before_sync = yield self.do_sync() gen, _, changes = self.db.whats_changed(local_gen_before_sync) self.assertEqual(1, len(changes)) self.assertEqual(doc2.doc_id, changes[0][0]) diff --git a/common/src/leap/soledad/common/tests/test_sync_deferred.py b/common/src/leap/soledad/common/tests/test_sync_deferred.py index ffb8a4ae..90b00670 100644 --- a/common/src/leap/soledad/common/tests/test_sync_deferred.py +++ b/common/src/leap/soledad/common/tests/test_sync_deferred.py @@ -59,6 +59,7 @@ class BaseSoledadDeferredEncTest(SoledadWithCouchServerMixin): def setUp(self): SoledadWithCouchServerMixin.setUp(self) + self.startTwistedServer() # config info self.db1_file = os.path.join(self.tempdir, "db1.u1db") os.unlink(self.db1_file) @@ -85,13 +86,7 @@ class BaseSoledadDeferredEncTest(SoledadWithCouchServerMixin): defer_encryption=True, sync_db_key=sync_db_key) self.db1 = SQLCipherDatabase(self.opts) - self.db2 = couch.CouchDatabase.open_database( - urljoin( - 'http://localhost:' + str(self.wrapper.port), - 'test' - ), - create=True, - ensure_ddocs=True) + self.db2 = self.request_state._create_database('test') def tearDown(self): # XXX should not access "private" attrs @@ -109,8 +104,8 @@ class SyncTimeoutError(Exception): class TestSoledadDbSyncDeferredEncDecr( TestWithScenarios, - tests.TestCaseWithServer, - BaseSoledadDeferredEncTest): + BaseSoledadDeferredEncTest, + tests.TestCaseWithServer): """ Test db.sync remote sync shortcut. @@ -128,17 +123,12 @@ class TestSoledadDbSyncDeferredEncDecr( oauth = False token = True - def make_app(self): - self.request_state = couch.CouchServerState(self._couch_url) - return self.make_app_with_state(self.request_state) - def setUp(self): """ Need to explicitely invoke inicialization on all bases. """ BaseSoledadDeferredEncTest.setUp(self) self.server = self.server_thread = None - self.startTwistedServer() self.syncer = None def tearDown(self): @@ -150,7 +140,7 @@ class TestSoledadDbSyncDeferredEncDecr( dbsyncer.close() BaseSoledadDeferredEncTest.tearDown(self) - def do_sync(self, target_name): + def do_sync(self): """ Perform sync using SoledadSynchronizer, SoledadSyncTarget and Token auth. @@ -159,7 +149,7 @@ class TestSoledadDbSyncDeferredEncDecr( sync_db = self._soledad._sync_db sync_enc_pool = self._soledad._sync_enc_pool target = soledad_sync_target( - self, target_name, + self, self.db2._dbname, source_replica_uid=replica_uid, sync_db=sync_db, sync_enc_pool=sync_enc_pool) @@ -190,7 +180,7 @@ class TestSoledadDbSyncDeferredEncDecr( """ doc1 = self.db1.create_doc_from_json(tests.simple_doc) doc2 = self.db2.create_doc_from_json(tests.nested_doc) - local_gen_before_sync = yield self.do_sync('test') + local_gen_before_sync = yield self.do_sync() gen, _, changes = self.db1.whats_changed(local_gen_before_sync) self.assertEqual(1, len(changes)) diff --git a/common/src/leap/soledad/common/tests/test_sync_mutex.py b/common/src/leap/soledad/common/tests/test_sync_mutex.py index a904a940..2e2123a7 100644 --- a/common/src/leap/soledad/common/tests/test_sync_mutex.py +++ b/common/src/leap/soledad/common/tests/test_sync_mutex.py @@ -84,14 +84,14 @@ class TestSyncMutex( sync_target = soledad_sync_target def make_app(self): - self.request_state = couch.CouchServerState(self._couch_url) + self.request_state = couch.CouchServerState(self.couch_url) return self.make_app_with_state(self.request_state) def setUp(self): TestCaseWithServer.setUp(self) CouchDBTestCase.setUp(self) self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") - self._couch_url = 'http://localhost:' + str(self.wrapper.port) + self.user = ('user-%s' % uuid.uuid4().hex) def tearDown(self): CouchDBTestCase.tearDown(self) @@ -103,12 +103,12 @@ class TestSyncMutex( # ensure remote db exists before syncing db = couch.CouchDatabase.open_database( - urljoin(self._couch_url, 'user-user-uuid'), + urljoin(self.couch_url, 'user-' + self.user), create=True, ensure_ddocs=True) sol = self._soledad_instance( - user='user-uuid', server_url=self.getURL()) + user=self.user, server_url=self.getURL()) d1 = sol.sync() d2 = sol.sync() diff --git a/common/src/leap/soledad/common/tests/test_sync_target.py b/common/src/leap/soledad/common/tests/test_sync_target.py index d855fb52..c0987e90 100644 --- a/common/src/leap/soledad/common/tests/test_sync_target.py +++ b/common/src/leap/soledad/common/tests/test_sync_target.py @@ -63,13 +63,12 @@ class TestSoledadParseReceivedDocResponse(SoledadWithCouchServerMixin): def setUp(self): SoledadWithCouchServerMixin.setUp(self) - self._couch_url = 'http://localhost:' + str(self.wrapper.port) creds = {'token': { 'uuid': 'user-uuid', 'token': 'auth-token', }} self.target = target.SoledadHTTPSyncTarget( - self._couch_url, + self.couch_url, uuid4().hex, creds, self._soledad._crypto, @@ -151,11 +150,12 @@ def make_local_db_and_soledad_target( test, path='test', source_replica_uid=uuid4().hex): test.startTwistedServer() - db = test.request_state._create_database(os.path.basename(path)) + replica_uid = os.path.basename(path) + db = test.request_state._create_database(replica_uid) sync_db = test._soledad._sync_db sync_enc_pool = test._soledad._sync_enc_pool st = soledad_sync_target( - test, path, + test, db._dbname, source_replica_uid=source_replica_uid, sync_db=sync_db, sync_enc_pool=sync_enc_pool) @@ -191,6 +191,8 @@ class TestSoledadSyncTarget( self.startTwistedServer() sync_db = self._soledad._sync_db sync_enc_pool = self._soledad._sync_enc_pool + if path is None: + path = self.db2._dbname target = self.sync_target( self, path, source_replica_uid=source_replica_uid, @@ -204,11 +206,11 @@ class TestSoledadSyncTarget( SoledadWithCouchServerMixin.setUp(self) self.startTwistedServer() self.db1 = make_sqlcipher_database_for_test(self, 'test1') - self.db2 = self.request_state._create_database('test2') + self.db2 = self.request_state._create_database('test') def tearDown(self): # db2, _ = self.request_state.ensure_database('test2') - self.db2.delete_database() + self.delete_db(self.db2._dbname) self.db1.close() SoledadWithCouchServerMixin.tearDown(self) TestWithScenarios.tearDown(self) @@ -220,8 +222,8 @@ class TestSoledadSyncTarget( This test was adapted to decrypt remote content before assert. """ - db = self.request_state._create_database('test') - remote_target = self.getSyncTarget('test') + db = self.db2 + remote_target = self.getSyncTarget() other_docs = [] def receive_doc(doc, gen, trans_id): @@ -247,7 +249,7 @@ class TestSoledadSyncTarget( def blackhole_getstderr(inst): return cStringIO.StringIO() - db = self.request_state._create_database('test') + db = self.db2 _put_doc_if_newer = db._put_doc_if_newer trigger_ids = ['doc-here2'] @@ -267,7 +269,6 @@ class TestSoledadSyncTarget( self.patch( IndexedCouchDatabase, '_put_doc_if_newer', bomb_put_doc_if_newer) remote_target = self.getSyncTarget( - 'test', source_replica_uid='replica') other_changes = [] @@ -317,7 +318,7 @@ class TestSoledadSyncTarget( This test was adapted to decrypt remote content before assert. """ - remote_target = self.getSyncTarget('test') + remote_target = self.getSyncTarget() other_docs = [] replica_uid_box = [] @@ -333,7 +334,7 @@ class TestSoledadSyncTarget( last_known_trans_id=None, insert_doc_cb=receive_doc, ensure_callback=ensure_cb, defer_decryption=False) self.assertEqual(1, new_gen) - db = self.request_state.open_database('test') + db = self.db2 self.assertEqual(1, len(replica_uid_box)) self.assertEqual(db._replica_uid, replica_uid_box[0]) self.assertGetEncryptedDoc( @@ -346,10 +347,9 @@ class TestSoledadSyncTarget( @defer.inlineCallbacks def test_get_sync_info(self): - db = self.request_state._create_database('test') + db = self.db2 db._set_replica_gen_and_trans_id('other-id', 1, 'T-transid') remote_target = self.getSyncTarget( - 'test', source_replica_uid='other-id') sync_info = yield remote_target.get_sync_info('other-id') self.assertEqual( @@ -358,19 +358,17 @@ class TestSoledadSyncTarget( @defer.inlineCallbacks def test_record_sync_info(self): - db = self.request_state._create_database('test') remote_target = self.getSyncTarget( - 'test', source_replica_uid='other-id') yield remote_target.record_sync_info('other-id', 2, 'T-transid') - self.assertEqual( - (2, 'T-transid'), db._get_replica_gen_and_trans_id('other-id')) + self.assertEqual((2, 'T-transid'), + self.db2._get_replica_gen_and_trans_id('other-id')) @defer.inlineCallbacks def test_sync_exchange_receive(self): - db = self.request_state._create_database('test') + db = self.db2 doc = db.create_doc_from_json('{"value": "there"}') - remote_target = self.getSyncTarget('test') + remote_target = self.getSyncTarget() other_changes = [] def receive_doc(doc, gen, trans_id): @@ -423,10 +421,10 @@ class SoledadDatabaseSyncTargetTests( self.db, self.st = make_local_db_and_soledad_target(self) def tearDown(self): - tests.TestCaseWithServer.tearDown(self) - SoledadWithCouchServerMixin.tearDown(self) self.db.close() self.st.close() + tests.TestCaseWithServer.tearDown(self) + SoledadWithCouchServerMixin.tearDown(self) def set_trace_hook(self, callback, shallow=False): setter = (self.st._set_trace_hook if not shallow else @@ -818,10 +816,6 @@ class TestSoledadDbSync( oauth = False token = False - def make_app(self): - self.request_state = couch.CouchServerState(self._couch_url) - return self.make_app_with_state(self.request_state) - def setUp(self): """ Need to explicitely invoke inicialization on all bases. @@ -857,13 +851,7 @@ class TestSoledadDbSync( defer_encryption=True, sync_db_key=sync_db_key) self.db1 = SQLCipherDatabase(self.opts) - self.db2 = couch.CouchDatabase.open_database( - urljoin( - 'http://localhost:' + str(self.wrapper.port), - 'test' - ), - create=True, - ensure_ddocs=True) + self.db2 = self.request_state._create_database(replica_uid='test') def tearDown(self): """ @@ -890,7 +878,7 @@ class TestSoledadDbSync( 'uuid': 'user-uuid', 'token': 'auth-token', }} - target_url = self.getURL(target_name) + target_url = self.getURL(self.db2._dbname) # get a u1db syncer crypto = self._soledad._crypto diff --git a/common/src/leap/soledad/common/tests/util.py b/common/src/leap/soledad/common/tests/util.py index daa9c558..1c7adb91 100644 --- a/common/src/leap/soledad/common/tests/util.py +++ b/common/src/leap/soledad/common/tests/util.py @@ -27,10 +27,8 @@ import shutil import random import string import u1db -import subprocess -import time -import re import traceback +import couchdb from uuid import uuid4 from mock import Mock @@ -337,119 +335,6 @@ class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest): self.assertEqual(exp_doc.content, doc.content) -# ----------------------------------------------------------------------------- -# A wrapper for running couchdb locally. -# ----------------------------------------------------------------------------- - -# from: https://github.com/smcq/paisley/blob/master/paisley/test/util.py -# TODO: include license of above project. -class CouchDBWrapper(object): - - """ - Wrapper for external CouchDB instance which is started and stopped for - testing. - """ - BOOT_TIMEOUT_SECONDS = 5 - RETRY_LIMIT = 3 - - def start(self): - tries = 0 - while tries < self.RETRY_LIMIT and not hasattr(self, 'port'): - try: - self._try_start() - return - except Exception, e: - print traceback.format_exc() - self.stop() - tries += 1 - raise Exception( - "Check your couchdb: Tried to start 3 times and failed badly") - - def _try_start(self): - """ - Start a CouchDB instance for a test. - """ - self.tempdir = tempfile.mkdtemp(suffix='.couch.test') - - path = os.path.join(os.path.dirname(__file__), - 'couchdb.ini.template') - handle = open(path) - conf = handle.read() % { - 'tempdir': self.tempdir, - } - handle.close() - - shutil.copy('/etc/couchdb/default.ini', self.tempdir) - defaultConfPath = os.path.join(self.tempdir, 'default.ini') - - confPath = os.path.join(self.tempdir, 'test.ini') - handle = open(confPath, 'w') - handle.write(conf) - handle.close() - - # create the dirs from the template - mkdir_p(os.path.join(self.tempdir, 'lib')) - mkdir_p(os.path.join(self.tempdir, 'log')) - args = ['/usr/bin/couchdb', '-n', - '-a', defaultConfPath, '-a', confPath] - null = open('/dev/null', 'w') - - self.process = subprocess.Popen( - args, env=None, stdout=null.fileno(), stderr=null.fileno(), - close_fds=True) - boot_time = time.time() - # find port - logPath = os.path.join(self.tempdir, 'log', 'couch.log') - while not os.path.exists(logPath): - if self.process.poll() is not None: - got_stdout, got_stderr = "", "" - if self.process.stdout is not None: - got_stdout = self.process.stdout.read() - - if self.process.stderr is not None: - got_stderr = self.process.stderr.read() - raise Exception(""" -couchdb exited with code %d. -stdout: -%s -stderr: -%s""" % ( - self.process.returncode, got_stdout, got_stderr)) - time.sleep(0.01) - if (time.time() - boot_time) > self.BOOT_TIMEOUT_SECONDS: - self.stop() - raise Exception("Timeout starting couch") - while os.stat(logPath).st_size == 0: - time.sleep(0.01) - if (time.time() - boot_time) > self.BOOT_TIMEOUT_SECONDS: - self.stop() - raise Exception("Timeout starting couch") - PORT_RE = re.compile( - 'Apache CouchDB has started on http://127.0.0.1:(?P<port>\d+)') - - handle = open(logPath) - line = handle.read() - handle.close() - m = PORT_RE.search(line) - if not m: - self.stop() - raise Exception("Cannot find port in line %s" % line) - self.port = int(m.group('port')) - - def stop(self): - """ - Terminate the CouchDB instance. - """ - try: - self.process.terminate() - self.process.communicate() - except: - # just to clean up - # if it can't, the process wasn't created anyway - pass - shutil.rmtree(self.tempdir) - - class CouchDBTestCase(unittest.TestCase, MockedSharedDBTest): """ @@ -460,15 +345,16 @@ class CouchDBTestCase(unittest.TestCase, MockedSharedDBTest): """ Make sure we have a CouchDB instance for a test. """ - self.wrapper = CouchDBWrapper() - self.wrapper.start() - # self.db = self.wrapper.db + self.couch_port = 5984 + self.couch_url = 'http://localhost:%d' % self.couch_port + self.couch_server = couchdb.Server(self.couch_url) - def tearDown(self): - """ - Stop CouchDB instance for test. - """ - self.wrapper.stop() + def delete_db(self, name): + try: + self.couch_server.delete(name) + except: + # ignore if already missing + pass class CouchServerStateForTests(CouchServerState): @@ -484,15 +370,25 @@ class CouchServerStateForTests(CouchServerState): which is less pleasant than allowing the db to be automatically created. """ - def _create_database(self, dbname): - return CouchDatabase.open_database( - urljoin(self._couch_url, dbname), + def __init__(self, *args, **kwargs): + self.dbs = [] + super(CouchServerStateForTests, self).__init__(*args, **kwargs) + + def _create_database(self, replica_uid=None, dbname=None): + """ + Create db and append to a list, allowing test to close it later + """ + dbname = dbname or ('test-%s' % uuid4().hex) + db = CouchDatabase.open_database( + urljoin(self.couch_url, dbname), True, - replica_uid=dbname, + replica_uid=replica_uid or 'test', ensure_ddocs=True) + self.dbs.append(db) + return db def ensure_database(self, dbname): - db = self._create_database(dbname) + db = self._create_database(dbname=dbname) return db, db.replica_uid @@ -506,23 +402,20 @@ class SoledadWithCouchServerMixin( main_test_class = getattr(self, 'main_test_class', None) if main_test_class is not None: main_test_class.setUp(self) - self._couch_url = 'http://localhost:%d' % self.wrapper.port def tearDown(self): main_test_class = getattr(self, 'main_test_class', None) if main_test_class is not None: main_test_class.tearDown(self) # delete the test database - try: - db = CouchDatabase(self._couch_url, 'test') - db.delete_database() - except DatabaseDoesNotExist: - pass BaseSoledadTest.tearDown(self) CouchDBTestCase.tearDown(self) def make_app(self): - couch_url = urljoin( - 'http://localhost:' + str(self.wrapper.port), 'tests') - self.request_state = CouchServerStateForTests(couch_url) + self.request_state = CouchServerStateForTests(self.couch_url) + self.addCleanup(self.delete_dbs) return self.make_app_with_state(self.request_state) + + def delete_dbs(self): + for db in self.request_state.dbs: + self.delete_db(db._dbname) diff --git a/docs/sphinx/sync.rst b/docs/sphinx/sync.rst new file mode 100644 index 00000000..f243befb --- /dev/null +++ b/docs/sphinx/sync.rst @@ -0,0 +1,32 @@ +Soledad sync process +==================== + +Phases of sync: + +(1) client acquires knowledge about server state. http GET +(2) client sends its documents to the server. http POSTs, or a single POST. +(3) client downloads documents from the server. +(4) client records its new state on the server. + +Originally in u1db: + (1) is a GET, + (2) and (3) are one POST (send in body, receive in response), + (4) is a PUT. + +In soledad: + +(1) is a GET. +(2) is either 1 or a series of sequential POSTS. + (2.1) encrypt asynchronously + (2.2) store in temp sync db + (2.3) upload sequentially ***THIS IS SLOW*** +(3) is a series of concurrent POSTS, insert sequentially on local client db. + (3.1) download concurrently + (3.2) store in temp sync db + (3.3) decrypt asynchronously + (3.4) insert sequentially in local client db +(4) is a PUT. + +This difference between u1db and soledad was made in order to be able to gracefully interrupt the sync in the middle of the upload or the download. + +it is essential that all the uploads and downloads are sequential: documents must be added in order. the download happens in parallel, but then locally they are added sequentially to the local db. diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py index 7a03f6fb..1b795016 100644 --- a/server/src/leap/soledad/server/__init__.py +++ b/server/src/leap/soledad/server/__init__.py @@ -238,6 +238,7 @@ class HTTPInvocationByMethodWithBody( if content_type == 'application/x-soledad-sync-put': meth_put = self._lookup('%s_put' % method) meth_end = self._lookup('%s_end' % method) + entries = [] while True: line = body_getline() entry = line.strip() @@ -246,9 +247,11 @@ class HTTPInvocationByMethodWithBody( if not entry or not comma: # empty or no prec comma raise http_app.BadRequest entry, comma = utils.check_and_strip_comma(entry) - meth_put({}, entry) + entries.append(entry) if comma or body_getline(): # extra comma or data raise http_app.BadRequest + for entry in entries: + meth_put({}, entry) return meth_end() # handle outgoing documents elif content_type == 'application/x-soledad-sync-get': |