28 files changed, 1322 insertions(+), 1752 deletions(-)
@@ -1,3 +1,19 @@
+0.7.3 Sep 22, 2015:
+Client:
+  o Bugfix: refactor code loss. Closes #7412.
+  o Bugfix: Set active secret before saving local file.
+  o Split http_target into 4 modules, separating those responsibilities.
+  o Refactor details of making an HTTP request body and headers out of the
+    send/fetch logic. This also makes it easier to enable batching.
+
+Server:
+  o Fix a bug where BadRequest could be raised after everything was persisted.
+
+Common:
+  o Refactor couch.py to separate persistence from logic while saving uploaded
+    documents. Also simplify logic while checking for conflicts.
+
 0.7.2 Aug 26, 2015:
 Client:
   o Remove MAC from secrets file. Closes #6980.
@@ -6,7 +22,7 @@ Client:
   o Improve how we send information on SOLEDAD_SYNC_SEND_STATUS and in
     SOLEDAD_SYNC_RECEIVE_STATUS. Related to Feature #7353.
   o Fix hanging sync by properly waiting db initialization on sync decrypter
-    pool. Closes #7686.
+    pool. Closes #7386.
   o Avoid double decryption of documents.
   o Fix the order of the events emited for incoming documents.
   o bugfix: move sync db and encpool creation to api.
@@ -13,18 +13,24 @@ repository:
 **leap.soledad.common** common pieces.
 
-.. image:: https://pypip.in/v/leap.soledad.common/badge.png
-        :target: https://crate.io/packages/leap.soledad.common
+.. image:: https://badge.fury.io/py/leap.soledad.common.svg
+    :target: http://badge.fury.io/py/leap.soledad.common
+.. image:: https://img.shields.io/pypi/dm/leap.soledad.common.svg
+    :target: http://badge.fury.io/py/leap.soledad.common
 
 **leap.soledad.client** where the soledad client lives.
 
-.. image:: https://pypip.in/v/leap.soledad.client/badge.png
-        :target: https://crate.io/packages/leap.soledad.client
+.. image:: https://badge.fury.io/py/leap.soledad.client.svg
+    :target: http://badge.fury.io/py/leap.soledad.client
+.. image:: https://img.shields.io/pypi/dm/leap.soledad.client.svg
+    :target: http://badge.fury.io/py/leap.soledad.client
 
 **leap.soledad.server** oh surprise! bits needed for the soledad server.
 
-.. image:: https://pypip.in/v/leap.soledad.server/badge.png
-        :target: https://crate.io/packages/leap.soledad.server
+.. image:: https://badge.fury.io/py/leap.soledad.server.svg
+    :target: http://badge.fury.io/py/leap.soledad.server
+.. image:: https://img.shields.io/pypi/dm/leap.soledad.server.svg
+    :target: http://badge.fury.io/py/leap.soledad.server
 
 Compatibility
diff --git a/client/pkg/requirements-leap.pip b/client/pkg/requirements-leap.pip
index c5fbcd5f..52d1263b 100644
--- a/client/pkg/requirements-leap.pip
+++ b/client/pkg/requirements-leap.pip
@@ -1,2 +1,2 @@
-leap.common>=0.4.1
-leap.soledad.common>=0.6.5
+leap.common>=0.4.3
+leap.soledad.common>=0.7.0
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py
index 237159bd..77822247 100644
--- a/client/src/leap/soledad/client/adbapi.py
+++ b/client/src/leap/soledad/client/adbapi.py
@@ -285,7 +285,8 @@ class U1DBConnectionPool(adbapi.ConnectionPool):
         A final close, only called by the shutdown trigger.
         """
         self.shutdownID = None
-        self.threadpool.stop()
+        if self.threadpool.started:
+            self.threadpool.stop()
         self.running = False
         for conn in self.connections.values():
             self._close(conn)
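The guard added above makes U1DBConnectionPool shutdown idempotent: the shutdown trigger can fire before the thread pool was ever started, so stop() is now called only when started is true. The encdecpool.py hunk below applies the same pattern, and the new SyncTargetAPI.close() relies on it by stopping both pools unconditionally. A minimal sketch of the pattern with a hypothetical class, not Soledad's actual code:

    class StoppableService(object):
        # Hypothetical example of the start/stop guard used in this diff.

        def __init__(self):
            self.started = False

        def start(self):
            # ... acquire threads, connections, etc. ...
            self.started = True

        def stop(self):
            if not self.started:
                return  # no-op: shutdown may run twice, or before start()
            self.started = False
            # ... release resources exactly once ...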
"""          self.shutdownID = None -        self.threadpool.stop() +        if self.threadpool.started: +            self.threadpool.stop()          self.running = False          for conn in self.connections.values():              self._close(conn) diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index a6a98551..a558addd 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -718,7 +718,7 @@ class Soledad(object):              return failure          def _emit_done_data_sync(passthrough): -            soledad_events.emit( +            soledad_events.emit_async(                  soledad_events.SOLEDAD_DONE_DATA_SYNC, self.uuid)              return passthrough diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 2ad98767..6d3c11b9 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -74,6 +74,8 @@ class SyncEncryptDecryptPool(object):          self._started = True      def stop(self): +        if not self._started: +            return          self._started = False          self._destroy_pool()          # maybe cancel the next delayed call diff --git a/client/src/leap/soledad/client/events.py b/client/src/leap/soledad/client/events.py index b1379521..058be59c 100644 --- a/client/src/leap/soledad/client/events.py +++ b/client/src/leap/soledad/client/events.py @@ -20,7 +20,7 @@  Signaling functions.  """ -from leap.common.events import emit +from leap.common.events import emit_async  from leap.common.events import catalog @@ -40,7 +40,7 @@ SOLEDAD_SYNC_RECEIVE_STATUS = catalog.SOLEDAD_SYNC_RECEIVE_STATUS  __all__ = [      "catalog", -    "emit", +    "emit_async",      "SOLEDAD_CREATING_KEYS",      "SOLEDAD_DONE_CREATING_KEYS",      "SOLEDAD_DOWNLOADING_KEYS", diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py deleted file mode 100644 index a6ef2b0d..00000000 --- a/client/src/leap/soledad/client/http_target.py +++ /dev/null @@ -1,711 +0,0 @@ -# -*- coding: utf-8 -*- -# http_target.py -# Copyright (C) 2015 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -""" -A U1DB backend for encrypting data before sending to server and decrypting -after receiving. 
-""" - - -import json -import base64 -import logging -import warnings - -from uuid import uuid4 - -from twisted.internet import defer -from twisted.web.error import Error -from twisted.web.client import _ReadBodyProtocol -from twisted.web.client import PartialDownloadError -from twisted.web._newclient import ResponseDone -from twisted.web._newclient import PotentialDataLoss - -from u1db import errors -from u1db import SyncTarget -from u1db.remote import utils -from u1db.remote import http_errors - -from leap.common.http import HTTPClient - -from leap.soledad.common.document import SoledadDocument -from leap.soledad.common.errors import InvalidAuthTokenError - -from leap.soledad.client.crypto import is_symmetrically_encrypted -from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS -from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS -from leap.soledad.client.events import emit -from leap.soledad.client.encdecpool import SyncDecrypterPool - - -logger = logging.getLogger(__name__) - - -# we want to make sure that HTTP errors will raise appropriate u1db errors, -# that is, fire errbacks with the appropriate failures, in the context of -# twisted. Because of that, we redefine the http body reader used by the HTTP -# client below. - -class ReadBodyProtocol(_ReadBodyProtocol): - -    def __init__(self, response, deferred): -        """ -        Initialize the protocol, additionally storing the response headers. -        """ -        _ReadBodyProtocol.__init__( -            self, response.code, response.phrase, deferred) -        self.headers = response.headers - -    # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks -    def _error(self, respdic): -        descr = respdic.get("error") -        exc_cls = errors.wire_description_to_exc.get(descr) -        if exc_cls is not None: -            message = respdic.get("message") -            self.deferred.errback(exc_cls(message)) -    # ---8<--- end of snippet from u1db.remote.http_client - -    def connectionLost(self, reason): -        """ -        Deliver the accumulated response bytes to the waiting L{Deferred}, if -        the response body has been completely received without error. -        """ -        if reason.check(ResponseDone): - -            body = b''.join(self.dataBuffer) - -            # ---8<--- snippet from u1db.remote.http_client -            if self.status in (200, 201): -                self.deferred.callback(body) -            elif self.status in http_errors.ERROR_STATUSES: -                try: -                    respdic = json.loads(body) -                except ValueError: -                    self.deferred.errback( -                        errors.HTTPError(self.status, body, self.headers)) -                else: -                    self._error(respdic) -            # special cases -            elif self.status == 503: -                self.deferred.errback(errors.Unavailable(body, self.headers)) -            else: -                self.deferred.errback( -                    errors.HTTPError(self.status, body, self.headers)) -            # ---8<--- end of snippet from u1db.remote.http_client - -        elif reason.check(PotentialDataLoss): -            self.deferred.errback( -                PartialDownloadError(self.status, self.message, -                                     b''.join(self.dataBuffer))) -        else: -            self.deferred.errback(reason) - - -def readBody(response): -    """ -    Get the body of an L{IResponse} and return it as a byte string. 
- -    This is a helper function for clients that don't want to incrementally -    receive the body of an HTTP response. - -    @param response: The HTTP response for which the body will be read. -    @type response: L{IResponse} provider - -    @return: A L{Deferred} which will fire with the body of the response. -        Cancelling it will close the connection to the server immediately. -    """ -    def cancel(deferred): -        """ -        Cancel a L{readBody} call, close the connection to the HTTP server -        immediately, if it is still open. - -        @param deferred: The cancelled L{defer.Deferred}. -        """ -        abort = getAbort() -        if abort is not None: -            abort() - -    d = defer.Deferred(cancel) -    protocol = ReadBodyProtocol(response, d) - -    def getAbort(): -        return getattr(protocol.transport, 'abortConnection', None) - -    response.deliverBody(protocol) - -    if protocol.transport is not None and getAbort() is None: -        warnings.warn( -            'Using readBody with a transport that does not have an ' -            'abortConnection method', -            category=DeprecationWarning, -            stacklevel=2) - -    return d - - -class SoledadHTTPSyncTarget(SyncTarget): - -    """ -    A SyncTarget that encrypts data before sending and decrypts data after -    receiving. - -    Normally encryption will have been written to the sync database upon -    document modification. The sync database is also used to write temporarily -    the parsed documents that the remote send us, before being decrypted and -    written to the main database. -    """ - -    def __init__(self, url, source_replica_uid, creds, crypto, cert_file, -                 sync_db=None, sync_enc_pool=None): -        """ -        Initialize the sync target. - -        :param url: The server sync url. -        :type url: str -        :param source_replica_uid: The source replica uid which we use when -                                   deferring decryption. -        :type source_replica_uid: str -        :param creds: A dictionary containing the uuid and token. -        :type creds: creds -        :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt -                        document contents when syncing. -        :type crypto: soledad.crypto.SoledadCrypto -        :param cert_file: Path to the certificate of the ca used to validate -                          the SSL certificate used by the remote soledad -                          server. -        :type cert_file: str -        :param sync_db: Optional. handler for the db with the symmetric -                        encryption of the syncing documents. If -                        None, encryption will be done in-place, -                        instead of retreiving it from the dedicated -                        database. -        :type sync_db: Sqlite handler -        :param sync_enc_pool: The encryption pool to use to defer encryption. -                              If None is passed the encryption will not be -                              deferred. 
-        :type sync_enc_pool: leap.soledad.client.encdecpool.SyncEncrypterPool -        """ -        if url.endswith("/"): -            url = url[:-1] -        self._url = str(url) + "/sync-from/" + str(source_replica_uid) -        self.source_replica_uid = source_replica_uid -        self._auth_header = None -        self.set_creds(creds) -        self._crypto = crypto -        self._sync_db = sync_db -        self._sync_enc_pool = sync_enc_pool -        self._insert_doc_cb = None -        # asynchronous encryption/decryption attributes -        self._decryption_callback = None -        self._sync_decr_pool = None -        self._http = HTTPClient(cert_file) - -    def close(self): -        self._http.close() - -    def set_creds(self, creds): -        """ -        Update credentials. - -        :param creds: A dictionary containing the uuid and token. -        :type creds: dict -        """ -        uuid = creds['token']['uuid'] -        token = creds['token']['token'] -        auth = '%s:%s' % (uuid, token) -        b64_token = base64.b64encode(auth) -        self._auth_header = {'Authorization': ['Token %s' % b64_token]} - -    @property -    def _defer_encryption(self): -        return self._sync_enc_pool is not None - -    # -    # SyncTarget API -    # - -    @defer.inlineCallbacks -    def get_sync_info(self, source_replica_uid): -        """ -        Return information about known state of remote database. - -        Return the replica_uid and the current database generation of the -        remote database, and its last-seen database generation for the client -        replica. - -        :param source_replica_uid: The client-size replica uid. -        :type source_replica_uid: str - -        :return: A deferred which fires with (target_replica_uid, -                 target_replica_generation, target_trans_id, -                 source_replica_last_known_generation, -                 source_replica_last_known_transaction_id) -        :rtype: twisted.internet.defer.Deferred -        """ -        raw = yield self._http_request(self._url, headers=self._auth_header) -        res = json.loads(raw) -        defer.returnValue(( -            res['target_replica_uid'], -            res['target_replica_generation'], -            res['target_replica_transaction_id'], -            res['source_replica_generation'], -            res['source_transaction_id'] -        )) - -    def record_sync_info( -            self, source_replica_uid, source_replica_generation, -            source_replica_transaction_id): -        """ -        Record tip information for another replica. - -        After sync_exchange has been processed, the caller will have -        received new content from this replica. This call allows the -        source replica instigating the sync to inform us what their -        generation became after applying the documents we returned. - -        This is used to allow future sync operations to not need to repeat data -        that we just talked about. It also means that if this is called at the -        wrong time, there can be database records that will never be -        synchronized. - -        :param source_replica_uid: The identifier for the source replica. -        :type source_replica_uid: str -        :param source_replica_generation: The database generation for the -                                          source replica. 
-        :type source_replica_generation: int -        :param source_replica_transaction_id: The transaction id associated -                                              with the source replica -                                              generation. -        :type source_replica_transaction_id: str - -        :return: A deferred which fires with the result of the query. -        :rtype: twisted.internet.defer.Deferred -        """ -        data = json.dumps({ -            'generation': source_replica_generation, -            'transaction_id': source_replica_transaction_id -        }) -        headers = self._auth_header.copy() -        headers.update({'content-type': ['application/json']}) -        return self._http_request( -            self._url, -            method='PUT', -            headers=headers, -            body=data) - -    @defer.inlineCallbacks -    def sync_exchange(self, docs_by_generation, source_replica_uid, -                      last_known_generation, last_known_trans_id, -                      insert_doc_cb, ensure_callback=None, -                      defer_decryption=True, sync_id=None): -        """ -        Find out which documents the remote database does not know about, -        encrypt and send them. After that, receive documents from the remote -        database. - -        :param docs_by_generations: A list of (doc_id, generation, trans_id) -                                    of local documents that were changed since -                                    the last local generation the remote -                                    replica knows about. -        :type docs_by_generations: list of tuples - -        :param source_replica_uid: The uid of the source replica. -        :type source_replica_uid: str - -        :param last_known_generation: Target's last known generation. -        :type last_known_generation: int - -        :param last_known_trans_id: Target's last known transaction id. -        :type last_known_trans_id: str - -        :param insert_doc_cb: A callback for inserting received documents from -                              target. If not overriden, this will call u1db -                              insert_doc_from_target in synchronizer, which -                              implements the TAKE OTHER semantics. -        :type insert_doc_cb: function - -        :param ensure_callback: A callback that ensures we know the target -                                replica uid if the target replica was just -                                created. -        :type ensure_callback: function - -        :param defer_decryption: Whether to defer the decryption process using -                                 the intermediate database. If False, -                                 decryption will be done inline. -        :type defer_decryption: bool - -        :return: A deferred which fires with the new generation and -                 transaction id of the target replica. 
-        :rtype: twisted.internet.defer.Deferred -        """ - -        self._ensure_callback = ensure_callback - -        if sync_id is None: -            sync_id = str(uuid4()) -        self.source_replica_uid = source_replica_uid - -        # save a reference to the callback so we can use it after decrypting -        self._insert_doc_cb = insert_doc_cb - -        gen_after_send, trans_id_after_send = yield self._send_docs( -            docs_by_generation, -            last_known_generation, -            last_known_trans_id, -            sync_id) - -        cur_target_gen, cur_target_trans_id = yield self._receive_docs( -            last_known_generation, last_known_trans_id, -            ensure_callback, sync_id, -            defer_decryption=defer_decryption) - -        # update gen and trans id info in case we just sent and did not -        # receive docs. -        if gen_after_send is not None and gen_after_send > cur_target_gen: -            cur_target_gen = gen_after_send -            cur_target_trans_id = trans_id_after_send - -        defer.returnValue([cur_target_gen, cur_target_trans_id]) - -    # -    # methods to send docs -    # - -    def _prepare(self, comma, entries, **dic): -        entry = comma + '\r\n' + json.dumps(dic) -        entries.append(entry) -        return len(entry) - -    @defer.inlineCallbacks -    def _send_docs(self, docs_by_generation, last_known_generation, -                   last_known_trans_id, sync_id): - -        if not docs_by_generation: -            defer.returnValue([None, None]) - -        headers = self._auth_header.copy() -        headers.update({'content-type': ['application/x-soledad-sync-put']}) -        # add remote replica metadata to the request -        first_entries = ['['] -        self._prepare( -            '', first_entries, -            last_known_generation=last_known_generation, -            last_known_trans_id=last_known_trans_id, -            sync_id=sync_id, -            ensure=self._ensure_callback is not None) -        idx = 0 -        total = len(docs_by_generation) -        for doc, gen, trans_id in docs_by_generation: -            idx += 1 -            result = yield self._send_one_doc( -                headers, first_entries, doc, -                gen, trans_id, total, idx) -            if self._defer_encryption: -                self._sync_enc_pool.delete_encrypted_doc( -                    doc.doc_id, doc.rev) - -            msg = "%d/%d" % (idx, total) -            content = {'sent': idx, 'total': total} -            emit(SOLEDAD_SYNC_SEND_STATUS, content) -            logger.debug("Sync send status: %s" % msg) - -        response_dict = json.loads(result)[0] -        gen_after_send = response_dict['new_generation'] -        trans_id_after_send = response_dict['new_transaction_id'] -        defer.returnValue([gen_after_send, trans_id_after_send]) - -    @defer.inlineCallbacks -    def _send_one_doc(self, headers, first_entries, doc, gen, trans_id, -                      number_of_docs, doc_idx): -        entries = first_entries[:] -        # add the document to the request -        content = yield self._encrypt_doc(doc) -        self._prepare( -            ',', entries, -            id=doc.doc_id, rev=doc.rev, content=content, gen=gen, -            trans_id=trans_id, number_of_docs=number_of_docs, -            doc_idx=doc_idx) -        entries.append('\r\n]') -        data = ''.join(entries) -        result = yield self._http_request( -            self._url, -            method='POST', -            
headers=headers, -            body=data) -        defer.returnValue(result) - -    def _encrypt_doc(self, doc): -        d = None -        if doc.is_tombstone(): -            d = defer.succeed(None) -        elif not self._defer_encryption: -            # fallback case, for tests -            d = defer.succeed(self._crypto.encrypt_doc(doc)) -        else: - -            def _maybe_encrypt_doc_inline(doc_json): -                if doc_json is None: -                    # the document is not marked as tombstone, but we got -                    # nothing from the sync db. As it is not encrypted -                    # yet, we force inline encryption. -                    return self._crypto.encrypt_doc(doc) -                return doc_json - -            d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev) -            d.addCallback(_maybe_encrypt_doc_inline) -        return d - -    # -    # methods to receive doc -    # - -    @defer.inlineCallbacks -    def _receive_docs(self, last_known_generation, last_known_trans_id, -                      ensure_callback, sync_id, defer_decryption): - -        self._queue_for_decrypt = defer_decryption \ -            and self._sync_db is not None - -        new_generation = last_known_generation -        new_transaction_id = last_known_trans_id - -        if self._queue_for_decrypt: -            logger.debug( -                "Soledad sync: will queue received docs for decrypting.") - -        if defer_decryption: -            self._setup_sync_decr_pool() - -        headers = self._auth_header.copy() -        headers.update({'content-type': ['application/x-soledad-sync-get']}) - -        # --------------------------------------------------------------------- -        # maybe receive the first document -        # --------------------------------------------------------------------- - -        # we fetch the first document before fetching the rest because we need -        # to know the total number of documents to be received, and this -        # information comes as metadata to each request. - -        doc = yield self._receive_one_doc( -            headers, last_known_generation, last_known_trans_id, -            sync_id, 0) -        self._received_docs = 0 -        number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1) - -        # update the target gen and trans_id in case a document was received -        if ngen: -            new_generation = ngen -            new_transaction_id = ntrans - -        if defer_decryption: -            self._sync_decr_pool.start(number_of_changes) - -        # --------------------------------------------------------------------- -        # maybe receive the rest of the documents -        # --------------------------------------------------------------------- - -        # launch many asynchronous fetches and inserts of received documents -        # in the temporary sync db. Will wait for all results before -        # continuing. 
- -        received = 1 -        deferreds = [] -        while received < number_of_changes: -            d = self._receive_one_doc( -                headers, last_known_generation, -                last_known_trans_id, sync_id, received) -            d.addCallback( -                self._insert_received_doc, -                received + 1,  # the index of the current received doc -                number_of_changes) -            deferreds.append(d) -            received += 1 -        results = yield defer.gatherResults(deferreds) - -        # get generation and transaction id of target after insertions -        if deferreds: -            _, new_generation, new_transaction_id = results.pop() - -        # --------------------------------------------------------------------- -        # wait for async decryption to finish -        # --------------------------------------------------------------------- - -        if defer_decryption: -            yield self._sync_decr_pool.deferred -            self._sync_decr_pool.stop() - -        defer.returnValue([new_generation, new_transaction_id]) - -    def _receive_one_doc(self, headers, last_known_generation, -                         last_known_trans_id, sync_id, received): -        entries = ['['] -        # add remote replica metadata to the request -        self._prepare( -            '', entries, -            last_known_generation=last_known_generation, -            last_known_trans_id=last_known_trans_id, -            sync_id=sync_id, -            ensure=self._ensure_callback is not None) -        # inform server of how many documents have already been received -        self._prepare( -            ',', entries, received=received) -        entries.append('\r\n]') -        # send headers -        return self._http_request( -            self._url, -            method='POST', -            headers=headers, -            body=''.join(entries)) - -    def _insert_received_doc(self, response, idx, total): -        """ -        Insert a received document into the local replica. - -        :param response: The body and headers of the response. -        :type response: tuple(str, dict) -        :param idx: The index count of the current operation. -        :type idx: int -        :param total: The total number of operations. -        :type total: int -        """ -        new_generation, new_transaction_id, number_of_changes, doc_id, \ -            rev, content, gen, trans_id = \ -            self._parse_received_doc_response(response) -        if doc_id is not None: -            # decrypt incoming document and insert into local database -            # ------------------------------------------------------------- -            # symmetric decryption of document's contents -            # ------------------------------------------------------------- -            # If arriving content was symmetrically encrypted, we decrypt it. -            # We do it inline if defer_decryption flag is False or no sync_db -            # was defined, otherwise we defer it writing it to the received -            # docs table. 
-            doc = SoledadDocument(doc_id, rev, content) -            if is_symmetrically_encrypted(doc): -                if self._queue_for_decrypt: -                    self._sync_decr_pool.insert_encrypted_received_doc( -                        doc.doc_id, doc.rev, doc.content, gen, trans_id, -                        idx) -                else: -                    # defer_decryption is False or no-sync-db fallback -                    doc.set_json(self._crypto.decrypt_doc(doc)) -                    self._insert_doc_cb(doc, gen, trans_id) -            else: -                # not symmetrically encrypted doc, insert it directly -                # or save it in the decrypted stage. -                if self._queue_for_decrypt: -                    self._sync_decr_pool.insert_received_doc( -                        doc.doc_id, doc.rev, doc.content, gen, trans_id, -                        idx) -                else: -                    self._insert_doc_cb(doc, gen, trans_id) -            # ------------------------------------------------------------- -            # end of symmetric decryption -            # ------------------------------------------------------------- -        self._received_docs += 1 -        msg = "%d/%d" % (self._received_docs, total) -        content = {'received': self._received_docs, 'total': total} -        emit(SOLEDAD_SYNC_RECEIVE_STATUS, content) -        logger.debug("Sync receive status: %s" % msg) -        return number_of_changes, new_generation, new_transaction_id - -    def _parse_received_doc_response(self, response): -        """ -        Parse the response from the server containing the received document. - -        :param response: The body and headers of the response. -        :type response: tuple(str, dict) - -        :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev, -                 content, gen, trans_id) -        :rtype: tuple -        """ -        # decode incoming stream -        parts = response.splitlines() -        if not parts or parts[0] != '[' or parts[-1] != ']': -            raise errors.BrokenSyncStream -        data = parts[1:-1] -        # decode metadata -        try: -            line, comma = utils.check_and_strip_comma(data[0]) -            metadata = None -        except (IndexError): -            raise errors.BrokenSyncStream -        try: -            metadata = json.loads(line) -            new_generation = metadata['new_generation'] -            new_transaction_id = metadata['new_transaction_id'] -            number_of_changes = metadata['number_of_changes'] -        except (ValueError, KeyError): -            raise errors.BrokenSyncStream -        # make sure we have replica_uid from fresh new dbs -        if self._ensure_callback and 'replica_uid' in metadata: -            self._ensure_callback(metadata['replica_uid']) -        # parse incoming document info -        doc_id = None -        rev = None -        content = None -        gen = None -        trans_id = None -        if number_of_changes > 0: -            try: -                entry = json.loads(data[1]) -                doc_id = entry['id'] -                rev = entry['rev'] -                content = entry['content'] -                gen = entry['gen'] -                trans_id = entry['trans_id'] -            except (IndexError, KeyError): -                raise errors.BrokenSyncStream -        return new_generation, new_transaction_id, number_of_changes, \ -            doc_id, rev, content, gen, trans_id - -    def _setup_sync_decr_pool(self): 
-        """ -        Set up the SyncDecrypterPool for deferred decryption. -        """ -        if self._sync_decr_pool is None and self._sync_db is not None: -            # initialize syncing queue decryption pool -            self._sync_decr_pool = SyncDecrypterPool( -                self._crypto, -                self._sync_db, -                insert_doc_cb=self._insert_doc_cb, -                source_replica_uid=self.source_replica_uid) - -    def _http_request(self, url, method='GET', body=None, headers={}): -        d = self._http.request(url, method, body, headers, readBody) -        d.addErrback(_unauth_to_invalid_token_error) -        return d - - -def _unauth_to_invalid_token_error(failure): -    """ -    An errback to translate unauthorized errors to our own invalid token -    class. - -    :param failure: The original failure. -    :type failure: twisted.python.failure.Failure - -    :return: Either the original failure or an invalid auth token error. -    :rtype: twisted.python.failure.Failure -    """ -    failure.trap(Error) -    if failure.getErrorMessage() == "401 Unauthorized": -        raise InvalidAuthTokenError -    return failure diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py new file mode 100644 index 00000000..7a5cea9f --- /dev/null +++ b/client/src/leap/soledad/client/http_target/__init__.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +A U1DB backend for encrypting data before sending to server and decrypting +after receiving. +""" + + +import logging + +from leap.common.http import HTTPClient +from leap.soledad.client.http_target.send import HTTPDocSender +from leap.soledad.client.http_target.api import SyncTargetAPI +from leap.soledad.client.http_target.fetch import HTTPDocFetcher + + +logger = logging.getLogger(__name__) + + +class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher): + +    """ +    A SyncTarget that encrypts data before sending and decrypts data after +    receiving. + +    Normally encryption will have been written to the sync database upon +    document modification. The sync database is also used to write temporarily +    the parsed documents that the remote send us, before being decrypted and +    written to the main database. +    """ +    def __init__(self, url, source_replica_uid, creds, crypto, cert_file, +                 sync_db=None, sync_enc_pool=None): +        """ +        Initialize the sync target. + +        :param url: The server sync url. +        :type url: str +        :param source_replica_uid: The source replica uid which we use when +                                   deferring decryption. +        :type source_replica_uid: str +        :param creds: A dictionary containing the uuid and token. 
diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py
new file mode 100644
index 00000000..7a5cea9f
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target/__init__.py
@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -*-
+# __init__.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+A U1DB backend for encrypting data before sending to server and decrypting
+after receiving.
+"""
+
+
+import logging
+
+from leap.common.http import HTTPClient
+from leap.soledad.client.http_target.send import HTTPDocSender
+from leap.soledad.client.http_target.api import SyncTargetAPI
+from leap.soledad.client.http_target.fetch import HTTPDocFetcher
+
+
+logger = logging.getLogger(__name__)
+
+
+class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher):
+
+    """
+    A SyncTarget that encrypts data before sending and decrypts data after
+    receiving.
+
+    Normally encryption will have been written to the sync database upon
+    document modification. The sync database is also used to write temporarily
+    the parsed documents that the remote send us, before being decrypted and
+    written to the main database.
+    """
+    def __init__(self, url, source_replica_uid, creds, crypto, cert_file,
+                 sync_db=None, sync_enc_pool=None):
+        """
+        Initialize the sync target.
+
+        :param url: The server sync url.
+        :type url: str
+        :param source_replica_uid: The source replica uid which we use when
+                                   deferring decryption.
+        :type source_replica_uid: str
+        :param creds: A dictionary containing the uuid and token.
+        :type creds: creds
+        :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
+                        document contents when syncing.
+        :type crypto: soledad.crypto.SoledadCrypto
+        :param cert_file: Path to the certificate of the ca used to validate
+                          the SSL certificate used by the remote soledad
+                          server.
+        :type cert_file: str
+        :param sync_db: Optional. handler for the db with the symmetric
+                        encryption of the syncing documents. If
+                        None, encryption will be done in-place,
+                        instead of retreiving it from the dedicated
+                        database.
+        :type sync_db: Sqlite handler
+        :param sync_enc_pool: The encryption pool to use to defer encryption.
+                              If None is passed the encryption will not be
+                              deferred.
+        :type sync_enc_pool: leap.soledad.client.encdecpool.SyncEncrypterPool
+        """
+        if url.endswith("/"):
+            url = url[:-1]
+        self._url = str(url) + "/sync-from/" + str(source_replica_uid)
+        self.source_replica_uid = source_replica_uid
+        self._auth_header = None
+        self.set_creds(creds)
+        self._crypto = crypto
+        self._sync_db = sync_db
+        self._sync_enc_pool = sync_enc_pool
+        self._insert_doc_cb = None
+        # asynchronous encryption/decryption attributes
+        self._decryption_callback = None
+        self._sync_decr_pool = None
+        self._http = HTTPClient(cert_file)
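__init__.py keeps only shared state; all behavior comes from the three mixins. A hypothetical instantiation, with invented values and a soledad_crypto placeholder, following the parameter docs above:

    # all values below are illustrative, not real endpoints or credentials
    creds = {'token': {'uuid': 'user-uuid', 'token': 'auth-token'}}
    target = SoledadHTTPSyncTarget(
        'https://soledad.example.org:2323/user-db',
        source_replica_uid='replica-1',
        creds=creds,
        crypto=soledad_crypto,    # a SoledadCrypto instance
        cert_file='/path/to/ca.crt',
        sync_db=None,             # None: encrypt inline, no dedicated db
        sync_enc_pool=None)       # None: do not defer encryption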
diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py
new file mode 100644
index 00000000..dcc762f6
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target/api.py
@@ -0,0 +1,229 @@
+# -*- coding: utf-8 -*-
+# api.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+import base64
+
+from uuid import uuid4
+from u1db import SyncTarget
+
+from twisted.web.error import Error
+from twisted.internet import defer
+
+from leap.soledad.common.errors import InvalidAuthTokenError
+from leap.soledad.client.http_target.support import readBody
+
+
+class SyncTargetAPI(SyncTarget):
+    """
+    Declares public methods and implements u1db.SyncTarget.
+    """
+
+    @defer.inlineCallbacks
+    def close(self):
+        if self._sync_enc_pool:
+            self._sync_enc_pool.stop()
+        if self._sync_decr_pool:
+            self._sync_decr_pool.stop()
+        yield self._http.close()
+
+    def set_creds(self, creds):
+        """
+        Update credentials.
+
+        :param creds: A dictionary containing the uuid and token.
+        :type creds: dict
+        """
+        uuid = creds['token']['uuid']
+        token = creds['token']['token']
+        auth = '%s:%s' % (uuid, token)
+        b64_token = base64.b64encode(auth)
+        self._auth_header = {'Authorization': ['Token %s' % b64_token]}
+
+    @property
+    def _base_header(self):
+        return self._auth_header.copy() if self._auth_header else {}
+
+    @property
+    def _defer_encryption(self):
+        return self._sync_enc_pool is not None
+
+    def _http_request(self, url, method='GET', body=None, headers=None,
+                      content_type=None):
+        headers = headers or self._base_header
+        if content_type:
+            headers.update({'content-type': [content_type]})
+        d = self._http.request(url, method, body, headers, readBody)
+        d.addErrback(_unauth_to_invalid_token_error)
+        return d
+
+    @defer.inlineCallbacks
+    def get_sync_info(self, source_replica_uid):
+        """
+        Return information about known state of remote database.
+
+        Return the replica_uid and the current database generation of the
+        remote database, and its last-seen database generation for the client
+        replica.
+
+        :param source_replica_uid: The client-size replica uid.
+        :type source_replica_uid: str
+
+        :return: A deferred which fires with (target_replica_uid,
+                 target_replica_generation, target_trans_id,
+                 source_replica_last_known_generation,
+                 source_replica_last_known_transaction_id)
+        :rtype: twisted.internet.defer.Deferred
+        """
+        raw = yield self._http_request(self._url)
+        res = json.loads(raw)
+        defer.returnValue((
+            res['target_replica_uid'],
+            res['target_replica_generation'],
+            res['target_replica_transaction_id'],
+            res['source_replica_generation'],
+            res['source_transaction_id']
+        ))
+
+    def record_sync_info(
+            self, source_replica_uid, source_replica_generation,
+            source_replica_transaction_id):
+        """
+        Record tip information for another replica.
+
+        After sync_exchange has been processed, the caller will have
+        received new content from this replica. This call allows the
+        source replica instigating the sync to inform us what their
+        generation became after applying the documents we returned.
+
+        This is used to allow future sync operations to not need to repeat data
+        that we just talked about. It also means that if this is called at the
+        wrong time, there can be database records that will never be
+        synchronized.
+
+        :param source_replica_uid: The identifier for the source replica.
+        :type source_replica_uid: str
+        :param source_replica_generation: The database generation for the
+                                          source replica.
+        :type source_replica_generation: int
+        :param source_replica_transaction_id: The transaction id associated
+                                              with the source replica
+                                              generation.
+        :type source_replica_transaction_id: str
+
+        :return: A deferred which fires with the result of the query.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        data = json.dumps({
+            'generation': source_replica_generation,
+            'transaction_id': source_replica_transaction_id
+        })
+        return self._http_request(
+            self._url,
+            method='PUT',
+            body=data,
+            content_type='application/json')
+
+    @defer.inlineCallbacks
+    def sync_exchange(self, docs_by_generation, source_replica_uid,
+                      last_known_generation, last_known_trans_id,
+                      insert_doc_cb, ensure_callback=None,
+                      defer_decryption=True, sync_id=None):
+        """
+        Find out which documents the remote database does not know about,
+        encrypt and send them. After that, receive documents from the remote
+        database.
+
+        :param docs_by_generations: A list of (doc_id, generation, trans_id)
+                                    of local documents that were changed since
+                                    the last local generation the remote
+                                    replica knows about.
+        :type docs_by_generations: list of tuples
+
+        :param source_replica_uid: The uid of the source replica.
+        :type source_replica_uid: str
+
+        :param last_known_generation: Target's last known generation.
+        :type last_known_generation: int
+
+        :param last_known_trans_id: Target's last known transaction id.
+        :type last_known_trans_id: str
+
+        :param insert_doc_cb: A callback for inserting received documents from
+                              target. If not overriden, this will call u1db
+                              insert_doc_from_target in synchronizer, which
+                              implements the TAKE OTHER semantics.
+        :type insert_doc_cb: function
+
+        :param ensure_callback: A callback that ensures we know the target
+                                replica uid if the target replica was just
+                                created.
+        :type ensure_callback: function
+
+        :param defer_decryption: Whether to defer the decryption process using
+                                 the intermediate database. If False,
+                                 decryption will be done inline.
+        :type defer_decryption: bool
+
+        :return: A deferred which fires with the new generation and
+                 transaction id of the target replica.
+        :rtype: twisted.internet.defer.Deferred
+        """
+
+        self._ensure_callback = ensure_callback
+
+        if sync_id is None:
+            sync_id = str(uuid4())
+        self.source_replica_uid = source_replica_uid
+
+        # save a reference to the callback so we can use it after decrypting
+        self._insert_doc_cb = insert_doc_cb
+
+        gen_after_send, trans_id_after_send = yield self._send_docs(
+            docs_by_generation,
+            last_known_generation,
+            last_known_trans_id,
+            sync_id)
+
+        cur_target_gen, cur_target_trans_id = yield self._receive_docs(
+            last_known_generation, last_known_trans_id,
+            ensure_callback, sync_id,
+            defer_decryption=defer_decryption)
+
+        # update gen and trans id info in case we just sent and did not
+        # receive docs.
+        if gen_after_send is not None and gen_after_send > cur_target_gen:
+            cur_target_gen = gen_after_send
+            cur_target_trans_id = trans_id_after_send
+
+        defer.returnValue([cur_target_gen, cur_target_trans_id])
+
+
+def _unauth_to_invalid_token_error(failure):
+    """
+    An errback to translate unauthorized errors to our own invalid token
+    class.
+
+    :param failure: The original failure.
+    :type failure: twisted.python.failure.Failure
+
+    :return: Either the original failure or an invalid auth token error.
+    :rtype: twisted.python.failure.Failure
+    """
+    failure.trap(Error)
+    if failure.getErrorMessage() == "401 Unauthorized":
+        raise InvalidAuthTokenError
+    return failure
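set_creds() above is the single place the Authorization header gets built. A worked example with throwaway values showing what it produces (Python 2, where b64encode accepts and returns str):

    import base64

    creds = {'token': {'uuid': 'abc123', 'token': 's3cr3t'}}  # example values
    auth = '%s:%s' % (creds['token']['uuid'], creds['token']['token'])
    header = {'Authorization': ['Token %s' % base64.b64encode(auth)]}
    # header == {'Authorization': ['Token YWJjMTIzOnMzY3IzdA==']}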
+    """ + +    @defer.inlineCallbacks +    def _receive_docs(self, last_known_generation, last_known_trans_id, +                      ensure_callback, sync_id, defer_decryption): + +        self._queue_for_decrypt = defer_decryption \ +            and self._sync_db is not None + +        new_generation = last_known_generation +        new_transaction_id = last_known_trans_id + +        if self._queue_for_decrypt: +            logger.debug( +                "Soledad sync: will queue received docs for decrypting.") + +        if defer_decryption: +            self._setup_sync_decr_pool() + +        # --------------------------------------------------------------------- +        # maybe receive the first document +        # --------------------------------------------------------------------- + +        # we fetch the first document before fetching the rest because we need +        # to know the total number of documents to be received, and this +        # information comes as metadata to each request. + +        doc = yield self._receive_one_doc( +            last_known_generation, last_known_trans_id, +            sync_id, 0) +        self._received_docs = 0 +        number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1) + +        if ngen: +            new_generation = ngen +            new_transaction_id = ntrans + +        if defer_decryption: +            self._sync_decr_pool.start(number_of_changes) + +        # --------------------------------------------------------------------- +        # maybe receive the rest of the documents +        # --------------------------------------------------------------------- + +        # launch many asynchronous fetches and inserts of received documents +        # in the temporary sync db. Will wait for all results before +        # continuing. 
+ +        received = 1 +        deferreds = [] +        while received < number_of_changes: +            d = self._receive_one_doc( +                last_known_generation, +                last_known_trans_id, sync_id, received) +            d.addCallback( +                self._insert_received_doc, +                received + 1,  # the index of the current received doc +                number_of_changes) +            deferreds.append(d) +            received += 1 +        results = yield defer.gatherResults(deferreds) + +        # get generation and transaction id of target after insertions +        if deferreds: +            _, new_generation, new_transaction_id = results.pop() + +        # --------------------------------------------------------------------- +        # wait for async decryption to finish +        # --------------------------------------------------------------------- + +        if defer_decryption: +            yield self._sync_decr_pool.deferred +            self._sync_decr_pool.stop() + +        defer.returnValue([new_generation, new_transaction_id]) + +    def _receive_one_doc(self, last_known_generation, +                         last_known_trans_id, sync_id, received): +        # add remote replica metadata to the request +        body = RequestBody( +            last_known_generation=last_known_generation, +            last_known_trans_id=last_known_trans_id, +            sync_id=sync_id, +            ensure=self._ensure_callback is not None) +        # inform server of how many documents have already been received +        body.insert_info(received=received) +        # send headers +        return self._http_request( +            self._url, +            method='POST', +            body=str(body), +            content_type='application/x-soledad-sync-get') + +    def _insert_received_doc(self, response, idx, total): +        """ +        Insert a received document into the local replica. + +        :param response: The body and headers of the response. +        :type response: tuple(str, dict) +        :param idx: The index count of the current operation. +        :type idx: int +        :param total: The total number of operations. +        :type total: int +        """ +        new_generation, new_transaction_id, number_of_changes, doc_id, \ +            rev, content, gen, trans_id = \ +            self._parse_received_doc_response(response) +        if doc_id is not None: +            # decrypt incoming document and insert into local database +            # ------------------------------------------------------------- +            # symmetric decryption of document's contents +            # ------------------------------------------------------------- +            # If arriving content was symmetrically encrypted, we decrypt it. +            # We do it inline if defer_decryption flag is False or no sync_db +            # was defined, otherwise we defer it writing it to the received +            # docs table. 
+            doc = SoledadDocument(doc_id, rev, content) +            if is_symmetrically_encrypted(doc): +                if self._queue_for_decrypt: +                    self._sync_decr_pool.insert_encrypted_received_doc( +                        doc.doc_id, doc.rev, doc.content, gen, trans_id, +                        idx) +                else: +                    # defer_decryption is False or no-sync-db fallback +                    doc.set_json(self._crypto.decrypt_doc(doc)) +                    self._insert_doc_cb(doc, gen, trans_id) +            else: +                # not symmetrically encrypted doc, insert it directly +                # or save it in the decrypted stage. +                if self._queue_for_decrypt: +                    self._sync_decr_pool.insert_received_doc( +                        doc.doc_id, doc.rev, doc.content, gen, trans_id, +                        idx) +                else: +                    self._insert_doc_cb(doc, gen, trans_id) +            # ------------------------------------------------------------- +            # end of symmetric decryption +            # ------------------------------------------------------------- +        self._received_docs += 1 +        _emit_receive_status(self._received_docs, total) +        return number_of_changes, new_generation, new_transaction_id + +    def _parse_received_doc_response(self, response): +        """ +        Parse the response from the server containing the received document. + +        :param response: The body and headers of the response. +        :type response: tuple(str, dict) + +        :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev, +                 content, gen, trans_id) +        :rtype: tuple +        """ +        # decode incoming stream +        parts = response.splitlines() +        if not parts or parts[0] != '[' or parts[-1] != ']': +            raise errors.BrokenSyncStream +        data = parts[1:-1] +        # decode metadata +        try: +            line, comma = utils.check_and_strip_comma(data[0]) +            metadata = None +        except (IndexError): +            raise errors.BrokenSyncStream +        try: +            metadata = json.loads(line) +            new_generation = metadata['new_generation'] +            new_transaction_id = metadata['new_transaction_id'] +            number_of_changes = metadata['number_of_changes'] +        except (ValueError, KeyError): +            raise errors.BrokenSyncStream +        # make sure we have replica_uid from fresh new dbs +        if self._ensure_callback and 'replica_uid' in metadata: +            self._ensure_callback(metadata['replica_uid']) +        # parse incoming document info +        doc_id = None +        rev = None +        content = None +        gen = None +        trans_id = None +        if number_of_changes > 0: +            try: +                entry = json.loads(data[1]) +                doc_id = entry['id'] +                rev = entry['rev'] +                content = entry['content'] +                gen = entry['gen'] +                trans_id = entry['trans_id'] +            except (IndexError, KeyError): +                raise errors.BrokenSyncStream +        return new_generation, new_transaction_id, number_of_changes, \ +            doc_id, rev, content, gen, trans_id + +    def _setup_sync_decr_pool(self): +        """ +        Set up the SyncDecrypterPool for deferred decryption. 
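_parse_received_doc_response() above accepts a '['-framed, line-oriented stream: one metadata line (comma-terminated when an entry follows) and at most one document entry per response. A hypothetical body, with invented values, that this parser accepts:

    [
    {"new_generation": 12, "new_transaction_id": "T-67b", "number_of_changes": 2, "replica_uid": "server-1"},
    {"id": "doc-1", "rev": "1-ab", "content": "{\"k\": \"v\"}", "gen": 11, "trans_id": "T-5f0"}
    ]

The parser reads the first line for generation bookkeeping (and replica_uid on fresh databases), then the optional second line as the document payload.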
+        """ +        if self._sync_decr_pool is None and self._sync_db is not None: +            # initialize syncing queue decryption pool +            self._sync_decr_pool = SyncDecrypterPool( +                self._crypto, +                self._sync_db, +                insert_doc_cb=self._insert_doc_cb, +                source_replica_uid=self.source_replica_uid) + + +def _emit_receive_status(received_docs, total): +    content = {'received': received_docs, 'total': total} +    emit_async(SOLEDAD_SYNC_RECEIVE_STATUS, content) + +    if received_docs % 20 == 0: +        msg = "%d/%d" % (received_docs, total) +        logger.debug("Sync receive status: %s" % msg) diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py new file mode 100644 index 00000000..80483f0d --- /dev/null +++ b/client/src/leap/soledad/client/http_target/send.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# send.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import json +import logging +from twisted.internet import defer +from leap.soledad.client.events import emit_async +from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS +from leap.soledad.client.http_target.support import RequestBody +logger = logging.getLogger(__name__) + + +class HTTPDocSender(object): +    """ +    Handles Document uploading from Soledad server, using HTTP as transport. +    They need to be encrypted and metadata prepared before sending. 
+    """ + +    @defer.inlineCallbacks +    def _send_docs(self, docs_by_generation, last_known_generation, +                   last_known_trans_id, sync_id): + +        if not docs_by_generation: +            defer.returnValue([None, None]) + +        # add remote replica metadata to the request +        body = RequestBody( +            last_known_generation=last_known_generation, +            last_known_trans_id=last_known_trans_id, +            sync_id=sync_id, +            ensure=self._ensure_callback is not None) +        total = len(docs_by_generation) +        for idx, entry in enumerate(docs_by_generation, 1): +            yield self._prepare_one_doc(entry, body, idx, total) +            result = yield self._http_request( +                self._url, +                method='POST', +                body=body.pop(1), +                content_type='application/x-soledad-sync-put') +            if self._defer_encryption: +                self._delete_sent(idx, docs_by_generation) +            _emit_send_status(idx, total) +        response_dict = json.loads(result)[0] +        gen_after_send = response_dict['new_generation'] +        trans_id_after_send = response_dict['new_transaction_id'] +        defer.returnValue([gen_after_send, trans_id_after_send]) + +    def _delete_sent(self, idx, docs_by_generation): +        doc = docs_by_generation[idx - 1][0] +        self._sync_enc_pool.delete_encrypted_doc( +            doc.doc_id, doc.rev) + +    @defer.inlineCallbacks +    def _prepare_one_doc(self, entry, body, idx, total): +        doc, gen, trans_id = entry +        content = yield self._encrypt_doc(doc) +        body.insert_info( +            id=doc.doc_id, rev=doc.rev, content=content, gen=gen, +            trans_id=trans_id, number_of_docs=total, +            doc_idx=idx) + +    def _encrypt_doc(self, doc): +        d = None +        if doc.is_tombstone(): +            d = defer.succeed(None) +        elif not self._defer_encryption: +            # fallback case, for tests +            d = defer.succeed(self._crypto.encrypt_doc(doc)) +        else: + +            def _maybe_encrypt_doc_inline(doc_json): +                if doc_json is None: +                    # the document is not marked as tombstone, but we got +                    # nothing from the sync db. As it is not encrypted +                    # yet, we force inline encryption. +                    return self._crypto.encrypt_doc(doc) +                return doc_json + +            d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev) +            d.addCallback(_maybe_encrypt_doc_inline) +        return d + + +def _emit_send_status(idx, total): +    content = {'sent': idx, 'total': total} +    emit_async(SOLEDAD_SYNC_SEND_STATUS, content) + +    msg = "%d/%d" % (idx, total) +    logger.debug("Sync send status: %s" % msg) diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py new file mode 100644 index 00000000..44cd7089 --- /dev/null +++ b/client/src/leap/soledad/client/http_target/support.py @@ -0,0 +1,203 @@ +# -*- coding: utf-8 -*- +# support.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import warnings
+import json
+from u1db import errors
+from u1db.remote import http_errors
+from twisted.internet import defer
+from twisted.web.client import _ReadBodyProtocol
+from twisted.web.client import PartialDownloadError
+from twisted.web._newclient import ResponseDone
+from twisted.web._newclient import PotentialDataLoss
+
+
+# we want to make sure that HTTP errors will raise appropriate u1db errors,
+# that is, fire errbacks with the appropriate failures, in the context of
+# twisted. Because of that, we redefine the http body reader used by the HTTP
+# client below.

+class ReadBodyProtocol(_ReadBodyProtocol):
+    """
+    Extends Twisted's original implementation, adding our error handling to
+    ensure that the proper u1db error is raised.
+    """
+
+    def __init__(self, response, deferred):
+        """
+        Initialize the protocol, additionally storing the response headers.
+        """
+        _ReadBodyProtocol.__init__(
+            self, response.code, response.phrase, deferred)
+        self.headers = response.headers
+
+    # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks
+    def _error(self, respdic):
+        descr = respdic.get("error")
+        exc_cls = errors.wire_description_to_exc.get(descr)
+        if exc_cls is not None:
+            message = respdic.get("message")
+            self.deferred.errback(exc_cls(message))
+    # ---8<--- end of snippet from u1db.remote.http_client
+
+    def connectionLost(self, reason):
+        """
+        Deliver the accumulated response bytes to the waiting L{Deferred}, if
+        the response body has been completely received without error.
+        """
+        if reason.check(ResponseDone):
+
+            body = b''.join(self.dataBuffer)
+
+            # ---8<--- snippet from u1db.remote.http_client
+            if self.status in (200, 201):
+                self.deferred.callback(body)
+            elif self.status in http_errors.ERROR_STATUSES:
+                try:
+                    respdic = json.loads(body)
+                except ValueError:
+                    self.deferred.errback(
+                        errors.HTTPError(self.status, body, self.headers))
+                else:
+                    self._error(respdic)
+            # special cases
+            elif self.status == 503:
+                self.deferred.errback(errors.Unavailable(body, self.headers))
+            else:
+                self.deferred.errback(
+                    errors.HTTPError(self.status, body, self.headers))
+            # ---8<--- end of snippet from u1db.remote.http_client
+
+        elif reason.check(PotentialDataLoss):
+            self.deferred.errback(
+                PartialDownloadError(self.status, self.message,
+                                     b''.join(self.dataBuffer)))
+        else:
+            self.deferred.errback(reason)
+
+
+def readBody(response):
+    """
+    Get the body of an L{IResponse} and return it as a byte string.
+
+    This is a helper function for clients that don't want to incrementally
+    receive the body of an HTTP response. 
+
+    @param response: The HTTP response for which the body will be read.
+    @type response: L{IResponse} provider
+
+    @return: A L{Deferred} which will fire with the body of the response.
+        Cancelling it will close the connection to the server immediately.
+    """
+    def cancel(deferred):
+        """
+        Cancel a L{readBody} call, closing the connection to the HTTP server
+        immediately, if it is still open.
+
+        @param deferred: The cancelled L{defer.Deferred}.
+        """
+        abort = getAbort()
+        if abort is not None:
+            abort()
+
+    d = defer.Deferred(cancel)
+    protocol = ReadBodyProtocol(response, d)
+
+    def getAbort():
+        return getattr(protocol.transport, 'abortConnection', None)
+
+    response.deliverBody(protocol)
+
+    if protocol.transport is not None and getAbort() is None:
+        warnings.warn(
+            'Using readBody with a transport that does not have an '
+            'abortConnection method',
+            category=DeprecationWarning,
+            stacklevel=2)
+
+    return d
+
+
+class RequestBody(object):
+    """
+    This class is a helper to generate send and fetch requests.
+    The expected format is something like:
+    [
+    {headers},
+    {entry1},
+    {...},
+    {entryN},
+    ]
+    """
+
+    def __init__(self, **header_dict):
+        """
+        Creates a new RequestBody holding header information.
+
+        :param header_dict: A dictionary with the headers.
+        :type header_dict: dict
+        """
+        self.headers = header_dict
+        self.entries = []
+
+    def insert_info(self, **entry_dict):
+        """
+        Dumps an entry into JSON format and adds it to the entries list.
+
+        :param entry_dict: Entry as a dictionary
+        :type entry_dict: dict
+
+        :return: length of the entry after JSON dumps
+        :rtype: int
+        """
+        entry = json.dumps(entry_dict)
+        self.entries.append(entry)
+        return len(entry)
+
+    def pop(self, number=1):
+        """
+        Removes the given number of entries and returns them formatted and
+        ready to be sent.
+
+        :param number: number of entries to pop and format
+        :type number: int
+
+        :return: formatted body ready to be sent
+        :rtype: str
+        """
+        entries = [self.entries.pop(0) for i in xrange(number)]
+        return self.entries_to_str(entries)
+
+    def __str__(self):
+        return self.entries_to_str(self.entries)
+
+    def __len__(self):
+        return len(self.entries)
+
+    def entries_to_str(self, entries=None):
+        """
+        Format a list of entries into the body format expected
+        by the server. 
+
+        :param entries: entries to format
+        :type entries: list
+
+        :return: formatted body ready to be sent
+        :rtype: str
+        """
+        data = '[\r\n' + json.dumps(self.headers)
+        data += ''.join(',\r\n' + entry for entry in entries)
+        return data + '\r\n]'
diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py
index ee3aacdb..c3c3dff5 100644
--- a/client/src/leap/soledad/client/secrets.py
+++ b/client/src/leap/soledad/client/secrets.py
@@ -261,6 +261,16 @@ class SoledadSecrets(object):
         logger.info("Could not find a secret in local storage.")
         return False

+    def _maybe_set_active_secret(self, active_secret):
+        """
+        If no secret_id is set yet, use the passed active secret, or fall
+        back to the first available secret if none was passed.
+        """
+        if not self._secret_id:
+            if not active_secret:
+                active_secret = self._secrets.items()[0][0]
+            self.set_secret_id(active_secret)
+
     def _load_secrets(self):
         """
         Load storage secrets from local file.
@@ -270,12 +280,7 @@ class SoledadSecrets(object):
         with open(self._secrets_path, 'r') as f:
             content = json.loads(f.read())
         _, active_secret = self._import_recovery_document(content)
-        # choose first secret if no secret_id was given
-        if self._secret_id is None:
-            if active_secret is None:
-                self.set_secret_id(self._secrets.items()[0][0])
-            else:
-                self.set_secret_id(active_secret)
+        self._maybe_set_active_secret(active_secret)
         # enlarge secret if needed
         enlarged = False
         if len(self._secrets[self._secret_id]) < self.GEN_SECRET_LENGTH:
@@ -306,12 +311,8 @@ class SoledadSecrets(object):
                 'Found cryptographic secrets in shared recovery '
                 'database.')
             _, active_secret = self._import_recovery_document(doc.content)
+            self._maybe_set_active_secret(active_secret)
             self._store_secrets()  # save new secrets in local file
-            if self._secret_id is None:
-                if active_secret is None:
-                    self.set_secret_id(self._secrets.items()[0][0])
-                else:
-                    self.set_secret_id(active_secret)
         else:
             # STAGE 3 - there are no secrets in server also, so
             # generate a secret and store it in remote db. 
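For illustration, a minimal sketch of how the RequestBody helper from
support.py above assembles the sync stream. The header and entry values are
hypothetical, and the snippet is not part of the patch itself:

    from leap.soledad.client.http_target.support import RequestBody

    body = RequestBody(
        last_known_generation=0,
        last_known_trans_id='',
        sync_id='some-sync-id',
        ensure=False)
    body.insert_info(id='doc-1', rev='replica:1', content='{"a": 1}',
                     gen=1, trans_id='T-1', number_of_docs=1, doc_idx=1)
    # pop(1) serializes the headers plus the first entry in the
    # '[\r\n{headers},\r\n{entry}\r\n]' format the server expects:
    print(body.pop(1))
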
@@ -432,13 +433,13 @@ class SoledadSecrets(object):          :return: a document with encrypted key material in its contents          :rtype: document.SoledadDocument          """ -        events.emit(events.SOLEDAD_DOWNLOADING_KEYS, self._uuid) +        events.emit_async(events.SOLEDAD_DOWNLOADING_KEYS, self._uuid)          db = self._shared_db          if not db:              logger.warning('No shared db found')              return          doc = db.get_doc(self._shared_db_doc_id()) -        events.emit(events.SOLEDAD_DONE_DOWNLOADING_KEYS, self._uuid) +        events.emit_async(events.SOLEDAD_DONE_DOWNLOADING_KEYS, self._uuid)          return doc      def _put_secrets_in_shared_db(self): @@ -461,13 +462,13 @@ class SoledadSecrets(object):          # fill doc with encrypted secrets          doc.content = self._export_recovery_document()          # upload secrets to server -        events.emit(events.SOLEDAD_UPLOADING_KEYS, self._uuid) +        events.emit_async(events.SOLEDAD_UPLOADING_KEYS, self._uuid)          db = self._shared_db          if not db:              logger.warning('No shared db found')              return          db.put_doc(doc) -        events.emit(events.SOLEDAD_DONE_UPLOADING_KEYS, self._uuid) +        events.emit_async(events.SOLEDAD_DONE_UPLOADING_KEYS, self._uuid)      #      # Management of secret for symmetric encryption. @@ -587,13 +588,13 @@ class SoledadSecrets(object):          :return: The id of the generated secret.          :rtype: str          """ -        events.emit(events.SOLEDAD_CREATING_KEYS, self._uuid) +        events.emit_async(events.SOLEDAD_CREATING_KEYS, self._uuid)          # generate random secret          secret = os.urandom(self.GEN_SECRET_LENGTH)          secret_id = sha256(secret).hexdigest()          self._secrets[secret_id] = secret          self._store_secrets() -        events.emit(events.SOLEDAD_DONE_CREATING_KEYS, self._uuid) +        events.emit_async(events.SOLEDAD_DONE_CREATING_KEYS, self._uuid)          return secret_id      def _store_secrets(self): diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 2151884a..22ddc87d 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -559,6 +559,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          """          Close the syncer and syncdb orderly          """ +        super(SQLCipherU1DBSync, self).close()          # close all open syncers          for url in self._syncers.keys():              _, syncer = self._syncers[url] diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 6c28e0be..1c762036 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -87,8 +87,7 @@ class CouchDocument(SoledadDocument):      atomic and consistent update of the database.      """ -    def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False, -                 syncable=True): +    def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False):          """          Container for handling a document that is stored in couch backend. @@ -100,27 +99,10 @@ class CouchDocument(SoledadDocument):          :type json: str          :param has_conflicts: Boolean indicating if this document has conflicts          :type has_conflicts: bool -        :param syncable: Should this document be synced with remote replicas? 
-        :type syncable: bool
         """
         SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts)
-        self._couch_rev = None
-        self._conflicts = None
-        self._transactions = None
-
-    def _ensure_fetch_conflicts(self, get_conflicts_fun):
-        """
-        Ensure conflict data has been fetched from the server.
-
-        :param get_conflicts_fun: A function which, given the document id and
-                                  the couch revision, return the conflicted
-                                  versions of the current document.
-        :type get_conflicts_fun: function
-        """
-        if self._conflicts is None:
-            self._conflicts = get_conflicts_fun(self.doc_id,
-                                                couch_rev=self.couch_rev)
-        self.has_conflicts = len(self._conflicts) > 0
+        self.couch_rev = None
+        self.transactions = None

     def get_conflicts(self):
         """
@@ -149,7 +131,7 @@ class CouchDocument(SoledadDocument):
         :type doc: CouchDocument
         """
         if self._conflicts is None:
-            raise Exception("Run self._ensure_fetch_conflicts first!")
+            raise Exception("Fetch conflicts first!")
         self._conflicts.append(doc)
         self.has_conflicts = len(self._conflicts) > 0

@@ -161,27 +143,48 @@ class CouchDocument(SoledadDocument):
         :type conflict_revs: [str]
         """
         if self._conflicts is None:
-            raise Exception("Run self._ensure_fetch_conflicts first!")
+            raise Exception("Fetch conflicts first!")
         self._conflicts = filter(
             lambda doc: doc.rev not in conflict_revs,
             self._conflicts)
         self.has_conflicts = len(self._conflicts) > 0

-    def _get_couch_rev(self):
-        return self._couch_rev
-
-    def _set_couch_rev(self, rev):
-        self._couch_rev = rev
-
-    couch_rev = property(_get_couch_rev, _set_couch_rev)
-
-    def _get_transactions(self):
-        return self._transactions
+    def update(self, new_doc):
+        # update info
+        self.rev = new_doc.rev
+        if new_doc.is_tombstone():
+            self.make_tombstone()
+        else:
+            self.content = new_doc.content
+        self.has_conflicts = new_doc.has_conflicts

-    def _set_transactions(self, rev):
-        self._transactions = rev
+    def prune_conflicts(self, doc_vcr, autoresolved_increment):
+        """
+        Prune conflicts that are older than the current document's revision,
+        or whose content matches the current document's content.
+        Originally in u1db.CommonBackend.

-    transactions = property(_get_transactions, _set_transactions)

+        :param doc_vcr: A vector clock representing the current document's
+                        revision.
+        :type doc_vcr: u1db.vectorclock.VectorClock
+        :param autoresolved_increment: The replica uid to increment the 
+                                       vector clock with upon autoresolution.
+        :type autoresolved_increment: str
+        """
+        if self.has_conflicts:
+            autoresolved = False
+            c_revs_to_prune = []
+            for c_doc in self._conflicts:
+                c_vcr = vectorclock.VectorClockRev(c_doc.rev)
+                if doc_vcr.is_newer(c_vcr):
+                    c_revs_to_prune.append(c_doc.rev)
+                elif self.same_content_as(c_doc):
+                    c_revs_to_prune.append(c_doc.rev)
+                    doc_vcr.maximize(c_vcr)
+                    autoresolved = True
+            if autoresolved:
+                doc_vcr.increment(autoresolved_increment)
+                self.rev = doc_vcr.as_str()
+            self.delete_conflicts(c_revs_to_prune)


 # monkey-patch the u1db http app to use CouchDocument
@@ -482,13 +485,10 @@ class CouchDatabase(CommonBackend):
         Ensure that the design documents used by the backend exist on the
         couch database.
         """
-        # we check for existence of one of the files, and put all of them if
-        # that one does not exist
-        try:
-            self._database['_design/docs']
-            return
-        except ResourceNotFound:
-            for ddoc_name in ['docs', 'syncs', 'transactions']:
+        for ddoc_name in ['docs', 'syncs', 'transactions']:
+            try:
+                self._database.info(ddoc_name)
+            except ResourceNotFound:
                 ddoc = json.loads(
                     binascii.a2b_base64(
                         getattr(ddocs, ddoc_name)))
@@ -750,7 +750,6 @@ class CouchDatabase(CommonBackend):
         if check_for_conflicts \
                 and '_attachments' in result \
                 and 'u1db_conflicts' in result['_attachments']:
-            doc.has_conflicts = True
             doc.set_conflicts(
                 self._build_conflicts(
                     doc.doc_id,
@@ -1044,7 +1043,7 @@ class CouchDatabase(CommonBackend):
             conflicts.append(doc)
         return conflicts

-    def _get_conflicts(self, doc_id, couch_rev=None):
+    def get_doc_conflicts(self, doc_id, couch_rev=None):
         """
         Get the conflicted versions of a document.

@@ -1059,32 +1058,21 @@ class CouchDatabase(CommonBackend):
         """
         # request conflicts attachment from server
         params = {}
+        conflicts = []
         if couch_rev is not None:
             params['rev'] = couch_rev  # restrict document's couch revision
+        else:
+            # TODO: move into resource logic!
+            first_entry = self._get_doc(doc_id, check_for_conflicts=True)
+            conflicts.append(first_entry)
         resource = self._database.resource(doc_id, 'u1db_conflicts')
         try:
             response = resource.get_json(**params)
-            return self._build_conflicts(
+            return conflicts + self._build_conflicts(
                 doc_id, json.loads(response[2].read()))
         except ResourceNotFound:
             return []

-    def get_doc_conflicts(self, doc_id):
-        """
-        Get the list of conflicts for the given document.
-
-        The order of the conflicts is such that the first entry is the value
-        that would be returned by "get_doc".
-
-        :return: A list of the document entries that are conflicted. 
-        :rtype: [CouchDocument] -        """ -        conflict_docs = self._get_conflicts(doc_id) -        if len(conflict_docs) == 0: -            return [] -        this_doc = self._get_doc(doc_id, check_for_conflicts=True) -        return [this_doc] + conflict_docs -      def _get_replica_gen_and_trans_id(self, other_replica_uid):          """          Return the last known generation and transaction id for the other db @@ -1140,9 +1128,11 @@ class CouchDatabase(CommonBackend):          :param sync_id: The id of the current sync session.          :type sync_id: str          """ -        self._do_set_replica_gen_and_trans_id( -            other_replica_uid, other_generation, other_transaction_id, -            number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id) +        if other_replica_uid is not None and other_generation is not None: +            self._do_set_replica_gen_and_trans_id( +                other_replica_uid, other_generation, other_transaction_id, +                number_of_docs=number_of_docs, doc_idx=doc_idx, +                sync_id=sync_id)      def _do_set_replica_gen_and_trans_id(              self, other_replica_uid, other_generation, other_transaction_id, @@ -1206,70 +1196,6 @@ class CouchDatabase(CommonBackend):          except ResourceNotFound as e:              raise_missing_design_doc_error(e, ddoc_path) -    def _add_conflict(self, doc, my_doc_rev, my_content): -        """ -        Add a conflict to the document. - -        Note that this method does not actually update the backend; rather, it -        updates the CouchDocument object which will provide the conflict data -        when the atomic document update is made. - -        :param doc: The document to have conflicts added to. -        :type doc: CouchDocument -        :param my_doc_rev: The revision of the conflicted document. -        :type my_doc_rev: str -        :param my_content: The content of the conflicted document as a JSON -                           serialized string. -        :type my_content: str -        """ -        doc._ensure_fetch_conflicts(self._get_conflicts) -        doc.add_conflict( -            self._factory(doc_id=doc.doc_id, rev=my_doc_rev, -                          json=my_content)) - -    def _delete_conflicts(self, doc, conflict_revs): -        """ -        Delete the conflicted revisions from the list of conflicts of C{doc}. - -        Note that this method does not actually update the backend; rather, it -        updates the CouchDocument object which will provide the conflict data -        when the atomic document update is made. - -        :param doc: The document to have conflicts deleted. -        :type doc: CouchDocument -        :param conflict_revs: A list of the revisions to be deleted. -        :param conflict_revs: [str] -        """ -        doc._ensure_fetch_conflicts(self._get_conflicts) -        doc.delete_conflicts(conflict_revs) - -    def _prune_conflicts(self, doc, doc_vcr): -        """ -        Prune conflicts that are older then the current document's revision, or -        whose content match to the current document's content. - -        :param doc: The document to have conflicts pruned. -        :type doc: CouchDocument -        :param doc_vcr: A vector clock representing the current document's -                        revision. 
-        :type doc_vcr: u1db.vectorclock.VectorClock -        """ -        if doc.has_conflicts is True: -            autoresolved = False -            c_revs_to_prune = [] -            for c_doc in doc.get_conflicts(): -                c_vcr = vectorclock.VectorClockRev(c_doc.rev) -                if doc_vcr.is_newer(c_vcr): -                    c_revs_to_prune.append(c_doc.rev) -                elif doc.same_content_as(c_doc): -                    c_revs_to_prune.append(c_doc.rev) -                    doc_vcr.maximize(c_vcr) -                    autoresolved = True -            if autoresolved: -                doc_vcr.increment(self._replica_uid) -                doc.rev = doc_vcr.as_str() -            self._delete_conflicts(doc, c_revs_to_prune) -      def _force_doc_sync_conflict(self, doc):          """          Add a conflict and force a document put. @@ -1278,9 +1204,9 @@ class CouchDatabase(CommonBackend):          :type doc: CouchDocument          """          my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) -        self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev)) -        self._add_conflict(doc, my_doc.rev, my_doc.get_json()) -        doc.has_conflicts = True +        doc.prune_conflicts( +            vectorclock.VectorClockRev(doc.rev), self._replica_uid) +        doc.add_conflict(my_doc)          self._put_doc(my_doc, doc)      def resolve_doc(self, doc, conflicted_doc_revs): @@ -1325,14 +1251,14 @@ class CouchDatabase(CommonBackend):              # the newer doc version will supersede the one in the database, so              # we copy conflicts before updating the backend.              doc.set_conflicts(cur_doc.get_conflicts())  # copy conflicts over. -            self._delete_conflicts(doc, superseded_revs) +            doc.delete_conflicts(superseded_revs)              self._put_doc(cur_doc, doc)          else:              # the newer doc version does not supersede the one in the              # database, so we will add a conflict to the database and copy              # those over to the document the user has in her hands. -            self._add_conflict(cur_doc, new_rev, doc.get_json()) -            self._delete_conflicts(cur_doc, superseded_revs) +            cur_doc.add_conflict(doc) +            cur_doc.delete_conflicts(superseded_revs)              self._put_doc(cur_doc, cur_doc)  # just update conflicts              # backend has been updated with current conflicts, now copy them              # to the current document. @@ -1392,65 +1318,33 @@ class CouchDatabase(CommonBackend):                   'converged', at_gen is the insertion/current generation.          :rtype: (str, int)          """ -        cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) -        # at this point, `doc` has arrived from the other syncing party, and -        # we will decide what to do with it. -        # First, we prepare the arriving doc to update couch database. 
-        old_doc = doc -        doc = self._factory(doc.doc_id, doc.rev, doc.get_json()) -        if cur_doc is not None: -            doc.couch_rev = cur_doc.couch_rev -        # fetch conflicts because we will eventually manipulate them -        doc._ensure_fetch_conflicts(self._get_conflicts) -        # from now on, it works just like u1db sqlite backend -        doc_vcr = vectorclock.VectorClockRev(doc.rev) -        if cur_doc is None: -            cur_vcr = vectorclock.VectorClockRev(None) -        else: -            cur_vcr = vectorclock.VectorClockRev(cur_doc.rev) -        self._validate_source(replica_uid, replica_gen, replica_trans_id) -        if doc_vcr.is_newer(cur_vcr): -            rev = doc.rev -            self._prune_conflicts(doc, doc_vcr) -            if doc.rev != rev: -                # conflicts have been autoresolved -                state = 'superseded' -            else: -                state = 'inserted' -            self._put_doc(cur_doc, doc) -        elif doc.rev == cur_doc.rev: -            # magical convergence -            state = 'converged' -        elif cur_vcr.is_newer(doc_vcr): -            # Don't add this to seen_ids, because we have something newer, -            # so we should send it back, and we should not generate a -            # conflict -            state = 'superseded' -        elif cur_doc.same_content_as(doc): -            # the documents have been edited to the same thing at both ends -            doc_vcr.maximize(cur_vcr) -            doc_vcr.increment(self._replica_uid) -            doc.rev = doc_vcr.as_str() -            self._put_doc(cur_doc, doc) -            state = 'superseded' -        else: -            state = 'conflicted' -            if save_conflict: -                self._force_doc_sync_conflict(doc) -        if replica_uid is not None and replica_gen is not None: -            self._set_replica_gen_and_trans_id( -                replica_uid, replica_gen, replica_trans_id, -                number_of_docs=number_of_docs, doc_idx=doc_idx, -                sync_id=sync_id) -        # update info -        old_doc.rev = doc.rev -        if doc.is_tombstone(): -            old_doc.is_tombstone() -        else: -            old_doc.content = doc.content -        old_doc.has_conflicts = doc.has_conflicts +        if not isinstance(doc, CouchDocument): +            doc = self._factory(doc.doc_id, doc.rev, doc.get_json()) +        self._save_source_info(replica_uid, replica_gen, +                               replica_trans_id, number_of_docs, +                               doc_idx, sync_id) +        my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) +        if my_doc is not None: +            my_doc.set_conflicts( +                self.get_doc_conflicts(my_doc.doc_id, my_doc.couch_rev)) +        state, save_doc = _process_incoming_doc( +            my_doc, doc, save_conflict, self.replica_uid) +        if save_doc: +            self._put_doc(my_doc, save_doc) +            doc.update(save_doc)          return state, self._get_generation() +    def _save_source_info(self, replica_uid, replica_gen, replica_trans_id, +                          number_of_docs, doc_idx, sync_id): +        """ +        Validate and save source information. 
+        """ +        self._validate_source(replica_uid, replica_gen, replica_trans_id) +        self._set_replica_gen_and_trans_id( +            replica_uid, replica_gen, replica_trans_id, +            number_of_docs=number_of_docs, doc_idx=doc_idx, +            sync_id=sync_id) +      def get_docs(self, doc_ids, check_for_conflicts=True,                   include_deleted=False):          """ @@ -1495,6 +1389,13 @@ class CouchDatabase(CommonBackend):                  continue              yield t._doc +    def _prune_conflicts(self, doc, doc_vcr): +        """ +        Overrides original method, but it is implemented elsewhere for +        simplicity. +        """ +        doc.prune_conflicts(doc_vcr, self._replica_uid) +      def _new_resource(self, *path):          """          Return a new resource for accessing a couch database. @@ -1546,7 +1447,7 @@ class CouchServerState(ServerState):          :param couch_url: The URL for the couch database.          :type couch_url: str          """ -        self._couch_url = couch_url +        self.couch_url = couch_url      def open_database(self, dbname):          """ @@ -1559,7 +1460,7 @@ class CouchServerState(ServerState):          :rtype: CouchDatabase          """          return CouchDatabase( -            self._couch_url, +            self.couch_url,              dbname,              ensure_ddocs=False) @@ -1594,21 +1495,52 @@ class CouchServerState(ServerState):          """          raise Unauthorized() -    def _set_couch_url(self, url): -        """ -        Set the couchdb URL - -        :param url: CouchDB URL -        :type url: str -        """ -        self._couch_url = url - -    def _get_couch_url(self): -        """ -        Return CouchDB URL -        :rtype: str -        """ -        return self._couch_url - -    couch_url = property(_get_couch_url, _set_couch_url, doc='CouchDB URL') +def _process_incoming_doc(my_doc, other_doc, save_conflict, replica_uid): +    """ +    Check document, save and return state. +    """ +    # at this point, `doc` has arrived from the other syncing party, and +    # we will decide what to do with it. +    # First, we prepare the arriving doc to update couch database. 
+    new_doc = CouchDocument(
+        other_doc.doc_id, other_doc.rev, other_doc.get_json())
+    if my_doc is None:
+        return 'inserted', new_doc
+    new_doc.couch_rev = my_doc.couch_rev
+    new_doc.set_conflicts(my_doc.get_conflicts())
+    # fetch conflicts because we will eventually manipulate them
+    # from now on, it works just like u1db sqlite backend
+    doc_vcr = vectorclock.VectorClockRev(new_doc.rev)
+    cur_vcr = vectorclock.VectorClockRev(my_doc.rev)
+    if doc_vcr.is_newer(cur_vcr):
+        rev = new_doc.rev
+        new_doc.prune_conflicts(doc_vcr, replica_uid)
+        if new_doc.rev != rev:
+            # conflicts have been autoresolved
+            return 'superseded', new_doc
+        else:
+            return 'inserted', new_doc
+    elif new_doc.rev == my_doc.rev:
+        # magical convergence
+        return 'converged', None
+    elif cur_vcr.is_newer(doc_vcr):
+        # Don't add this to seen_ids, because we have something newer,
+        # so we should send it back, and we should not generate a
+        # conflict
+        other_doc.update(new_doc)
+        return 'superseded', None
+    elif my_doc.same_content_as(new_doc):
+        # the documents have been edited to the same thing at both ends
+        doc_vcr.maximize(cur_vcr)
+        doc_vcr.increment(replica_uid)
+        new_doc.rev = doc_vcr.as_str()
+        return 'superseded', new_doc
+    else:
+        if save_conflict:
+            new_doc.prune_conflicts(
+                vectorclock.VectorClockRev(new_doc.rev), replica_uid)
+            new_doc.add_conflict(my_doc)
+            return 'conflicted', new_doc
+        other_doc.update(new_doc)
+        return 'conflicted', None
diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py
index 468ad8d8..a08ffd16 100644
--- a/common/src/leap/soledad/common/tests/test_couch.py
+++ b/common/src/leap/soledad/common/tests/test_couch.py
@@ -25,6 +25,7 @@ import json
 from urlparse import urljoin

 from couchdb.client import Server
+from uuid import uuid4

 from testscenarios import TestWithScenarios

@@ -42,7 +43,6 @@ from leap.soledad.common.tests.util import sync_via_synchronizer

 from leap.soledad.common.tests.u1db_tests import test_backends
 from leap.soledad.common.tests.u1db_tests import DatabaseBaseTests
-from leap.soledad.common.tests.u1db_tests import TestCaseWithServer

 from u1db.backends.inmemory import InMemoryIndex

@@ -56,8 +56,8 @@ class TestCouchBackendImpl(CouchDBTestCase):

     def test__allocate_doc_id(self):
         db = couch.CouchDatabase.open_database(
             urljoin(
-                'http://localhost:' + str(self.wrapper.port),
-                'u1db_tests'
+                'http://localhost:' + str(self.couch_port),
+                ('test-%s' % uuid4().hex)
             ),
             create=True,
             ensure_ddocs=True)
@@ -66,6 +66,7 @@ class TestCouchBackendImpl(CouchDBTestCase):
         self.assertEqual(34, len(doc_id1))
         int(doc_id1[len('D-'):], 16)
         self.assertNotEqual(doc_id1, db._allocate_doc_id())
+        self.delete_db(db._dbname)


 # -----------------------------------------------------------------------------
@@ -73,25 +74,28 @@
 # -----------------------------------------------------------------------------
 def make_couch_database_for_test(test, replica_uid):
-    port = str(test.wrapper.port)
-    return couch.CouchDatabase.open_database(
-        urljoin('http://localhost:' + port, replica_uid),
+    port = str(test.couch_port)
+    dbname = ('test-%s' % uuid4().hex)
+    db = couch.CouchDatabase.open_database(
+        urljoin('http://localhost:' + port, dbname),
         create=True,
         replica_uid=replica_uid or 'test',
         ensure_ddocs=True)
+    test.addCleanup(test.delete_db, dbname)
+    return db


 def copy_couch_database_for_test(test, db):
-    port = str(test.wrapper.port)
+    port = str(test.couch_port)
     couch_url = 'http://localhost:' + port
-    new_dbname = db._replica_uid + '_copy'
+    new_dbname = db._dbname + '_copy'
     new_db = couch.CouchDatabase.open_database(
         urljoin(couch_url, new_dbname),
         create=True,
         replica_uid=db._replica_uid or 'test')
     # copy all docs
     session = couch.Session()
-    old_couch_db = Server(couch_url, session=session)[db._replica_uid]
+    old_couch_db = Server(couch_url, session=session)[db._dbname]
     new_couch_db = Server(couch_url, session=session)[new_dbname]
     for doc_id in old_couch_db:
         doc = old_couch_db.get(doc_id)
@@ -143,24 +147,6 @@ class CouchTests(

     scenarios = COUCH_SCENARIOS

-    def setUp(self):
-        test_backends.AllDatabaseTests.setUp(self)
-        # save db info because of test_close
-        self._url = self.db._url
-        self._dbname = self.db._dbname
-
-    def tearDown(self):
-        # if current test is `test_close` we have to use saved objects to
-        # delete the database because the close() method will have removed the
-        # references needed to do it using the CouchDatabase.
-        if self.id().endswith('test_couch.CouchTests.test_close(couch)'):
-            session = couch.Session()
-            server = Server(url=self._url, session=session)
-            del(server[self._dbname])
-        else:
-            self.db.delete_database()
-        test_backends.AllDatabaseTests.tearDown(self)
-

 class CouchDatabaseTests(
         TestWithScenarios,
@@ -169,10 +155,6 @@ class CouchDatabaseTests(

     scenarios = COUCH_SCENARIOS

-    def tearDown(self):
-        self.db.delete_database()
-        test_backends.LocalDatabaseTests.tearDown(self)
-

 class CouchValidateGenNTransIdTests(
         TestWithScenarios,
@@ -181,10 +163,6 @@ class CouchValidateGenNTransIdTests(

     scenarios = COUCH_SCENARIOS

-    def tearDown(self):
-        self.db.delete_database()
-        test_backends.LocalDatabaseValidateGenNTransIdTests.tearDown(self)
-

 class CouchValidateSourceGenTests(
         TestWithScenarios,
@@ -193,10 +171,6 @@ class CouchValidateSourceGenTests(

     scenarios = COUCH_SCENARIOS

-    def tearDown(self):
-        self.db.delete_database()
-        test_backends.LocalDatabaseValidateSourceGenTests.tearDown(self)
-

 class CouchWithConflictsTests(
         TestWithScenarios,
@@ -205,10 +179,6 @@ class CouchWithConflictsTests(

         scenarios = COUCH_SCENARIOS

-        def tearDown(self):
-            self.db.delete_database()
-            test_backends.LocalDatabaseWithConflictsTests.tearDown(self)
-

 # Notice: the CouchDB backend does not have indexing capabilities, so we do
 # not test indexing now. 
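The rewritten test helpers above all follow one pattern: open a uniquely
named throwaway database and register its deletion with addCleanup, which
runs whether the test passes or fails. A condensed sketch, assuming the
couch_port and delete_db attributes provided by the CouchDBTestCase harness:

    from uuid import uuid4
    from urlparse import urljoin

    def make_throwaway_db(test, replica_uid='test'):
        # unique name per test, so repeated or parallel runs cannot collide
        dbname = 'test-%s' % uuid4().hex
        db = couch.CouchDatabase.open_database(
            urljoin('http://localhost:' + str(test.couch_port), dbname),
            create=True,
            replica_uid=replica_uid,
            ensure_ddocs=True)
        # cleanup runs on success and failure alike, unlike manual tearDown
        test.addCleanup(test.delete_db, dbname)
        return db
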
@@ -237,7 +207,6 @@ nested_doc = tests.nested_doc  class CouchDatabaseSyncTargetTests(          TestWithScenarios,          DatabaseBaseTests, -        TestCaseWithServer,          CouchDBTestCase):      # TODO: implement _set_trace_hook(_shallow) in CouchSyncTarget so @@ -260,26 +229,13 @@ class CouchDatabaseSyncTargetTests(      def setUp(self):          CouchDBTestCase.setUp(self) -        # from DatabaseBaseTests.setUp -        self.db = self.create_database('test') -        # from TestCaseWithServer.setUp -        self.server = self.server_thread = self.port = None          # other stuff          self.db, self.st = self.create_db_and_target(self)          self.other_changes = []      def tearDown(self): +        self.db.close()          CouchDBTestCase.tearDown(self) -        # from TestCaseWithServer.tearDown -        if self.server is not None: -            self.server.shutdown() -            self.server_thread.join() -            self.server.server_close() -        if self.port: -            self.port.stopListening() -        # from DatabaseBaseTests.tearDown -        if hasattr(self, 'db') and self.db is not None: -            self.db.close()      def receive_doc(self, doc, gen, trans_id):          self.other_changes.append( @@ -724,17 +680,8 @@ class CouchDatabaseSyncTests(              self.db3, self.db1_copy, self.db2_copy          ]:              if db is not None: -                db.delete_database() +                self.delete_db(db._dbname)                  db.close() -        for replica_uid, dbname in [ -            ('test1_copy', 'source'), -            ('test2_copy', 'target'), -            ('test3', 'target') -        ]: -            db = self.create_database(replica_uid, dbname) -            db.delete_database() -            # cleanup connections to avoid leaking of file descriptors -            db.close()          DatabaseBaseTests.tearDown(self)      def assertLastExchangeLog(self, db, expected): @@ -1203,7 +1150,7 @@ class CouchDatabaseSyncTests(          self.db1 = self.create_database('test1', 'both')          self.db2 = self.create_database('test2', 'both')          doc1 = self.db1.create_doc_from_json('{"a": 1}', doc_id='the-doc') -        db3 = self.create_database('test3', 'both') +        self.db3 = self.create_database('test3', 'both')          self.sync(self.db2, self.db1)          self.assertEqual(              self.db1._get_generation_info(), @@ -1211,20 +1158,20 @@ class CouchDatabaseSyncTests(          self.assertEqual(              self.db2._get_generation_info(),              self.db1._get_replica_gen_and_trans_id(self.db2._replica_uid)) -        self.sync(db3, self.db1) +        self.sync(self.db3, self.db1)          # update on 2          doc2 = self.make_document('the-doc', doc1.rev, '{"a": 2}')          self.db2.put_doc(doc2) -        self.sync(self.db2, db3) -        self.assertEqual(db3.get_doc('the-doc').rev, doc2.rev) +        self.sync(self.db2, self.db3) +        self.assertEqual(self.db3.get_doc('the-doc').rev, doc2.rev)          # update on 1          doc1.set_json('{"a": 3}')          self.db1.put_doc(doc1)          # conflicts          self.sync(self.db2, self.db1) -        self.sync(db3, self.db1) +        self.sync(self.db3, self.db1)          self.assertTrue(self.db2.get_doc('the-doc').has_conflicts) -        self.assertTrue(db3.get_doc('the-doc').has_conflicts) +        self.assertTrue(self.db3.get_doc('the-doc').has_conflicts)          # resolve          conflicts = self.db2.get_doc_conflicts('the-doc')          doc4 = 
self.make_document('the-doc', None, '{"a": 4}') @@ -1233,38 +1180,38 @@ class CouchDatabaseSyncTests(          doc2 = self.db2.get_doc('the-doc')          self.assertEqual(doc4.get_json(), doc2.get_json())          self.assertFalse(doc2.has_conflicts) -        self.sync(self.db2, db3) -        doc3 = db3.get_doc('the-doc') +        self.sync(self.db2, self.db3) +        doc3 = self.db3.get_doc('the-doc')          self.assertEqual(doc4.get_json(), doc3.get_json())          self.assertFalse(doc3.has_conflicts)      def test_sync_supersedes_conflicts(self):          self.db1 = self.create_database('test1', 'both')          self.db2 = self.create_database('test2', 'target') -        db3 = self.create_database('test3', 'both') +        self.db3 = self.create_database('test3', 'both')          doc1 = self.db1.create_doc_from_json('{"a": 1}', doc_id='the-doc')          self.db2.create_doc_from_json('{"b": 1}', doc_id='the-doc') -        db3.create_doc_from_json('{"c": 1}', doc_id='the-doc') -        self.sync(db3, self.db1) +        self.db3.create_doc_from_json('{"c": 1}', doc_id='the-doc') +        self.sync(self.db3, self.db1)          self.assertEqual(              self.db1._get_generation_info(), -            db3._get_replica_gen_and_trans_id(self.db1._replica_uid)) +            self.db3._get_replica_gen_and_trans_id(self.db1._replica_uid))          self.assertEqual( -            db3._get_generation_info(), -            self.db1._get_replica_gen_and_trans_id(db3._replica_uid)) -        self.sync(db3, self.db2) +            self.db3._get_generation_info(), +            self.db1._get_replica_gen_and_trans_id(self.db3._replica_uid)) +        self.sync(self.db3, self.db2)          self.assertEqual(              self.db2._get_generation_info(), -            db3._get_replica_gen_and_trans_id(self.db2._replica_uid)) +            self.db3._get_replica_gen_and_trans_id(self.db2._replica_uid))          self.assertEqual( -            db3._get_generation_info(), -            self.db2._get_replica_gen_and_trans_id(db3._replica_uid)) -        self.assertEqual(3, len(db3.get_doc_conflicts('the-doc'))) +            self.db3._get_generation_info(), +            self.db2._get_replica_gen_and_trans_id(self.db3._replica_uid)) +        self.assertEqual(3, len(self.db3.get_doc_conflicts('the-doc')))          doc1.set_json('{"a": 2}')          self.db1.put_doc(doc1) -        self.sync(db3, self.db1) +        self.sync(self.db3, self.db1)          # original doc1 should have been removed from conflicts -        self.assertEqual(3, len(db3.get_doc_conflicts('the-doc'))) +        self.assertEqual(3, len(self.db3.get_doc_conflicts('the-doc')))      def test_sync_stops_after_get_sync_info(self):          self.db1 = self.create_database('test1', 'source') @@ -1283,79 +1230,78 @@ class CouchDatabaseSyncTests(          self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc1')          self.assertRaises(              u1db_errors.InvalidReplicaUID, self.sync, self.db1, self.db2) -        # remove the reference to db2 to avoid double deleting on tearDown -        self.db2.close() -        self.db2 = None      def test_sync_detects_rollback_in_source(self):          self.db1 = self.create_database('test1', 'source')          self.db2 = self.create_database('test2', 'target')          self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc1')          self.sync(self.db1, self.db2) -        db1_copy = self.copy_database(self.db1) +        self.db1_copy = self.copy_database(self.db1)          
self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc2')          self.sync(self.db1, self.db2)          self.assertRaises( -            u1db_errors.InvalidGeneration, self.sync, db1_copy, self.db2) +            u1db_errors.InvalidGeneration, self.sync, self.db1_copy, self.db2)      def test_sync_detects_rollback_in_target(self):          self.db1 = self.create_database('test1', 'source')          self.db2 = self.create_database('test2', 'target')          self.db1.create_doc_from_json(tests.simple_doc, doc_id="divergent")          self.sync(self.db1, self.db2) -        db2_copy = self.copy_database(self.db2) +        self.db2_copy = self.copy_database(self.db2)          self.db2.create_doc_from_json(tests.simple_doc, doc_id='doc2')          self.sync(self.db1, self.db2)          self.assertRaises( -            u1db_errors.InvalidGeneration, self.sync, self.db1, db2_copy) +            u1db_errors.InvalidGeneration, self.sync, self.db1, self.db2_copy)      def test_sync_detects_diverged_source(self):          self.db1 = self.create_database('test1', 'source')          self.db2 = self.create_database('test2', 'target') -        db3 = self.copy_database(self.db1) +        self.db3 = self.copy_database(self.db1)          self.db1.create_doc_from_json(tests.simple_doc, doc_id="divergent") -        db3.create_doc_from_json(tests.simple_doc, doc_id="divergent") +        self.db3.create_doc_from_json(tests.simple_doc, doc_id="divergent")          self.sync(self.db1, self.db2)          self.assertRaises( -            u1db_errors.InvalidTransactionId, self.sync, db3, self.db2) +            u1db_errors.InvalidTransactionId, self.sync, self.db3, self.db2)      def test_sync_detects_diverged_target(self):          self.db1 = self.create_database('test1', 'source')          self.db2 = self.create_database('test2', 'target') -        db3 = self.copy_database(self.db2) -        db3.create_doc_from_json(tests.nested_doc, doc_id="divergent") +        self.db3 = self.copy_database(self.db2) +        self.db3.create_doc_from_json(tests.nested_doc, doc_id="divergent")          self.db1.create_doc_from_json(tests.simple_doc, doc_id="divergent")          self.sync(self.db1, self.db2)          self.assertRaises( -            u1db_errors.InvalidTransactionId, self.sync, self.db1, db3) +            u1db_errors.InvalidTransactionId, self.sync, self.db1, self.db3)      def test_sync_detects_rollback_and_divergence_in_source(self):          self.db1 = self.create_database('test1', 'source')          self.db2 = self.create_database('test2', 'target')          self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc1')          self.sync(self.db1, self.db2) -        db1_copy = self.copy_database(self.db1) +        self.db1_copy = self.copy_database(self.db1)          self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc2')          self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc3')          self.sync(self.db1, self.db2) -        db1_copy.create_doc_from_json(tests.simple_doc, doc_id='doc2') -        db1_copy.create_doc_from_json(tests.simple_doc, doc_id='doc3') +        self.db1_copy.create_doc_from_json(tests.simple_doc, doc_id='doc2') +        self.db1_copy.create_doc_from_json(tests.simple_doc, doc_id='doc3')          self.assertRaises( -            u1db_errors.InvalidTransactionId, self.sync, db1_copy, self.db2) +            u1db_errors.InvalidTransactionId, self.sync, +            self.db1_copy, self.db2)      def test_sync_detects_rollback_and_divergence_in_target(self):          
self.db1 = self.create_database('test1', 'source')          self.db2 = self.create_database('test2', 'target')          self.db1.create_doc_from_json(tests.simple_doc, doc_id="divergent")          self.sync(self.db1, self.db2) -        db2_copy = self.copy_database(self.db2) +        self.db2_copy = self.copy_database(self.db2)          self.db2.create_doc_from_json(tests.simple_doc, doc_id='doc2')          self.db2.create_doc_from_json(tests.simple_doc, doc_id='doc3')          self.sync(self.db1, self.db2) -        db2_copy.create_doc_from_json(tests.simple_doc, doc_id='doc2') -        db2_copy.create_doc_from_json(tests.simple_doc, doc_id='doc3') +        self.db2_copy.create_doc_from_json(tests.simple_doc, doc_id='doc2') +        self.db2_copy.create_doc_from_json(tests.simple_doc, doc_id='doc3')          self.assertRaises( -            u1db_errors.InvalidTransactionId, self.sync, self.db1, db2_copy) +            u1db_errors.InvalidTransactionId, self.sync, +            self.db1, self.db2_copy)      def test_optional_sync_preserve_json(self):          self.db1 = self.create_database('test1', 'source') @@ -1373,10 +1319,14 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):      def setUp(self):          CouchDBTestCase.setUp(self) + +    def create_db(self, ensure=True, dbname=None): +        if not dbname: +            dbname = ('test-%s' % uuid4().hex)          self.db = couch.CouchDatabase.open_database( -            urljoin('http://127.0.0.1:%d' % self.wrapper.port, 'test'), +            urljoin('http://127.0.0.1:%d' % self.couch_port, dbname),              create=True, -            ensure_ddocs=False)  # note that we don't enforce ddocs here +            ensure_ddocs=ensure)      def tearDown(self):          self.db.delete_database() @@ -1388,6 +1338,7 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):          Test that all methods that access design documents will raise if the          design docs are not present.          """ +        self.create_db(ensure=False)          # _get_generation()          self.assertRaises(              errors.MissingDesignDocError, @@ -1418,10 +1369,7 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):          Test that all methods that access design documents list functions          will raise if the functions are not present.          """ -        self.db = couch.CouchDatabase.open_database( -            urljoin('http://127.0.0.1:%d' % self.wrapper.port, 'test'), -            create=True, -            ensure_ddocs=True) +        self.create_db(ensure=True)          # erase views from _design/transactions          transactions = self.db._database['_design/transactions']          transactions['lists'] = {} @@ -1448,10 +1396,7 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):          Test that all methods that access design documents list functions          will raise if the functions are not present.          """ -        self.db = couch.CouchDatabase.open_database( -            urljoin('http://127.0.0.1:%d' % self.wrapper.port, 'test'), -            create=True, -            ensure_ddocs=True) +        self.create_db(ensure=True)          # erase views from _design/transactions          transactions = self.db._database['_design/transactions']          del transactions['lists'] @@ -1478,10 +1423,7 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):          Test that all methods that access design documents' named views  will          raise if the views are not present.          
""" -        self.db = couch.CouchDatabase.open_database( -            urljoin('http://127.0.0.1:%d' % self.wrapper.port, 'test'), -            create=True, -            ensure_ddocs=True) +        self.create_db(ensure=True)          # erase views from _design/docs          docs = self.db._database['_design/docs']          del docs['views'] @@ -1520,10 +1462,7 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):          Test that all methods that access design documents will raise if the          design docs are not present.          """ -        self.db = couch.CouchDatabase.open_database( -            urljoin('http://127.0.0.1:%d' % self.wrapper.port, 'test'), -            create=True, -            ensure_ddocs=True) +        self.create_db(ensure=True)          # delete _design/docs          del self.db._database['_design/docs']          # delete _design/syncs @@ -1554,3 +1493,16 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):          self.assertRaises(              errors.MissingDesignDocDeletedError,              self.db._do_set_replica_gen_and_trans_id, 1, 2, 3) + +    def test_ensure_ddoc_independently(self): +        """ +        Test that a missing ddocs other than _design/docs will be ensured +        even if _design/docs is there. +        """ +        self.create_db(ensure=True) +        del self.db._database['_design/transactions'] +        self.assertRaises( +            errors.MissingDesignDocDeletedError, +            self.db._get_transaction_log) +        self.create_db(ensure=True, dbname=self.db._dbname) +        self.db._get_transaction_log() diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py index c488822e..25f709ca 100644 --- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py +++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py @@ -23,6 +23,7 @@ import threading  from urlparse import urljoin  from twisted.internet import defer +from uuid import uuid4  from leap.soledad.client import Soledad  from leap.soledad.common.couch import CouchDatabase, CouchServerState @@ -55,7 +56,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):      sync_target = soledad_sync_target -    def _soledad_instance(self, user='user-uuid', passphrase=u'123', +    def _soledad_instance(self, user=None, passphrase=u'123',                            prefix='',                            secrets_path='secrets.json',                            local_db_path='soledad.u1db', server_url='', @@ -63,6 +64,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):          """          Instantiate Soledad.          """ +        user = user or self.user          # this callback ensures we save a document which is sent to the shared          # db. 
@@ -83,15 +85,15 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):          return soledad      def make_app(self): -        self.request_state = CouchServerState(self._couch_url) +        self.request_state = CouchServerState(self.couch_url)          return self.make_app_after_state(self.request_state)      def setUp(self):          TestCaseWithServer.setUp(self)          CouchDBTestCase.setUp(self) -        self._couch_url = 'http://localhost:' + str(self.wrapper.port) +        self.user = ('user-%s' % uuid4().hex)          self.db = CouchDatabase.open_database( -            urljoin(self._couch_url, 'user-user-uuid'), +            urljoin(self.couch_url, 'user-' + self.user),              create=True,              replica_uid='replica',              ensure_ddocs=True) diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py index 5ffa2a63..f512d6c1 100644 --- a/common/src/leap/soledad/common/tests/test_server.py +++ b/common/src/leap/soledad/common/tests/test_server.py @@ -50,7 +50,7 @@ from leap.soledad.server.auth import URLToAuthorization  def _couch_ensure_database(self, dbname):      db = CouchDatabase.open_database( -        self._couch_url + '/' + dbname, +        self.couch_url + '/' + dbname,          create=True,          ensure_ddocs=True)      return db, db._replica_uid @@ -325,7 +325,7 @@ class EncryptedSyncTestCase(              shared_db=self.get_default_shared_mock(_put_doc_side_effect))      def make_app(self): -        self.request_state = CouchServerState(self._couch_url) +        self.request_state = CouchServerState(self.couch_url)          return self.make_app_with_state(self.request_state)      def setUp(self): @@ -333,7 +333,6 @@ class EncryptedSyncTestCase(          # dependencies.          # XXX explain better          CouchDBTestCase.setUp(self) -        self._couch_url = 'http://localhost:' + str(self.wrapper.port)          self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")          TestCaseWithServer.setUp(self) @@ -368,7 +367,7 @@ class EncryptedSyncTestCase(          # ensure remote db exists before syncing          db = CouchDatabase.open_database( -            urljoin(self._couch_url, 'user-' + user), +            urljoin(self.couch_url, 'user-' + user),              create=True,              ensure_ddocs=True) @@ -494,27 +493,18 @@ class LockResourceTestCase(          # dependencies.          
# XXX explain better          CouchDBTestCase.setUp(self) -        self._couch_url = 'http://localhost:' + str(self.wrapper.port)          self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")          TestCaseWithServer.setUp(self)          # create the databases -        CouchDatabase.open_database( -            urljoin(self._couch_url, 'shared'), -            create=True, -            ensure_ddocs=True) -        CouchDatabase.open_database( -            urljoin(self._couch_url, 'tokens'), +        db = CouchDatabase.open_database( +            urljoin(self.couch_url, ('shared-%s' % (uuid4().hex))),              create=True,              ensure_ddocs=True) -        self._state = CouchServerState(self._couch_url) +        self.addCleanup(db.delete_database) +        self._state = CouchServerState(self.couch_url) +        self._state.open_database = mock.Mock(return_value=db)      def tearDown(self): -        # delete remote database -        db = CouchDatabase.open_database( -            urljoin(self._couch_url, 'shared'), -            create=True, -            ensure_ddocs=True) -        db.delete_database()          CouchDBTestCase.tearDown(self)          TestCaseWithServer.tearDown(self) diff --git a/common/src/leap/soledad/common/tests/test_soledad.py b/common/src/leap/soledad/common/tests/test_soledad.py index bd356858..85d6734e 100644 --- a/common/src/leap/soledad/common/tests/test_soledad.py +++ b/common/src/leap/soledad/common/tests/test_soledad.py @@ -223,7 +223,7 @@ class SoledadSignalingTestCase(BaseSoledadTest):      def setUp(self):          # mock signaling          soledad.client.signal = Mock() -        soledad.client.secrets.events.emit = Mock() +        soledad.client.secrets.events.emit_async = Mock()          # run parent's setUp          BaseSoledadTest.setUp(self) @@ -245,57 +245,57 @@ class SoledadSignalingTestCase(BaseSoledadTest):            - downloading keys / done downloading keys.            - uploading keys / done uploading keys.          
""" -        soledad.client.secrets.events.emit.reset_mock() +        soledad.client.secrets.events.emit_async.reset_mock()          # get a fresh instance so it emits all bootstrap signals          sol = self._soledad_instance(              secrets_path='alternative_stage3.json',              local_db_path='alternative_stage3.u1db')          # reverse call order so we can verify in the order the signals were          # expected -        soledad.client.secrets.events.emit.mock_calls.reverse() -        soledad.client.secrets.events.emit.call_args = \ -            soledad.client.secrets.events.emit.call_args_list[0] -        soledad.client.secrets.events.emit.call_args_list.reverse() +        soledad.client.secrets.events.emit_async.mock_calls.reverse() +        soledad.client.secrets.events.emit_async.call_args = \ +            soledad.client.secrets.events.emit_async.call_args_list[0] +        soledad.client.secrets.events.emit_async.call_args_list.reverse()          # downloading keys signals -        soledad.client.secrets.events.emit.assert_called_with( +        soledad.client.secrets.events.emit_async.assert_called_with(              catalog.SOLEDAD_DOWNLOADING_KEYS,              ADDRESS,          ) -        self._pop_mock_call(soledad.client.secrets.events.emit) -        soledad.client.secrets.events.emit.assert_called_with( +        self._pop_mock_call(soledad.client.secrets.events.emit_async) +        soledad.client.secrets.events.emit_async.assert_called_with(              catalog.SOLEDAD_DONE_DOWNLOADING_KEYS,              ADDRESS,          )          # creating keys signals -        self._pop_mock_call(soledad.client.secrets.events.emit) -        soledad.client.secrets.events.emit.assert_called_with( +        self._pop_mock_call(soledad.client.secrets.events.emit_async) +        soledad.client.secrets.events.emit_async.assert_called_with(              catalog.SOLEDAD_CREATING_KEYS,              ADDRESS,          ) -        self._pop_mock_call(soledad.client.secrets.events.emit) -        soledad.client.secrets.events.emit.assert_called_with( +        self._pop_mock_call(soledad.client.secrets.events.emit_async) +        soledad.client.secrets.events.emit_async.assert_called_with(              catalog.SOLEDAD_DONE_CREATING_KEYS,              ADDRESS,          )          # downloading once more (inside _put_keys_in_shared_db) -        self._pop_mock_call(soledad.client.secrets.events.emit) -        soledad.client.secrets.events.emit.assert_called_with( +        self._pop_mock_call(soledad.client.secrets.events.emit_async) +        soledad.client.secrets.events.emit_async.assert_called_with(              catalog.SOLEDAD_DOWNLOADING_KEYS,              ADDRESS,          ) -        self._pop_mock_call(soledad.client.secrets.events.emit) -        soledad.client.secrets.events.emit.assert_called_with( +        self._pop_mock_call(soledad.client.secrets.events.emit_async) +        soledad.client.secrets.events.emit_async.assert_called_with(              catalog.SOLEDAD_DONE_DOWNLOADING_KEYS,              ADDRESS,          )          # uploading keys signals -        self._pop_mock_call(soledad.client.secrets.events.emit) -        soledad.client.secrets.events.emit.assert_called_with( +        self._pop_mock_call(soledad.client.secrets.events.emit_async) +        soledad.client.secrets.events.emit_async.assert_called_with(              catalog.SOLEDAD_UPLOADING_KEYS,              ADDRESS,          ) -        self._pop_mock_call(soledad.client.secrets.events.emit) -        
soledad.client.secrets.events.emit.assert_called_with( +        self._pop_mock_call(soledad.client.secrets.events.emit_async) +        soledad.client.secrets.events.emit_async.assert_called_with(              catalog.SOLEDAD_DONE_UPLOADING_KEYS,              ADDRESS,          ) @@ -316,7 +316,7 @@ class SoledadSignalingTestCase(BaseSoledadTest):          doc.content = sol.secrets._export_recovery_document()          sol.close()          # reset mock -        soledad.client.secrets.events.emit.reset_mock() +        soledad.client.secrets.events.emit_async.reset_mock()          # get a fresh instance so it emits all bootstrap signals          shared_db = self.get_default_shared_mock(get_doc_return_value=doc)          sol = self._soledad_instance( @@ -325,17 +325,17 @@ class SoledadSignalingTestCase(BaseSoledadTest):              shared_db_class=shared_db)          # reverse call order so we can verify in the order the signals were          # expected -        soledad.client.secrets.events.emit.mock_calls.reverse() -        soledad.client.secrets.events.emit.call_args = \ -            soledad.client.secrets.events.emit.call_args_list[0] -        soledad.client.secrets.events.emit.call_args_list.reverse() +        soledad.client.secrets.events.emit_async.mock_calls.reverse() +        soledad.client.secrets.events.emit_async.call_args = \ +            soledad.client.secrets.events.emit_async.call_args_list[0] +        soledad.client.secrets.events.emit_async.call_args_list.reverse()          # assert download keys signals -        soledad.client.secrets.events.emit.assert_called_with( +        soledad.client.secrets.events.emit_async.assert_called_with(              catalog.SOLEDAD_DOWNLOADING_KEYS,              ADDRESS,          ) -        self._pop_mock_call(soledad.client.secrets.events.emit) -        soledad.client.secrets.events.emit.assert_called_with( +        self._pop_mock_call(soledad.client.secrets.events.emit_async) +        soledad.client.secrets.events.emit_async.assert_called_with(              catalog.SOLEDAD_DONE_DOWNLOADING_KEYS,              ADDRESS,          ) @@ -369,7 +369,7 @@ class SoledadSignalingTestCase(BaseSoledadTest):          yield sol.sync()          # assert the signal has been emitted -        soledad.client.events.emit.assert_called_with( +        soledad.client.events.emit_async.assert_called_with(              catalog.SOLEDAD_DONE_DATA_SYNC,              ADDRESS,          ) diff --git a/common/src/leap/soledad/common/tests/test_sqlcipher_sync.py b/common/src/leap/soledad/common/tests/test_sqlcipher_sync.py index c57d6f61..439fc070 100644 --- a/common/src/leap/soledad/common/tests/test_sqlcipher_sync.py +++ b/common/src/leap/soledad/common/tests/test_sqlcipher_sync.py @@ -19,29 +19,26 @@ Test sqlcipher backend sync.  
""" -import json +import os  from u1db import sync  from u1db import vectorclock  from u1db import errors +from uuid import uuid4  from testscenarios import TestWithScenarios -from urlparse import urljoin -from twisted.internet import defer - -from leap.soledad.common import couch  from leap.soledad.common.crypto import ENC_SCHEME_KEY  from leap.soledad.client.http_target import SoledadHTTPSyncTarget  from leap.soledad.client.crypto import decrypt_doc_dict -from leap.soledad.client.sqlcipher import SQLCipherDatabase  from leap.soledad.common.tests import u1db_tests as tests  from leap.soledad.common.tests.test_sqlcipher import SQLCIPHER_SCENARIOS  from leap.soledad.common.tests.util import make_soledad_app +from leap.soledad.common.tests.test_sync_target import \ +    SoledadDatabaseSyncTargetTests  from leap.soledad.common.tests.util import soledad_sync_target  from leap.soledad.common.tests.util import BaseSoledadTest -from leap.soledad.common.tests.util import SoledadWithCouchServerMixin  # ----------------------------------------------------------------------------- @@ -97,23 +94,6 @@ class SQLCipherDatabaseSyncTests(          self._use_tracking = {}          super(tests.DatabaseBaseTests, self).setUp() -    def tearDown(self): -        super(tests.DatabaseBaseTests, self).tearDown() -        if hasattr(self, 'db1') and isinstance(self.db1, SQLCipherDatabase): -            self.db1.close() -        if hasattr(self, 'db1_copy') \ -                and isinstance(self.db1_copy, SQLCipherDatabase): -            self.db1_copy.close() -        if hasattr(self, 'db2') \ -                and isinstance(self.db2, SQLCipherDatabase): -            self.db2.close() -        if hasattr(self, 'db2_copy') \ -                and isinstance(self.db2_copy, SQLCipherDatabase): -            self.db2_copy.close() -        if hasattr(self, 'db3') \ -                and isinstance(self.db3, SQLCipherDatabase): -            self.db3.close() -      def create_database(self, replica_uid, sync_role=None):          if replica_uid == 'test' and sync_role is None:              # created up the chain by base class but unused @@ -121,6 +101,7 @@ class SQLCipherDatabaseSyncTests(          db = self.create_database_for_role(replica_uid, sync_role)          if sync_role:              self._use_tracking[db] = (replica_uid, sync_role) +        self.addCleanup(db.close)          return db      def create_database_for_role(self, replica_uid, sync_role): @@ -729,38 +710,30 @@ class SQLCipherDatabaseSyncTests(              errors.InvalidTransactionId, self.sync, self.db1, self.db2_copy) -def _make_local_db_and_token_http_target(test, path='test'): +def make_local_db_and_soledad_target( +        test, path='test', +        source_replica_uid=uuid4().hex):      test.startTwistedServer() -    # ensure remote db exists before syncing -    db = couch.CouchDatabase.open_database( -        urljoin(test._couch_url, 'test'), -        create=True, -        replica_uid='test', -        ensure_ddocs=True) - -    replica_uid = test._soledad._dbpool.replica_uid +    replica_uid = os.path.basename(path) +    db = test.request_state._create_database(replica_uid)      sync_db = test._soledad._sync_db      sync_enc_pool = test._soledad._sync_enc_pool      st = soledad_sync_target( -        test, path, -        source_replica_uid=replica_uid, +        test, db._dbname, +        source_replica_uid=source_replica_uid,          sync_db=sync_db,          sync_enc_pool=sync_enc_pool)      return db, st  target_scenarios = [      ('leap', { -        
'create_db_and_target': _make_local_db_and_token_http_target, +        'create_db_and_target': make_local_db_and_soledad_target,          'make_app_with_state': make_soledad_app,          'do_sync': sync_via_synchronizer_and_soledad}),  ] -class SQLCipherSyncTargetTests( -        TestWithScenarios, -        tests.DatabaseBaseTests, -        tests.TestCaseWithServer, -        SoledadWithCouchServerMixin): +class SQLCipherSyncTargetTests(SoledadDatabaseSyncTargetTests):      # TODO: implement _set_trace_hook(_shallow) in SoledadHTTPSyncTarget so      #       skipped tests can be succesfully executed. @@ -769,368 +742,3 @@ class SQLCipherSyncTargetTests(                                            target_scenarios))      whitebox = False - -    def setUp(self): -        super(tests.DatabaseBaseTests, self).setUp() -        self.db, self.st = self.create_db_and_target(self) -        self.addCleanup(self.st.close) -        self.other_changes = [] - -    def tearDown(self): -        super(tests.DatabaseBaseTests, self).tearDown() - -    def assertLastExchangeLog(self, db, expected): -        log = getattr(db, '_last_exchange_log', None) -        if log is None: -            return -        self.assertEqual(expected, log) - -    def receive_doc(self, doc, gen, trans_id): -        self.other_changes.append( -            (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id)) - -    def make_app(self): -        self.request_state = couch.CouchServerState(self._couch_url) -        return self.make_app_with_state(self.request_state) - -    def set_trace_hook(self, callback, shallow=False): -        setter = (self.st._set_trace_hook if not shallow else -                  self.st._set_trace_hook_shallow) -        try: -            setter(callback) -        except NotImplementedError: -            self.skipTest("%s does not implement _set_trace_hook" -                          % (self.st.__class__.__name__,)) - -    def test_get_sync_target(self): -        self.assertIsNot(None, self.st) - -    @defer.inlineCallbacks -    def test_get_sync_info(self): -        sync_info = yield self.st.get_sync_info('other') -        self.assertEqual( -            ('test', 0, '', 0, ''), sync_info) - -    @defer.inlineCallbacks -    def test_create_doc_updates_sync_info(self): -        sync_info = yield self.st.get_sync_info('other') -        self.assertEqual( -            ('test', 0, '', 0, ''), sync_info) -        self.db.create_doc_from_json(tests.simple_doc) -        sync_info = yield self.st.get_sync_info('other') -        self.assertEqual(1, sync_info[1]) - -    @defer.inlineCallbacks -    def test_record_sync_info(self): -        yield self.st.record_sync_info('replica', 10, 'T-transid') -        sync_info = yield self.st.get_sync_info('other') -        self.assertEqual( -            ('test', 0, '', 10, 'T-transid'), sync_info) - -    @defer.inlineCallbacks -    def test_sync_exchange(self): -        """ -        Modified to account for possibly receiving encrypted documents from -        sever-side. 
-        """ - -        docs_by_gen = [ -            (self.make_document('doc-id', 'replica:1', tests.simple_doc), 10, -             'T-sid')] -        new_gen, trans_id = yield self.st.sync_exchange( -            docs_by_gen, 'replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=self.receive_doc) -        self.assertGetEncryptedDoc( -            self.db, 'doc-id', 'replica:1', tests.simple_doc, False) -        self.assertTransactionLog(['doc-id'], self.db) -        last_trans_id = self.getLastTransId(self.db) -        self.assertEqual(([], 1, last_trans_id), -                         (self.other_changes, new_gen, last_trans_id)) -        sync_info = yield self.st.get_sync_info('replica') -        self.assertEqual(10, sync_info[3]) - -    @defer.inlineCallbacks -    def test_sync_exchange_push_many(self): -        """ -        Modified to account for possibly receiving encrypted documents from -        sever-side. -        """ -        docs_by_gen = [ -            (self.make_document( -                'doc-id', 'replica:1', tests.simple_doc), 10, 'T-1'), -            (self.make_document('doc-id2', 'replica:1', tests.nested_doc), 11, -             'T-2')] -        new_gen, trans_id = yield self.st.sync_exchange( -            docs_by_gen, 'replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=self.receive_doc) -        self.assertGetEncryptedDoc( -            self.db, 'doc-id', 'replica:1', tests.simple_doc, False) -        self.assertGetEncryptedDoc( -            self.db, 'doc-id2', 'replica:1', tests.nested_doc, False) -        self.assertTransactionLog(['doc-id', 'doc-id2'], self.db) -        last_trans_id = self.getLastTransId(self.db) -        self.assertEqual(([], 2, last_trans_id), -                         (self.other_changes, new_gen, trans_id)) -        sync_info = yield self.st.get_sync_info('replica') -        self.assertEqual(11, sync_info[3]) - -    @defer.inlineCallbacks -    def test_sync_exchange_returns_many_new_docs(self): -        """ -        Modified to account for JSON serialization differences. 
-        """ -        doc = self.db.create_doc_from_json(tests.simple_doc) -        doc2 = self.db.create_doc_from_json(tests.nested_doc) -        self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db) -        new_gen, _ = yield self.st.sync_exchange( -            [], 'other-replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=self.receive_doc) -        self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db) -        self.assertEqual(2, new_gen) -        self.assertEqual( -            [(doc.doc_id, doc.rev, 1), -             (doc2.doc_id, doc2.rev, 2)], -            [c[:2] + c[3:4] for c in self.other_changes]) -        self.assertEqual( -            json.dumps(tests.simple_doc), -            json.dumps(self.other_changes[0][2])) -        self.assertEqual( -            json.loads(tests.nested_doc), -            json.loads(self.other_changes[1][2])) -        if self.whitebox: -            self.assertEqual( -                self.db._last_exchange_log['return'], -                {'last_gen': 2, 'docs': -                 [(doc.doc_id, doc.rev), (doc2.doc_id, doc2.rev)]}) - -    @defer.inlineCallbacks -    def test_sync_exchange_deleted(self): -        doc = self.db.create_doc_from_json('{}') -        edit_rev = 'replica:1|' + doc.rev -        docs_by_gen = [ -            (self.make_document(doc.doc_id, edit_rev, None), 10, 'T-sid')] -        new_gen, trans_id = yield self.st.sync_exchange( -            docs_by_gen, 'replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=self.receive_doc) -        self.assertGetDocIncludeDeleted( -            self.db, doc.doc_id, edit_rev, None, False) -        self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db) -        last_trans_id = self.getLastTransId(self.db) -        self.assertEqual(([], 2, last_trans_id), -                         (self.other_changes, new_gen, trans_id)) -        sync_info = yield self.st.get_sync_info('replica') -        self.assertEqual(10, sync_info[3]) - -    @defer.inlineCallbacks -    def test_sync_exchange_refuses_conflicts(self): -        doc = self.db.create_doc_from_json(tests.simple_doc) -        self.assertTransactionLog([doc.doc_id], self.db) -        new_doc = '{"key": "altval"}' -        docs_by_gen = [ -            (self.make_document(doc.doc_id, 'replica:1', new_doc), 10, -             'T-sid')] -        new_gen, _ = yield self.st.sync_exchange( -            docs_by_gen, 'replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=self.receive_doc) -        self.assertTransactionLog([doc.doc_id], self.db) -        self.assertEqual( -            (doc.doc_id, doc.rev, tests.simple_doc, 1), -            self.other_changes[0][:-1]) -        self.assertEqual(1, new_gen) -        if self.whitebox: -            self.assertEqual(self.db._last_exchange_log['return'], -                             {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]}) - -    @defer.inlineCallbacks -    def test_sync_exchange_ignores_convergence(self): -        doc = self.db.create_doc_from_json(tests.simple_doc) -        self.assertTransactionLog([doc.doc_id], self.db) -        gen, txid = self.db._get_generation_info() -        docs_by_gen = [ -            (self.make_document( -                doc.doc_id, doc.rev, tests.simple_doc), 10, 'T-sid')] -        new_gen, _ = yield self.st.sync_exchange( -            docs_by_gen, 'replica', last_known_generation=gen, -            last_known_trans_id=txid, insert_doc_cb=self.receive_doc) -   
     self.assertTransactionLog([doc.doc_id], self.db) -        self.assertEqual(([], 1), (self.other_changes, new_gen)) - -    @defer.inlineCallbacks -    def test_sync_exchange_returns_new_docs(self): -        doc = self.db.create_doc_from_json(tests.simple_doc) -        self.assertTransactionLog([doc.doc_id], self.db) -        new_gen, _ = yield self.st.sync_exchange( -            [], 'other-replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=self.receive_doc) -        self.assertTransactionLog([doc.doc_id], self.db) -        self.assertEqual( -            (doc.doc_id, doc.rev, tests.simple_doc, 1), -            self.other_changes[0][:-1]) -        self.assertEqual(1, new_gen) -        if self.whitebox: -            self.assertEqual(self.db._last_exchange_log['return'], -                             {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]}) - -    @defer.inlineCallbacks -    def test_sync_exchange_returns_deleted_docs(self): -        doc = self.db.create_doc_from_json(tests.simple_doc) -        self.db.delete_doc(doc) -        self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db) -        new_gen, _ = yield self.st.sync_exchange( -            [], 'other-replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=self.receive_doc) -        self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db) -        self.assertEqual( -            (doc.doc_id, doc.rev, None, 2), self.other_changes[0][:-1]) -        self.assertEqual(2, new_gen) -        if self.whitebox: -            self.assertEqual(self.db._last_exchange_log['return'], -                             {'last_gen': 2, 'docs': [(doc.doc_id, doc.rev)]}) - -    @defer.inlineCallbacks -    def test_sync_exchange_getting_newer_docs(self): -        doc = self.db.create_doc_from_json(tests.simple_doc) -        self.assertTransactionLog([doc.doc_id], self.db) -        new_doc = '{"key": "altval"}' -        docs_by_gen = [ -            (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10, -             'T-sid')] -        new_gen, _ = yield self.st.sync_exchange( -            docs_by_gen, 'other-replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=self.receive_doc) -        self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db) -        self.assertEqual(([], 2), (self.other_changes, new_gen)) - -    @defer.inlineCallbacks -    def test_sync_exchange_with_concurrent_updates_of_synced_doc(self): -        expected = [] - -        def before_whatschanged_cb(state): -            if state != 'before whats_changed': -                return -            cont = '{"key": "cuncurrent"}' -            conc_rev = self.db.put_doc( -                self.make_document(doc.doc_id, 'test:1|z:2', cont)) -            expected.append((doc.doc_id, conc_rev, cont, 3)) - -        self.set_trace_hook(before_whatschanged_cb) -        doc = self.db.create_doc_from_json(tests.simple_doc) -        self.assertTransactionLog([doc.doc_id], self.db) -        new_doc = '{"key": "altval"}' -        docs_by_gen = [ -            (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10, -             'T-sid')] -        new_gen, _ = yield self.st.sync_exchange( -            docs_by_gen, 'other-replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=self.receive_doc) -        self.assertEqual(expected, [c[:-1] for c in self.other_changes]) -        self.assertEqual(3, new_gen) - -    @defer.inlineCallbacks -    def 
test_sync_exchange_with_concurrent_updates(self): - -        def after_whatschanged_cb(state): -            if state != 'after whats_changed': -                return -            self.db.create_doc_from_json('{"new": "doc"}') - -        self.set_trace_hook(after_whatschanged_cb) -        doc = self.db.create_doc_from_json(tests.simple_doc) -        self.assertTransactionLog([doc.doc_id], self.db) -        new_doc = '{"key": "altval"}' -        docs_by_gen = [ -            (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10, -             'T-sid')] -        new_gen, _ = yield self.st.sync_exchange( -            docs_by_gen, 'other-replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=self.receive_doc) -        self.assertEqual(([], 2), (self.other_changes, new_gen)) - -    @defer.inlineCallbacks -    def test_sync_exchange_converged_handling(self): -        doc = self.db.create_doc_from_json(tests.simple_doc) -        docs_by_gen = [ -            (self.make_document('new', 'other:1', '{}'), 4, 'T-foo'), -            (self.make_document(doc.doc_id, doc.rev, doc.get_json()), 5, -             'T-bar')] -        new_gen, _ = yield self.st.sync_exchange( -            docs_by_gen, 'other-replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=self.receive_doc) -        self.assertEqual(([], 2), (self.other_changes, new_gen)) - -    @defer.inlineCallbacks -    def test_sync_exchange_detect_incomplete_exchange(self): -        def before_get_docs_explode(state): -            if state != 'before get_docs': -                return -            raise errors.U1DBError("fail") -        self.set_trace_hook(before_get_docs_explode) -        # suppress traceback printing in the wsgiref server -        # self.patch(simple_server.ServerHandler, -        #           'log_exception', lambda h, exc_info: None) -        doc = self.db.create_doc_from_json(tests.simple_doc) -        self.assertTransactionLog([doc.doc_id], self.db) -        with self.assertRaises((errors.U1DBError, errors.BrokenSyncStream)): -            yield self.st.sync_exchange( -                [], 'other-replica', -                last_known_generation=0, last_known_trans_id=None, -                insert_doc_cb=self.receive_doc) - -    @defer.inlineCallbacks -    def test_sync_exchange_doc_ids(self): -        sync_exchange_doc_ids = getattr(self.st, 'sync_exchange_doc_ids', None) -        if sync_exchange_doc_ids is None: -            self.skipTest("sync_exchange_doc_ids not implemented") -        db2 = self.create_database('test2') -        doc = db2.create_doc_from_json(tests.simple_doc) -        new_gen, trans_id = sync_exchange_doc_ids( -            db2, [(doc.doc_id, 10, 'T-sid')], 0, None, -            insert_doc_cb=self.receive_doc) -        self.assertGetDoc(self.db, doc.doc_id, doc.rev, -                          tests.simple_doc, False) -        self.assertTransactionLog([doc.doc_id], self.db) -        last_trans_id = self.getLastTransId(self.db) -        self.assertEqual(([], 1, last_trans_id), -                         (self.other_changes, new_gen, trans_id)) -        self.assertEqual(10, self.st.get_sync_info(db2._replica_uid)[3]) - -    @defer.inlineCallbacks -    def test__set_trace_hook(self): -        called = [] - -        def cb(state): -            called.append(state) - -        self.set_trace_hook(cb) -        yield self.st.sync_exchange([], 'replica', 0, None, self.receive_doc) -        yield self.st.record_sync_info('replica', 0, 'T-sid') -        
self.assertEqual(['before whats_changed', -                          'after whats_changed', -                          'before get_docs', -                          'record_sync_info', -                          ], -                         called) - -    @defer.inlineCallbacks -    def test__set_trace_hook_shallow(self): -        if (self.st._set_trace_hook_shallow == self.st._set_trace_hook or -            self.st._set_trace_hook_shallow.im_func == -                SoledadHTTPSyncTarget._set_trace_hook_shallow.im_func): -            # shallow same as full -            expected = ['before whats_changed', -                        'after whats_changed', -                        'before get_docs', -                        'record_sync_info', -                        ] -        else: -            expected = ['sync_exchange', 'record_sync_info'] - -        called = [] - -        def cb(state): -            called.append(state) - -        self.set_trace_hook(cb, shallow=True) -        self.st.sync_exchange([], 'replica', 0, None, self.receive_doc) -        self.st.record_sync_info('replica', 0, 'T-sid') -        self.assertEqual(expected, called) diff --git a/common/src/leap/soledad/common/tests/test_sync.py b/common/src/leap/soledad/common/tests/test_sync.py index 14152370..1041367b 100644 --- a/common/src/leap/soledad/common/tests/test_sync.py +++ b/common/src/leap/soledad/common/tests/test_sync.py @@ -56,14 +56,13 @@ class InterruptableSyncTestCase(      sync_target = soledad_sync_target      def make_app(self): -        self.request_state = couch.CouchServerState(self._couch_url) +        self.request_state = couch.CouchServerState(self.couch_url)          return self.make_app_with_state(self.request_state)      def setUp(self):          TestCaseWithServer.setUp(self)          CouchDBTestCase.setUp(self)          self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") -        self._couch_url = 'http://localhost:' + str(self.wrapper.port)      def tearDown(self):          CouchDBTestCase.tearDown(self) @@ -103,7 +102,7 @@ class InterruptableSyncTestCase(          # ensure remote db exists before syncing          db = couch.CouchDatabase.open_database( -            urljoin(self._couch_url, 'user-user-uuid'), +            urljoin(self.couch_url, 'user-user-uuid'),              create=True,              ensure_ddocs=True) @@ -148,8 +147,8 @@ class InterruptableSyncTestCase(  class TestSoledadDbSync(          TestWithScenarios, -        tests.TestCaseWithServer, -        SoledadWithCouchServerMixin): +        SoledadWithCouchServerMixin, +        tests.TestCaseWithServer):      """      Test db.sync remote sync shortcut @@ -166,10 +165,6 @@ class TestSoledadDbSync(      oauth = False      token = False -    def make_app(self): -        self.request_state = couch.CouchServerState(self._couch_url) -        return self.make_app_with_state(self.request_state) -      def setUp(self):          """          Need to explicitely invoke inicialization on all bases. 
@@ -177,29 +172,22 @@ class TestSoledadDbSync(          SoledadWithCouchServerMixin.setUp(self)          self.startTwistedServer()          self.db = self.make_database_for_test(self, 'test1') -        self.db2 = couch.CouchDatabase.open_database( -            urljoin( -                'http://localhost:' + str(self.wrapper.port), -                'test' -            ), -            create=True, -            ensure_ddocs=True) +        self.db2 = self.request_state._create_database(replica_uid='test')      def tearDown(self):          """          Need to explicitely invoke destruction on all bases.          """ -        self.db2.delete_database()          SoledadWithCouchServerMixin.tearDown(self)          # tests.TestCaseWithServer.tearDown(self) -    def do_sync(self, target_name): +    def do_sync(self):          """          Perform sync using SoledadSynchronizer, SoledadSyncTarget          and Token auth.          """          target = soledad_sync_target( -            self, target_name, +            self, self.db2._dbname,              source_replica_uid=self._soledad._dbpool.replica_uid)          self.addCleanup(target.close)          return sync.SoledadSynchronizer( @@ -217,7 +205,7 @@ class TestSoledadDbSync(          doc1 = self.db.create_doc_from_json(tests.simple_doc)          doc2 = self.db2.create_doc_from_json(tests.nested_doc) -        local_gen_before_sync = yield self.do_sync('test') +        local_gen_before_sync = yield self.do_sync()          gen, _, changes = self.db.whats_changed(local_gen_before_sync)          self.assertEqual(1, len(changes))          self.assertEqual(doc2.doc_id, changes[0][0]) diff --git a/common/src/leap/soledad/common/tests/test_sync_deferred.py b/common/src/leap/soledad/common/tests/test_sync_deferred.py index ffb8a4ae..90b00670 100644 --- a/common/src/leap/soledad/common/tests/test_sync_deferred.py +++ b/common/src/leap/soledad/common/tests/test_sync_deferred.py @@ -59,6 +59,7 @@ class BaseSoledadDeferredEncTest(SoledadWithCouchServerMixin):      def setUp(self):          SoledadWithCouchServerMixin.setUp(self) +        self.startTwistedServer()          # config info          self.db1_file = os.path.join(self.tempdir, "db1.u1db")          os.unlink(self.db1_file) @@ -85,13 +86,7 @@ class BaseSoledadDeferredEncTest(SoledadWithCouchServerMixin):              defer_encryption=True, sync_db_key=sync_db_key)          self.db1 = SQLCipherDatabase(self.opts) -        self.db2 = couch.CouchDatabase.open_database( -            urljoin( -                'http://localhost:' + str(self.wrapper.port), -                'test' -            ), -            create=True, -            ensure_ddocs=True) +        self.db2 = self.request_state._create_database('test')      def tearDown(self):          # XXX should not access "private" attrs @@ -109,8 +104,8 @@ class SyncTimeoutError(Exception):  class TestSoledadDbSyncDeferredEncDecr(          TestWithScenarios, -        tests.TestCaseWithServer, -        BaseSoledadDeferredEncTest): +        BaseSoledadDeferredEncTest, +        tests.TestCaseWithServer):      """      Test db.sync remote sync shortcut. @@ -128,17 +123,12 @@ class TestSoledadDbSyncDeferredEncDecr(      oauth = False      token = True -    def make_app(self): -        self.request_state = couch.CouchServerState(self._couch_url) -        return self.make_app_with_state(self.request_state) -      def setUp(self):          """          Need to explicitely invoke inicialization on all bases.          
"""          BaseSoledadDeferredEncTest.setUp(self)          self.server = self.server_thread = None -        self.startTwistedServer()          self.syncer = None      def tearDown(self): @@ -150,7 +140,7 @@ class TestSoledadDbSyncDeferredEncDecr(              dbsyncer.close()          BaseSoledadDeferredEncTest.tearDown(self) -    def do_sync(self, target_name): +    def do_sync(self):          """          Perform sync using SoledadSynchronizer, SoledadSyncTarget          and Token auth. @@ -159,7 +149,7 @@ class TestSoledadDbSyncDeferredEncDecr(          sync_db = self._soledad._sync_db          sync_enc_pool = self._soledad._sync_enc_pool          target = soledad_sync_target( -            self, target_name, +            self, self.db2._dbname,              source_replica_uid=replica_uid,              sync_db=sync_db,              sync_enc_pool=sync_enc_pool) @@ -190,7 +180,7 @@ class TestSoledadDbSyncDeferredEncDecr(          """          doc1 = self.db1.create_doc_from_json(tests.simple_doc)          doc2 = self.db2.create_doc_from_json(tests.nested_doc) -        local_gen_before_sync = yield self.do_sync('test') +        local_gen_before_sync = yield self.do_sync()          gen, _, changes = self.db1.whats_changed(local_gen_before_sync)          self.assertEqual(1, len(changes)) diff --git a/common/src/leap/soledad/common/tests/test_sync_mutex.py b/common/src/leap/soledad/common/tests/test_sync_mutex.py index a904a940..2e2123a7 100644 --- a/common/src/leap/soledad/common/tests/test_sync_mutex.py +++ b/common/src/leap/soledad/common/tests/test_sync_mutex.py @@ -84,14 +84,14 @@ class TestSyncMutex(      sync_target = soledad_sync_target      def make_app(self): -        self.request_state = couch.CouchServerState(self._couch_url) +        self.request_state = couch.CouchServerState(self.couch_url)          return self.make_app_with_state(self.request_state)      def setUp(self):          TestCaseWithServer.setUp(self)          CouchDBTestCase.setUp(self)          self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") -        self._couch_url = 'http://localhost:' + str(self.wrapper.port) +        self.user = ('user-%s' % uuid.uuid4().hex)      def tearDown(self):          CouchDBTestCase.tearDown(self) @@ -103,12 +103,12 @@ class TestSyncMutex(          # ensure remote db exists before syncing          db = couch.CouchDatabase.open_database( -            urljoin(self._couch_url, 'user-user-uuid'), +            urljoin(self.couch_url, 'user-' + self.user),              create=True,              ensure_ddocs=True)          sol = self._soledad_instance( -            user='user-uuid', server_url=self.getURL()) +            user=self.user, server_url=self.getURL())          d1 = sol.sync()          d2 = sol.sync() diff --git a/common/src/leap/soledad/common/tests/test_sync_target.py b/common/src/leap/soledad/common/tests/test_sync_target.py index d855fb52..c0987e90 100644 --- a/common/src/leap/soledad/common/tests/test_sync_target.py +++ b/common/src/leap/soledad/common/tests/test_sync_target.py @@ -63,13 +63,12 @@ class TestSoledadParseReceivedDocResponse(SoledadWithCouchServerMixin):      def setUp(self):          SoledadWithCouchServerMixin.setUp(self) -        self._couch_url = 'http://localhost:' + str(self.wrapper.port)          creds = {'token': {              'uuid': 'user-uuid',              'token': 'auth-token',          }}          self.target = target.SoledadHTTPSyncTarget( -            self._couch_url, +            self.couch_url,              uuid4().hex,              creds, 
             self._soledad._crypto, @@ -151,11 +150,12 @@ def make_local_db_and_soledad_target(          test, path='test',          source_replica_uid=uuid4().hex):      test.startTwistedServer() -    db = test.request_state._create_database(os.path.basename(path)) +    replica_uid = os.path.basename(path) +    db = test.request_state._create_database(replica_uid)      sync_db = test._soledad._sync_db      sync_enc_pool = test._soledad._sync_enc_pool      st = soledad_sync_target( -        test, path, +        test, db._dbname,          source_replica_uid=source_replica_uid,          sync_db=sync_db,          sync_enc_pool=sync_enc_pool) @@ -191,6 +191,8 @@ class TestSoledadSyncTarget(              self.startTwistedServer()          sync_db = self._soledad._sync_db          sync_enc_pool = self._soledad._sync_enc_pool +        if path is None: +            path = self.db2._dbname          target = self.sync_target(              self, path,              source_replica_uid=source_replica_uid, @@ -204,11 +206,11 @@ class TestSoledadSyncTarget(          SoledadWithCouchServerMixin.setUp(self)          self.startTwistedServer()          self.db1 = make_sqlcipher_database_for_test(self, 'test1') -        self.db2 = self.request_state._create_database('test2') +        self.db2 = self.request_state._create_database('test')      def tearDown(self):          # db2, _ = self.request_state.ensure_database('test2') -        self.db2.delete_database() +        self.delete_db(self.db2._dbname)          self.db1.close()          SoledadWithCouchServerMixin.tearDown(self)          TestWithScenarios.tearDown(self) @@ -220,8 +222,8 @@ class TestSoledadSyncTarget(          This test was adapted to decrypt remote content before assert.          """ -        db = self.request_state._create_database('test') -        remote_target = self.getSyncTarget('test') +        db = self.db2 +        remote_target = self.getSyncTarget()          other_docs = []          def receive_doc(doc, gen, trans_id): @@ -247,7 +249,7 @@ class TestSoledadSyncTarget(          def blackhole_getstderr(inst):              return cStringIO.StringIO() -        db = self.request_state._create_database('test') +        db = self.db2          _put_doc_if_newer = db._put_doc_if_newer          trigger_ids = ['doc-here2'] @@ -267,7 +269,6 @@ class TestSoledadSyncTarget(          self.patch(              IndexedCouchDatabase, '_put_doc_if_newer', bomb_put_doc_if_newer)          remote_target = self.getSyncTarget( -            'test',              source_replica_uid='replica')          other_changes = [] @@ -317,7 +318,7 @@ class TestSoledadSyncTarget(          This test was adapted to decrypt remote content before assert.          
""" -        remote_target = self.getSyncTarget('test') +        remote_target = self.getSyncTarget()          other_docs = []          replica_uid_box = [] @@ -333,7 +334,7 @@ class TestSoledadSyncTarget(              last_known_trans_id=None, insert_doc_cb=receive_doc,              ensure_callback=ensure_cb, defer_decryption=False)          self.assertEqual(1, new_gen) -        db = self.request_state.open_database('test') +        db = self.db2          self.assertEqual(1, len(replica_uid_box))          self.assertEqual(db._replica_uid, replica_uid_box[0])          self.assertGetEncryptedDoc( @@ -346,10 +347,9 @@ class TestSoledadSyncTarget(      @defer.inlineCallbacks      def test_get_sync_info(self): -        db = self.request_state._create_database('test') +        db = self.db2          db._set_replica_gen_and_trans_id('other-id', 1, 'T-transid')          remote_target = self.getSyncTarget( -            'test',              source_replica_uid='other-id')          sync_info = yield remote_target.get_sync_info('other-id')          self.assertEqual( @@ -358,19 +358,17 @@ class TestSoledadSyncTarget(      @defer.inlineCallbacks      def test_record_sync_info(self): -        db = self.request_state._create_database('test')          remote_target = self.getSyncTarget( -            'test',              source_replica_uid='other-id')          yield remote_target.record_sync_info('other-id', 2, 'T-transid') -        self.assertEqual( -            (2, 'T-transid'), db._get_replica_gen_and_trans_id('other-id')) +        self.assertEqual((2, 'T-transid'), +                         self.db2._get_replica_gen_and_trans_id('other-id'))      @defer.inlineCallbacks      def test_sync_exchange_receive(self): -        db = self.request_state._create_database('test') +        db = self.db2          doc = db.create_doc_from_json('{"value": "there"}') -        remote_target = self.getSyncTarget('test') +        remote_target = self.getSyncTarget()          other_changes = []          def receive_doc(doc, gen, trans_id): @@ -423,10 +421,10 @@ class SoledadDatabaseSyncTargetTests(          self.db, self.st = make_local_db_and_soledad_target(self)      def tearDown(self): -        tests.TestCaseWithServer.tearDown(self) -        SoledadWithCouchServerMixin.tearDown(self)          self.db.close()          self.st.close() +        tests.TestCaseWithServer.tearDown(self) +        SoledadWithCouchServerMixin.tearDown(self)      def set_trace_hook(self, callback, shallow=False):          setter = (self.st._set_trace_hook if not shallow else @@ -818,10 +816,6 @@ class TestSoledadDbSync(      oauth = False      token = False -    def make_app(self): -        self.request_state = couch.CouchServerState(self._couch_url) -        return self.make_app_with_state(self.request_state) -      def setUp(self):          """          Need to explicitely invoke inicialization on all bases. 
@@ -857,13 +851,7 @@ class TestSoledadDbSync(              defer_encryption=True, sync_db_key=sync_db_key)          self.db1 = SQLCipherDatabase(self.opts) -        self.db2 = couch.CouchDatabase.open_database( -            urljoin( -                'http://localhost:' + str(self.wrapper.port), -                'test' -            ), -            create=True, -            ensure_ddocs=True) +        self.db2 = self.request_state._create_database(replica_uid='test')      def tearDown(self):          """ @@ -890,7 +878,7 @@ class TestSoledadDbSync(                  'uuid': 'user-uuid',                  'token': 'auth-token',              }} -            target_url = self.getURL(target_name) +            target_url = self.getURL(self.db2._dbname)              # get a u1db syncer              crypto = self._soledad._crypto diff --git a/common/src/leap/soledad/common/tests/util.py b/common/src/leap/soledad/common/tests/util.py index daa9c558..1c7adb91 100644 --- a/common/src/leap/soledad/common/tests/util.py +++ b/common/src/leap/soledad/common/tests/util.py @@ -27,10 +27,8 @@ import shutil  import random  import string  import u1db -import subprocess -import time -import re  import traceback +import couchdb  from uuid import uuid4  from mock import Mock @@ -337,119 +335,6 @@ class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest):          self.assertEqual(exp_doc.content, doc.content) -# ----------------------------------------------------------------------------- -# A wrapper for running couchdb locally. -# ----------------------------------------------------------------------------- - -# from: https://github.com/smcq/paisley/blob/master/paisley/test/util.py -# TODO: include license of above project. -class CouchDBWrapper(object): - -    """ -    Wrapper for external CouchDB instance which is started and stopped for -    testing. -    """ -    BOOT_TIMEOUT_SECONDS = 5 -    RETRY_LIMIT = 3 - -    def start(self): -        tries = 0 -        while tries < self.RETRY_LIMIT and not hasattr(self, 'port'): -            try: -                self._try_start() -                return -            except Exception, e: -                print traceback.format_exc() -                self.stop() -                tries += 1 -        raise Exception( -            "Check your couchdb: Tried to start 3 times and failed badly") - -    def _try_start(self): -        """ -        Start a CouchDB instance for a test. 
-        """ -        self.tempdir = tempfile.mkdtemp(suffix='.couch.test') - -        path = os.path.join(os.path.dirname(__file__), -                            'couchdb.ini.template') -        handle = open(path) -        conf = handle.read() % { -            'tempdir': self.tempdir, -        } -        handle.close() - -        shutil.copy('/etc/couchdb/default.ini', self.tempdir) -        defaultConfPath = os.path.join(self.tempdir, 'default.ini') - -        confPath = os.path.join(self.tempdir, 'test.ini') -        handle = open(confPath, 'w') -        handle.write(conf) -        handle.close() - -        # create the dirs from the template -        mkdir_p(os.path.join(self.tempdir, 'lib')) -        mkdir_p(os.path.join(self.tempdir, 'log')) -        args = ['/usr/bin/couchdb', '-n', -                '-a', defaultConfPath, '-a', confPath] -        null = open('/dev/null', 'w') - -        self.process = subprocess.Popen( -            args, env=None, stdout=null.fileno(), stderr=null.fileno(), -            close_fds=True) -        boot_time = time.time() -        # find port -        logPath = os.path.join(self.tempdir, 'log', 'couch.log') -        while not os.path.exists(logPath): -            if self.process.poll() is not None: -                got_stdout, got_stderr = "", "" -                if self.process.stdout is not None: -                    got_stdout = self.process.stdout.read() - -                if self.process.stderr is not None: -                    got_stderr = self.process.stderr.read() -                raise Exception(""" -couchdb exited with code %d. -stdout: -%s -stderr: -%s""" % ( -                    self.process.returncode, got_stdout, got_stderr)) -            time.sleep(0.01) -            if (time.time() - boot_time) > self.BOOT_TIMEOUT_SECONDS: -                self.stop() -                raise Exception("Timeout starting couch") -        while os.stat(logPath).st_size == 0: -            time.sleep(0.01) -            if (time.time() - boot_time) > self.BOOT_TIMEOUT_SECONDS: -                self.stop() -                raise Exception("Timeout starting couch") -        PORT_RE = re.compile( -            'Apache CouchDB has started on http://127.0.0.1:(?P<port>\d+)') - -        handle = open(logPath) -        line = handle.read() -        handle.close() -        m = PORT_RE.search(line) -        if not m: -            self.stop() -            raise Exception("Cannot find port in line %s" % line) -        self.port = int(m.group('port')) - -    def stop(self): -        """ -        Terminate the CouchDB instance. -        """ -        try: -            self.process.terminate() -            self.process.communicate() -        except: -            # just to clean up -            # if it can't, the process wasn't created anyway -            pass -        shutil.rmtree(self.tempdir) - -  class CouchDBTestCase(unittest.TestCase, MockedSharedDBTest):      """ @@ -460,15 +345,16 @@ class CouchDBTestCase(unittest.TestCase, MockedSharedDBTest):          """          Make sure we have a CouchDB instance for a test.          """ -        self.wrapper = CouchDBWrapper() -        self.wrapper.start() -        # self.db = self.wrapper.db +        self.couch_port = 5984 +        self.couch_url = 'http://localhost:%d' % self.couch_port +        self.couch_server = couchdb.Server(self.couch_url) -    def tearDown(self): -        """ -        Stop CouchDB instance for test. 
-        """ -        self.wrapper.stop() +    def delete_db(self, name): +        try: +            self.couch_server.delete(name) +        except: +            # ignore if already missing +            pass  class CouchServerStateForTests(CouchServerState): @@ -484,15 +370,25 @@ class CouchServerStateForTests(CouchServerState):      which is less pleasant than allowing the db to be automatically created.      """ -    def _create_database(self, dbname): -        return CouchDatabase.open_database( -            urljoin(self._couch_url, dbname), +    def __init__(self, *args, **kwargs): +        self.dbs = [] +        super(CouchServerStateForTests, self).__init__(*args, **kwargs) + +    def _create_database(self, replica_uid=None, dbname=None): +        """ +        Create db and append to a list, allowing test to close it later +        """ +        dbname = dbname or ('test-%s' % uuid4().hex) +        db = CouchDatabase.open_database( +            urljoin(self.couch_url, dbname),              True, -            replica_uid=dbname, +            replica_uid=replica_uid or 'test',              ensure_ddocs=True) +        self.dbs.append(db) +        return db      def ensure_database(self, dbname): -        db = self._create_database(dbname) +        db = self._create_database(dbname=dbname)          return db, db.replica_uid @@ -506,23 +402,20 @@ class SoledadWithCouchServerMixin(          main_test_class = getattr(self, 'main_test_class', None)          if main_test_class is not None:              main_test_class.setUp(self) -        self._couch_url = 'http://localhost:%d' % self.wrapper.port      def tearDown(self):          main_test_class = getattr(self, 'main_test_class', None)          if main_test_class is not None:              main_test_class.tearDown(self)          # delete the test database -        try: -            db = CouchDatabase(self._couch_url, 'test') -            db.delete_database() -        except DatabaseDoesNotExist: -            pass          BaseSoledadTest.tearDown(self)          CouchDBTestCase.tearDown(self)      def make_app(self): -        couch_url = urljoin( -            'http://localhost:' + str(self.wrapper.port), 'tests') -        self.request_state = CouchServerStateForTests(couch_url) +        self.request_state = CouchServerStateForTests(self.couch_url) +        self.addCleanup(self.delete_dbs)          return self.make_app_with_state(self.request_state) + +    def delete_dbs(self): +        for db in self.request_state.dbs: +            self.delete_db(db._dbname) diff --git a/docs/sphinx/sync.rst b/docs/sphinx/sync.rst new file mode 100644 index 00000000..f243befb --- /dev/null +++ b/docs/sphinx/sync.rst @@ -0,0 +1,32 @@ +Soledad sync process +==================== + +Phases of sync: + +(1) client acquires knowledge about server state. http GET +(2) client sends its documents to the server. http POSTs, or a single POST. +(3) client downloads documents from the server. +(4) client records its new state on the server. + +Originally in u1db: +    (1) is a GET, +    (2) and (3) are one POST (send in body, receive in response), +    (4) is a PUT. + +In soledad: + +(1) is a GET. +(2) is either 1 or a series of sequential POSTS. +  (2.1) encrypt asynchronously +  (2.2) store in temp sync db +  (2.3) upload sequentially ***THIS IS SLOW*** +(3) is a series of concurrent POSTS, insert sequentially on local client db. 
+  (3.1) download concurrently
+  (3.2) store in temp sync db
+  (3.3) decrypt asynchronously
+  (3.4) insert sequentially in local client db
+(4) is a PUT.
+
+This difference from u1db was introduced so that the sync can be gracefully interrupted in the middle of the upload or the download.
+
+It is essential that documents are added in order: uploads are strictly sequential, and although downloads happen in parallel, the received documents are then inserted sequentially into the local db.
diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py
index 7a03f6fb..1b795016 100644
--- a/server/src/leap/soledad/server/__init__.py
+++ b/server/src/leap/soledad/server/__init__.py
@@ -238,6 +238,7 @@ class HTTPInvocationByMethodWithBody(
                 if content_type == 'application/x-soledad-sync-put':
                     meth_put = self._lookup('%s_put' % method)
                     meth_end = self._lookup('%s_end' % method)
+                    entries = []
                     while True:
                         line = body_getline()
                         entry = line.strip()
                         if entry == ']':
                             # end of incoming document stream
                             break
                         if not entry or not comma:  # empty or no prec comma
                             raise http_app.BadRequest
                         entry, comma = utils.check_and_strip_comma(entry)
-                        meth_put({}, entry)
+                        entries.append(entry)
                     if comma or body_getline():  # extra comma or data
                         raise http_app.BadRequest
+                    for entry in entries:
+                        meth_put({}, entry)
                     return meth_end()
                 # handle outgoing documents
                 elif content_type == 'application/x-soledad-sync-get':
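
The server hunk above rearranges the sync-put handler into a parse-then-apply shape: the incoming document stream is now fully read and validated before any meth_put call runs, so a malformed body can no longer raise BadRequest after some documents have already been persisted. Below is a minimal sketch of the same pattern; parse_entries and store are illustrative stand-ins, not the server's real API:

    class BadRequest(Exception):
        pass

    def parse_entries(lines):
        # First pass: validate and collect the whole stream. Raising here
        # aborts the request before anything is persisted.
        entries = []
        for line in lines:
            entry = line.strip().rstrip(',')
            if entry == ']':  # end of document stream
                return entries
            if not entry:
                raise BadRequest()
            entries.append(entry)
        raise BadRequest()  # stream was never terminated

    def handle_sync_put(lines, store):
        # Second pass: apply side effects only after parsing succeeded.
        for entry in parse_entries(lines):
            store(entry)

With the previous one-pass loop, a syntax error halfway through the body left the earlier documents already stored; deferring the writes until after the parse makes the request all-or-nothing at the parsing stage (an individual store can still fail, but not because of body syntax).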

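As a companion to the docs/sphinx/sync.rst file added above, here is a toy sketch of the four sync phases and their ordering constraints. All names (fake_server, fetch, local_db) are invented for illustration and are not Soledad's actual API:

    import threading

    fake_server = {1: 'doc-a', 2: 'doc-b', 3: 'doc-c'}  # generation -> doc
    local_db = []

    # (1) GET: learn the server's current generation.
    server_gen = max(fake_server)

    # (2) sequential POSTs: upload one document at a time, in order.
    for i, doc in enumerate(['local-doc-1', 'local-doc-2']):
        fake_server[server_gen + 1 + i] = doc

    # (3) concurrent POSTs: download in parallel...
    fetched = {}

    def fetch(gen):
        fetched[gen] = fake_server[gen]

    threads = [threading.Thread(target=fetch, args=(gen,))
               for gen in range(1, server_gen + 1)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # ...but insert sequentially, so the local db sees generations in order.
    for gen in sorted(fetched):
        local_db.append(fetched[gen])

    # (4) PUT: record the new known state on the server (elided here).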