Diffstat (limited to 'common/src')
8 files changed, 1120 insertions, 371 deletions
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index d2414477..8e8613a1 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -24,23 +24,48 @@ import uuid  import logging  import binascii  import socket +import time +import sys +import threading + + +from StringIO import StringIO +from collections import defaultdict  from couchdb.client import Server -from couchdb.http import ResourceNotFound, Unauthorized -from u1db import errors, query_parser, vectorclock +from couchdb.http import ( +    ResourceConflict, +    ResourceNotFound, +    ServerError, +    Session, +) +from u1db import query_parser, vectorclock +from u1db.errors import ( +    DatabaseDoesNotExist, +    InvalidGeneration, +    RevisionConflict, +    InvalidDocId, +    ConflictedDoc, +    DocumentDoesNotExist, +    DocumentAlreadyDeleted, +    Unauthorized, +)  from u1db.backends import CommonBackend, CommonSyncTarget  from u1db.remote import http_app  from u1db.remote.server_state import ServerState -from leap.soledad.common import USER_DB_PREFIX, ddocs +from leap.soledad.common import USER_DB_PREFIX, ddocs, errors  from leap.soledad.common.document import SoledadDocument  logger = logging.getLogger(__name__) +COUCH_TIMEOUT = 120  # timeout for transfers between Soledad server and Couch + +  class InvalidURLError(Exception):      """      Exception raised when Soledad encounters a malformed URL. @@ -75,9 +100,9 @@ class CouchDocument(SoledadDocument):          SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts)          self._couch_rev = None          self._conflicts = None -        self._modified_conflicts = False +        self._transactions = None -    def ensure_fetch_conflicts(self, get_conflicts_fun): +    def _ensure_fetch_conflicts(self, get_conflicts_fun):          """          Ensure conflict data has been fetched from the server. @@ -100,6 +125,16 @@ class CouchDocument(SoledadDocument):          """          return self._conflicts +    def set_conflicts(self, conflicts): +        """ +        Set the conflicted versions of the document. + +        :param conflicts: The conflicted versions of the document. +        :type conflicts: list +        """ +        self._conflicts = conflicts +        self.has_conflicts = len(self._conflicts) > 0 +      def add_conflict(self, doc):          """          Add a conflict to this document. 
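The new COUCH_TIMEOUT constant above is consumed further down in this diff, where CouchDatabase.__init__ builds a python-couchdb Session with that timeout whenever the caller does not supply one. A minimal sketch of that default, assuming the Server/Session signatures this module already imports; the open_server helper itself is hypothetical:

    from couchdb.client import Server
    from couchdb.http import Session

    COUCH_TIMEOUT = 120  # seconds, matches the constant defined above

    def open_server(url, session=None):
        # hypothetical helper mirroring the default applied in
        # CouchDatabase.__init__ later in this diff
        if session is None:
            session = Session(timeout=COUCH_TIMEOUT)
        return Server(url=url, session=session)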
@@ -108,8 +143,7 @@ class CouchDocument(SoledadDocument):          :type doc: CouchDocument          """          if self._conflicts is None: -            raise Exception("Run self.ensure_fetch_conflicts first!") -        self._modified_conflicts = True +            raise Exception("Run self._ensure_fetch_conflicts first!")          self._conflicts.append(doc)          self.has_conflicts = len(self._conflicts) > 0 @@ -121,25 +155,13 @@ class CouchDocument(SoledadDocument):          :type conflict_revs: [str]          """          if self._conflicts is None: -            raise Exception("Run self.ensure_fetch_conflicts first!") +            raise Exception("Run self._ensure_fetch_conflicts first!")          conflicts_len = len(self._conflicts)          self._conflicts = filter(              lambda doc: doc.rev not in conflict_revs,              self._conflicts) -        if len(self._conflicts) < conflicts_len: -            self._modified_conflicts = True          self.has_conflicts = len(self._conflicts) > 0 -    def modified_conflicts(self): -        """ -        Return whether this document's conflicts have been modified. - -        :return: Whether this document's conflicts have been modified. -        :rtype: bool -        """ -        return self._conflicts is not None and \ -            self._modified_conflicts is True -      def _get_couch_rev(self):          return self._couch_rev @@ -148,18 +170,217 @@ class CouchDocument(SoledadDocument):      couch_rev = property(_get_couch_rev, _set_couch_rev) +    def _get_transactions(self): +        return self._transactions + +    def _set_transactions(self, rev): +        self._transactions = rev + +    transactions = property(_get_transactions, _set_transactions) +  # monkey-patch the u1db http app to use CouchDocument  http_app.Document = CouchDocument +def raise_missing_design_doc_error(exc, ddoc_path): +    """ +    Raise an appropriate exception when catching a ResourceNotFound when +    accessing a design document. + +    :param exc: The exception cought. +    :type exc: ResourceNotFound +    :param ddoc_path: A list representing the requested path. +    :type ddoc_path: list + +    :raise MissingDesignDocError: Raised when tried to access a missing design +                                  document. +    :raise MissingDesignDocListFunctionError: Raised when trying to access a +                                              missing list function on a +                                              design document. +    :raise MissingDesignDocNamedViewError: Raised when trying to access a +                                           missing named view on a design +                                           document. +    :raise MissingDesignDocDeletedError: Raised when trying to access a +                                         deleted design document. +    :raise MissingDesignDocUnknownError: Raised when failed to access a design +                                         document for an yet unknown reason. 
+    """ +    path = "".join(ddoc_path) +    if exc.message[1] == 'missing': +        raise errors.MissingDesignDocError(path) +    elif exc.message[1] == 'missing function' or \ +            exc.message[1].startswith('missing lists function'): +        raise errors.MissingDesignDocListFunctionError(path) +    elif exc.message[1] == 'missing_named_view': +        raise errors.MissingDesignDocNamedViewError(path) +    elif exc.message[1] == 'deleted': +        raise errors.MissingDesignDocDeletedError(path) +    # other errors are unknown for now +    raise errors.DesignDocUnknownError("%s: %s" % (path, str(exc.message))) + + +def raise_server_error(exc, ddoc_path): +    """ +    Raise an appropriate exception when catching a ServerError when +    accessing a design document. + +    :param exc: The exception cought. +    :type exc: ResourceNotFound +    :param ddoc_path: A list representing the requested path. +    :type ddoc_path: list + +    :raise MissingDesignDocListFunctionError: Raised when trying to access a +                                              missing list function on a +                                              design document. +    :raise MissingDesignDocUnknownError: Raised when failed to access a design +                                         document for an yet unknown reason. +    """ +    path = "".join(ddoc_path) +    if exc.message[1][0] == 'unnamed_error': +        raise errors.MissingDesignDocListFunctionError(path) +    # other errors are unknown for now +    raise errors.DesignDocUnknownError(path) + + +class MultipartWriter(object): +    """ +    A multipart writer adapted from python-couchdb's one so we can PUT +    documents using couch's multipart PUT. + +    This stripped down version does not allow for nested structures, and +    contains only the essential things we need to PUT SoledadDocuments to the +    couch backend. +    """ + +    CRLF = '\r\n' + +    def __init__(self, fileobj, headers=None, boundary=None): +        """ +        Initialize the multipart writer. +        """ +        self.fileobj = fileobj +        if boundary is None: +            boundary = self._make_boundary() +        self._boundary = boundary +        self._build_headers('related', headers) + +    def add(self, mimetype, content, headers={}): +        """ +        Add a part to the multipart stream. +        """ +        self.fileobj.write('--') +        self.fileobj.write(self._boundary) +        self.fileobj.write(self.CRLF) +        headers['Content-Type'] = mimetype +        self._write_headers(headers) +        if content: +            # XXX: throw an exception if a boundary appears in the content?? +            self.fileobj.write(content) +            self.fileobj.write(self.CRLF) + +    def close(self): +        """ +        Close the multipart stream. +        """ +        self.fileobj.write('--') +        self.fileobj.write(self._boundary) +        # be careful not to have anything after '--', otherwise old couch +        # versions (including bigcouch) will fail. +        self.fileobj.write('--') + +    def _make_boundary(self): +        """ +        Create a boundary to discern multi parts. 
+        """ +        try: +            from uuid import uuid4 +            return '==' + uuid4().hex + '==' +        except ImportError: +            from random import randrange +            token = randrange(sys.maxint) +            format = '%%0%dd' % len(repr(sys.maxint - 1)) +            return '===============' + (format % token) + '==' + +    def _write_headers(self, headers): +        """ +        Write a part header in the buffer stream. +        """ +        if headers: +            for name in sorted(headers.keys()): +                value = headers[name] +                self.fileobj.write(name) +                self.fileobj.write(': ') +                self.fileobj.write(value) +                self.fileobj.write(self.CRLF) +        self.fileobj.write(self.CRLF) + +    def _build_headers(self, subtype, headers): +        """ +        Build the main headers of the multipart stream. + +        This is here so we can send headers separete from content using +        python-couchdb API. +        """ +        self.headers = {} +        self.headers['Content-Type'] = 'multipart/%s; boundary="%s"' % \ +                                       (subtype, self._boundary) +        if headers: +            for name in sorted(headers.keys()): +                value = headers[name] +                self.headers[name] = value + +  class CouchDatabase(CommonBackend):      """      A U1DB implementation that uses CouchDB as its persistence layer.      """ +    # We spawn threads to parallelize the CouchDatabase.get_docs() method +    MAX_GET_DOCS_THREADS = 20 + +    update_handler_lock = defaultdict(threading.Lock) + +    class _GetDocThread(threading.Thread): +        """ +        A thread that gets a document from a database. + +        TODO: switch this for a twisted deferred to thread. This depends on +        replacing python-couchdb for paisley in this module. +        """ + +        def __init__(self, db, doc_id, check_for_conflicts, +                     release_fun): +            """ +            :param db: The database from where to get the document. +            :type db: u1db.Database +            :param doc_id: The doc_id of the document to be retrieved. +            :type doc_id: str +            :param check_for_conflicts: Whether the get_doc() method should +                                        check for existing conflicts. +            :type check_for_conflicts: bool +            :param release_fun: A function that releases a semaphore, to be +                                called after the document is fetched. +            :type release_fun: function +            """ +            threading.Thread.__init__(self) +            self._db = db +            self._doc_id = doc_id +            self._check_for_conflicts = check_for_conflicts +            self._release_fun = release_fun +            self._doc = None + +        def run(self): +            """ +            Fetch the document, store it as a property, and call the release +            function. +            """ +            self._doc = self._db._get_doc( +                self._doc_id, self._check_for_conflicts) +            self._release_fun() +      @classmethod -    def open_database(cls, url, create): +    def open_database(cls, url, create, ensure_ddocs=False):          """          Open a U1DB database using CouchDB as backend. @@ -167,6 +388,8 @@ class CouchDatabase(CommonBackend):          :type url: str          :param create: should the replica be created if it does not exist?          
:type create: bool +        :param ensure_ddocs: Ensure that the design docs exist on server. +        :type ensure_ddocs: bool          :return: the database instance          :rtype: CouchDatabase @@ -182,8 +405,8 @@ class CouchDatabase(CommonBackend):              server[dbname]          except ResourceNotFound:              if not create: -                raise errors.DatabaseDoesNotExist() -        return cls(url, dbname) +                raise DatabaseDoesNotExist() +        return cls(url, dbname, ensure_ddocs=ensure_ddocs)      def __init__(self, url, dbname, replica_uid=None, full_commit=True,                   session=None, ensure_ddocs=True): @@ -206,6 +429,8 @@ class CouchDatabase(CommonBackend):          # save params          self._url = url          self._full_commit = full_commit +        if session is None: +            session = Session(timeout=COUCH_TIMEOUT)          self._session = session          self._factory = CouchDocument          self._real_replica_uid = None @@ -223,6 +448,9 @@ class CouchDatabase(CommonBackend):              self._set_replica_uid(replica_uid)          if ensure_ddocs:              self.ensure_ddocs_on_db() +        # initialize a thread pool for parallelizing get_docs() +        self._sem_pool = threading.BoundedSemaphore( +            value=self.MAX_GET_DOCS_THREADS)      def ensure_ddocs_on_db(self):          """ @@ -318,12 +546,31 @@ class CouchDatabase(CommonBackend):          :return: The current generation.          :rtype: int + +        :raise MissingDesignDocError: Raised when tried to access a missing +                                      design document. +        :raise MissingDesignDocListFunctionError: Raised when trying to access +                                                  a missing list function on a +                                                  design document. +        :raise MissingDesignDocNamedViewError: Raised when trying to access a +                                               missing named view on a design +                                               document. +        :raise MissingDesignDocDeletedError: Raised when trying to access a +                                             deleted design document. +        :raise MissingDesignDocUnknownError: Raised when failed to access a +                                             design document for an yet +                                             unknown reason.          """          # query a couch list function -        res = self._database.resource( -            '_design', 'transactions', '_list', 'generation', 'log') -        response = res.get_json() -        return response[2]['generation'] +        ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log'] +        res = self._database.resource(*ddoc_path) +        try: +            response = res.get_json() +            return response[2]['generation'] +        except ResourceNotFound as e: +            raise_missing_design_doc_error(e, ddoc_path) +        except ServerError as e: +            raise_server_error(e, ddoc_path)      def _get_generation_info(self):          """ @@ -331,12 +578,31 @@ class CouchDatabase(CommonBackend):          :return: A tuple containing the current generation and transaction id.          :rtype: (int, str) + +        :raise MissingDesignDocError: Raised when tried to access a missing +                                      design document. 
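The try/except shape used in _get_generation() above recurs for every design-document access in this backend: build the ddoc path, query it, and translate couch's ResourceNotFound/ServerError into the richer exceptions from leap.soledad.common.errors. A hedged sketch of that recurring shape as a generic helper (the helper is not part of this diff; it assumes the two raise_* translators defined earlier in this file are in scope):

    from couchdb.http import ResourceNotFound, ServerError

    def query_ddoc(database, ddoc_path, **params):
        # hypothetical helper capturing the query-and-translate pattern
        res = database.resource(*ddoc_path)
        try:
            response = res.get_json(**params)
            return response[2]  # (status, headers, body) -> decoded body
        except ResourceNotFound as e:
            raise_missing_design_doc_error(e, ddoc_path)
        except ServerError as e:
            raise_server_error(e, ddoc_path)

    # e.g.:
    # gen = query_ddoc(
    #     db, ['_design', 'transactions', '_list', 'generation', 'log'])['generation']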
+        :raise MissingDesignDocListFunctionError: Raised when trying to access +                                                  a missing list function on a +                                                  design document. +        :raise MissingDesignDocNamedViewError: Raised when trying to access a +                                               missing named view on a design +                                               document. +        :raise MissingDesignDocDeletedError: Raised when trying to access a +                                             deleted design document. +        :raise MissingDesignDocUnknownError: Raised when failed to access a +                                             design document for an yet +                                             unknown reason.          """          # query a couch list function -        res = self._database.resource( -            '_design', 'transactions', '_list', 'generation', 'log') -        response = res.get_json() -        return (response[2]['generation'], response[2]['transaction_id']) +        ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log'] +        res = self._database.resource(*ddoc_path) +        try: +            response = res.get_json() +            return (response[2]['generation'], response[2]['transaction_id']) +        except ResourceNotFound as e: +            raise_missing_design_doc_error(e, ddoc_path) +        except ServerError as e: +            raise_server_error(e, ddoc_path)      def _get_trans_id_for_gen(self, generation):          """ @@ -349,16 +615,36 @@ class CouchDatabase(CommonBackend):          :rtype: str          :raise InvalidGeneration: Raised when the generation does not exist. +        :raise MissingDesignDocError: Raised when tried to access a missing +                                      design document. +        :raise MissingDesignDocListFunctionError: Raised when trying to access +                                                  a missing list function on a +                                                  design document. +        :raise MissingDesignDocNamedViewError: Raised when trying to access a +                                               missing named view on a design +                                               document. +        :raise MissingDesignDocDeletedError: Raised when trying to access a +                                             deleted design document. +        :raise MissingDesignDocUnknownError: Raised when failed to access a +                                             design document for an yet +                                             unknown reason.          
"""          if generation == 0:              return ''          # query a couch list function -        res = self._database.resource( -            '_design', 'transactions', '_list', 'trans_id_for_gen', 'log') -        response = res.get_json(gen=generation) -        if response[2] == {}: -            raise errors.InvalidGeneration -        return response[2]['transaction_id'] +        ddoc_path = [ +            '_design', 'transactions', '_list', 'trans_id_for_gen', 'log' +        ] +        res = self._database.resource(*ddoc_path) +        try: +            response = res.get_json(gen=generation) +            if response[2] == {}: +                raise InvalidGeneration +            return response[2]['transaction_id'] +        except ResourceNotFound as e: +            raise_missing_design_doc_error(e, ddoc_path) +        except ServerError as e: +            raise_server_error(e, ddoc_path)      def _get_transaction_log(self):          """ @@ -366,12 +652,31 @@ class CouchDatabase(CommonBackend):          :return: The complete transaction log.          :rtype: [(str, str)] + +        :raise MissingDesignDocError: Raised when tried to access a missing +                                      design document. +        :raise MissingDesignDocListFunctionError: Raised when trying to access +                                                  a missing list function on a +                                                  design document. +        :raise MissingDesignDocNamedViewError: Raised when trying to access a +                                               missing named view on a design +                                               document. +        :raise MissingDesignDocDeletedError: Raised when trying to access a +                                             deleted design document. +        :raise MissingDesignDocUnknownError: Raised when failed to access a +                                             design document for an yet +                                             unknown reason.          """          # query a couch view -        res = self._database.resource( -            '_design', 'transactions', '_view', 'log') -        response = res.get_json() -        return map(lambda row: (row['id'], row['value']), response[2]['rows']) +        ddoc_path = ['_design', 'transactions', '_view', 'log'] +        res = self._database.resource(*ddoc_path) +        try: +            response = res.get_json() +            return map( +                lambda row: (row['id'], row['value']), +                response[2]['rows']) +        except ResourceNotFound as e: +            raise_missing_design_doc_error(e, ddoc_path)      def _get_doc(self, doc_id, check_for_conflicts=False):          """ @@ -413,8 +718,15 @@ class CouchDatabase(CommonBackend):                  and '_attachments' in result \                  and 'u1db_conflicts' in result['_attachments']:              doc.has_conflicts = True +            doc.set_conflicts( +                self._build_conflicts( +                    doc.doc_id, +                    json.loads(binascii.a2b_base64( +                        result['_attachments']['u1db_conflicts']['data']))))          # store couch revision          doc.couch_rev = result['_rev'] +        # store transactions +        doc.transactions = result['u1db_transactions']          return doc      def get_doc(self, doc_id, include_deleted=False): @@ -465,6 +777,10 @@ class CouchDatabase(CommonBackend):          """          Put the document in the Couch backend database. 
+        Note that C{old_doc} must have been fetched with the parameter +        C{check_for_conflicts} equal to True, so we can properly update the +        new document using the conflict information from the old one. +          :param old_doc: The old document version.          :type old_doc: CouchDocument          :param doc: The document to be put. @@ -472,43 +788,76 @@ class CouchDatabase(CommonBackend):          :raise RevisionConflict: Raised when trying to update a document but                                   couch revisions mismatch. -        """ -        trans_id = self._allocate_transaction_id() -        # encode content -        content = doc.get_json() -        if content is not None: -            content = binascii.b2a_base64(content)[:-1]  # exclude trailing \n -        # encode conflicts -        conflicts = None -        update_conflicts = doc.modified_conflicts() -        if update_conflicts is True: -            if doc.has_conflicts: -                conflicts = binascii.b2a_base64( -                    json.dumps( -                        map(lambda cdoc: (cdoc.rev, cdoc.content), -                            doc.get_conflicts())) -            )[:-1]  # exclude \n -        # perform the request -        resource = self._database.resource( -            '_design', 'docs', '_update', 'put', doc.doc_id) -        response = resource.put_json( -            body={ -                'couch_rev': old_doc.couch_rev -                    if old_doc is not None else None, -                'u1db_rev': doc.rev, -                'content': content, -                'trans_id': trans_id, -                'conflicts': conflicts, -                'update_conflicts': update_conflicts, -            }, -            headers={'content-type': 'application/json'}) -        # the document might have been updated in between, so we check for the -        # return message -        msg = response[2].read() -        if msg == 'ok': -            return -        elif msg == 'revision conflict': -            raise errors.RevisionConflict() +        :raise MissingDesignDocError: Raised when tried to access a missing +                                      design document. +        :raise MissingDesignDocListFunctionError: Raised when trying to access +                                                  a missing list function on a +                                                  design document. +        :raise MissingDesignDocNamedViewError: Raised when trying to access a +                                               missing named view on a design +                                               document. +        :raise MissingDesignDocDeletedError: Raised when trying to access a +                                             deleted design document. +        :raise MissingDesignDocUnknownError: Raised when failed to access a +                                             design document for an yet +                                             unknown reason. 
+        """ +        attachments = {}  # we save content and conflicts as attachments +        parts = []  # and we put it using couch's multipart PUT +        # save content as attachment +        if doc.is_tombstone() is False: +            content = doc.get_json() +            attachments['u1db_content'] = { +                'follows': True, +                'content_type': 'application/octet-stream', +                'length': len(content), +            } +            parts.append(content) +        # save conflicts as attachment +        if doc.has_conflicts is True: +            conflicts = json.dumps( +                map(lambda cdoc: (cdoc.rev, cdoc.content), +                    doc.get_conflicts())) +            attachments['u1db_conflicts'] = { +                'follows': True, +                'content_type': 'application/octet-stream', +                'length': len(conflicts), +            } +            parts.append(conflicts) +        # store old transactions, if any +        transactions = old_doc.transactions[:] if old_doc is not None else [] +        # create a new transaction id and timestamp it so the transaction log +        # is consistent when querying the database. +        transactions.append( +            # here we store milliseconds to keep consistent with javascript +            # Date.prototype.getTime() which was used before inside a couchdb +            # update handler. +            (int(time.time() * 1000), +            self._allocate_transaction_id())) +        # build the couch document +        couch_doc = { +            '_id': doc.doc_id, +            'u1db_rev': doc.rev, +            'u1db_transactions': transactions, +            '_attachments': attachments, +        } +        # if we are updating a doc we have to add the couch doc revision +        if old_doc is not None: +            couch_doc['_rev'] = old_doc.couch_rev +        # prepare the multipart PUT +        buf = StringIO() +        envelope = MultipartWriter(buf) +        envelope.add('application/json', json.dumps(couch_doc)) +        for part in parts: +            envelope.add('application/octet-stream', part) +        envelope.close() +        # try to save and fail if there's a revision conflict +        try: +            self._database.resource.put_json( +                doc.doc_id, body=buf.getvalue(), headers=envelope.headers) +            self._renew_couch_session() +        except ResourceConflict: +            raise RevisionConflict()      def put_doc(self, doc):          """ @@ -522,26 +871,26 @@ class CouchDatabase(CommonBackend):          :return: new_doc_rev - The new revision identifier for the document.              The Document object will also be updated. -        :raise errors.InvalidDocId: Raised if the document's id is invalid. -        :raise errors.DocumentTooBig: Raised if the document size is too big. -        :raise errors.ConflictedDoc: Raised if the document has conflicts. +        :raise InvalidDocId: Raised if the document's id is invalid. +        :raise DocumentTooBig: Raised if the document size is too big. +        :raise ConflictedDoc: Raised if the document has conflicts.          
"""          if doc.doc_id is None: -            raise errors.InvalidDocId() +            raise InvalidDocId()          self._check_doc_id(doc.doc_id)          self._check_doc_size(doc)          old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)          if old_doc and old_doc.has_conflicts: -            raise errors.ConflictedDoc() +            raise ConflictedDoc()          if old_doc and doc.rev is None and old_doc.is_tombstone():              new_rev = self._allocate_doc_rev(old_doc.rev)          else:              if old_doc is not None:                      if old_doc.rev != doc.rev: -                        raise errors.RevisionConflict() +                        raise RevisionConflict()              else:                  if doc.rev is not None: -                    raise errors.RevisionConflict() +                    raise RevisionConflict()              new_rev = self._allocate_doc_rev(doc.rev)          doc.rev = new_rev          self._put_doc(old_doc, doc) @@ -563,32 +912,53 @@ class CouchDatabase(CommonBackend):                   to the last intervening change and sorted by generation (old                   changes first)          :rtype: (int, str, [(str, int, str)]) + +        :raise MissingDesignDocError: Raised when tried to access a missing +                                      design document. +        :raise MissingDesignDocListFunctionError: Raised when trying to access +                                                  a missing list function on a +                                                  design document. +        :raise MissingDesignDocNamedViewError: Raised when trying to access a +                                               missing named view on a design +                                               document. +        :raise MissingDesignDocDeletedError: Raised when trying to access a +                                             deleted design document. +        :raise MissingDesignDocUnknownError: Raised when failed to access a +                                             design document for an yet +                                             unknown reason.          
"""          # query a couch list function -        res = self._database.resource( -            '_design', 'transactions', '_list', 'whats_changed', 'log') -        response = res.get_json(old_gen=old_generation) -        results = map( -            lambda row: -                (row['generation'], row['doc_id'], row['transaction_id']), -            response[2]['transactions']) -        results.reverse() -        cur_gen = old_generation -        seen = set() -        changes = [] -        newest_trans_id = '' -        for generation, doc_id, trans_id in results: -            if doc_id not in seen: -                changes.append((doc_id, generation, trans_id)) -                seen.add(doc_id) -        if changes: -            cur_gen = changes[0][1]  # max generation -            newest_trans_id = changes[0][2] -            changes.reverse() -        else: -            cur_gen, newest_trans_id = self._get_generation_info() +        ddoc_path = [ +            '_design', 'transactions', '_list', 'whats_changed', 'log' +        ] +        res = self._database.resource(*ddoc_path) +        try: +            response = res.get_json(old_gen=old_generation) +            results = map( +                lambda row: +                    (row['generation'], row['doc_id'], row['transaction_id']), +                response[2]['transactions']) +            results.reverse() +            cur_gen = old_generation +            seen = set() +            changes = [] +            newest_trans_id = '' +            for generation, doc_id, trans_id in results: +                if doc_id not in seen: +                    changes.append((doc_id, generation, trans_id)) +                    seen.add(doc_id) +            if changes: +                cur_gen = changes[0][1]  # max generation +                newest_trans_id = changes[0][2] +                changes.reverse() +            else: +                cur_gen, newest_trans_id = self._get_generation_info() -        return cur_gen, newest_trans_id, changes +            return cur_gen, newest_trans_id, changes +        except ResourceNotFound as e: +            raise_missing_design_doc_error(e, ddoc_path) +        except ServerError as e: +            raise_server_error(e, ddoc_path)      def delete_doc(self, doc):          """ @@ -600,28 +970,47 @@ class CouchDatabase(CommonBackend):          :param doc: The document to mark as deleted.          :type doc: CouchDocument. -        :raise errors.DocumentDoesNotExist: Raised if the document does not +        :raise DocumentDoesNotExist: Raised if the document does not                                              exist. -        :raise errors.RevisionConflict: Raised if the revisions do not match. -        :raise errors.DocumentAlreadyDeleted: Raised if the document is +        :raise RevisionConflict: Raised if the revisions do not match. +        :raise DocumentAlreadyDeleted: Raised if the document is                                                already deleted. -        :raise errors.ConflictedDoc: Raised if the doc has conflicts. +        :raise ConflictedDoc: Raised if the doc has conflicts.          
"""          old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)          if old_doc is None: -            raise errors.DocumentDoesNotExist +            raise DocumentDoesNotExist          if old_doc.rev != doc.rev: -            raise errors.RevisionConflict() +            raise RevisionConflict()          if old_doc.is_tombstone(): -            raise errors.DocumentAlreadyDeleted +            raise DocumentAlreadyDeleted          if old_doc.has_conflicts: -            raise errors.ConflictedDoc() +            raise ConflictedDoc()          new_rev = self._allocate_doc_rev(doc.rev)          doc.rev = new_rev          doc.make_tombstone()          self._put_doc(old_doc, doc)          return new_rev +    def _build_conflicts(self, doc_id, attached_conflicts): +        """ +        Build the conflicted documents list from the conflicts attachment +        fetched from a couch document. + +        :param attached_conflicts: The document's conflicts as fetched from a +                                   couch document attachment. +        :type attached_conflicts: dict +        """ +        conflicts = [] +        for doc_rev, content in attached_conflicts: +            doc = self._factory(doc_id, doc_rev) +            if content is None: +                doc.make_tombstone() +            else: +                doc.content = content +            conflicts.append(doc) +        return conflicts +      def _get_conflicts(self, doc_id, couch_rev=None):          """          Get the conflicted versions of a document. @@ -642,16 +1031,8 @@ class CouchDatabase(CommonBackend):          resource = self._database.resource(doc_id, 'u1db_conflicts')          try:              response = resource.get_json(**params) -            conflicts = [] -            # build the conflicted versions -            for doc_rev, content in json.loads(response[2].read()): -                doc = self._factory(doc_id, doc_rev) -                if content is None: -                    doc.make_tombstone() -                else: -                    doc.content = content -                conflicts.append(doc) -            return conflicts +            return self._build_conflicts( +                doc_id, json.loads(response[2].read()))          except ResourceNotFound:              return [] @@ -737,17 +1118,35 @@ class CouchDatabase(CommonBackend):          :param other_transaction_id: The transaction id associated with the                                       generation.          :type other_transaction_id: str + +        :raise MissingDesignDocError: Raised when tried to access a missing +                                      design document. +        :raise MissingDesignDocListFunctionError: Raised when trying to access +                                                  a missing list function on a +                                                  design document. +        :raise MissingDesignDocNamedViewError: Raised when trying to access a +                                               missing named view on a design +                                               document. +        :raise MissingDesignDocDeletedError: Raised when trying to access a +                                             deleted design document. +        :raise MissingDesignDocUnknownError: Raised when failed to access a +                                             design document for an yet +                                             unknown reason.          
"""          # query a couch update function -        res = self._database.resource( -            '_design', 'syncs', '_update', 'put', 'u1db_sync_log') -        res.put_json( -            body={ -                'other_replica_uid': other_replica_uid, -                'other_generation': other_generation, -                'other_transaction_id': other_transaction_id, -            }, -            headers={'content-type': 'application/json'}) +        ddoc_path = ['_design', 'syncs', '_update', 'put', 'u1db_sync_log'] +        res = self._database.resource(*ddoc_path) +        try: +            with CouchDatabase.update_handler_lock[self._get_replica_uid()]: +                res.put_json( +                    body={ +                        'other_replica_uid': other_replica_uid, +                        'other_generation': other_generation, +                        'other_transaction_id': other_transaction_id, +                    }, +                    headers={'content-type': 'application/json'}) +        except ResourceNotFound as e: +            raise_missing_design_doc_error(e, ddoc_path)      def _add_conflict(self, doc, my_doc_rev, my_content):          """ @@ -765,7 +1164,7 @@ class CouchDatabase(CommonBackend):                             serialized string.          :type my_content: str          """ -        doc.ensure_fetch_conflicts(self._get_conflicts) +        doc._ensure_fetch_conflicts(self._get_conflicts)          doc.add_conflict(              self._factory(doc_id=doc.doc_id, rev=my_doc_rev,                            json=my_content)) @@ -774,7 +1173,7 @@ class CouchDatabase(CommonBackend):          """          Delete the conflicted revisions from the list of conflicts of C{doc}. -        Note that thie method does not actually update the backed; rather, it +        Note that this method does not actually update the backend; rather, it          updates the CouchDocument object which will provide the conflict data          when the atomic document update is made. @@ -783,7 +1182,7 @@ class CouchDatabase(CommonBackend):          :param conflict_revs: A list of the revisions to be deleted.          :param conflict_revs: [str]          """ -        doc.ensure_fetch_conflicts(self._get_conflicts) +        doc._ensure_fetch_conflicts(self._get_conflicts)          doc.delete_conflicts(conflict_revs)      def _prune_conflicts(self, doc, doc_vcr): @@ -842,34 +1241,44 @@ class CouchDatabase(CommonBackend):          :param conflicted_doc_revs: A list of revisions that the new content                                      supersedes.          :type conflicted_doc_revs: [str] + +        :raise MissingDesignDocError: Raised when tried to access a missing +                                      design document. +        :raise MissingDesignDocListFunctionError: Raised when trying to access +                                                  a missing list function on a +                                                  design document. +        :raise MissingDesignDocNamedViewError: Raised when trying to access a +                                               missing named view on a design +                                               document. +        :raise MissingDesignDocDeletedError: Raised when trying to access a +                                             deleted design document. 
+        :raise MissingDesignDocUnknownError: Raised when failed to access a +                                             design document for an yet +                                             unknown reason.          """          cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)          new_rev = self._ensure_maximal_rev(cur_doc.rev,                                             conflicted_doc_revs)          superseded_revs = set(conflicted_doc_revs)          doc.rev = new_rev +        # this backend stores conflicts as properties of the documents, so we +        # have to copy these conflicts over to the document being updated.          if cur_doc.rev in superseded_revs: +            # the newer doc version will supersede the one in the database, so +            # we copy conflicts before updating the backend. +            doc.set_conflicts(cur_doc.get_conflicts())  # copy conflicts over.              self._delete_conflicts(doc, superseded_revs)              self._put_doc(cur_doc, doc)          else: -            self._add_conflict(doc, new_rev, doc.get_json()) -            self._delete_conflicts(doc, superseded_revs) -            # perform request to resolve document in server -            resource = self._database.resource( -                '_design', 'docs', '_update', 'resolve_doc', doc.doc_id) -            conflicts = None -            if doc.has_conflicts: -                conflicts = binascii.b2a_base64( -                    json.dumps( -                        map(lambda cdoc: (cdoc.rev, cdoc.content), -                            doc.get_conflicts())) -                )[:-1]  # exclude \n -            response = resource.put_json( -                body={ -                    'couch_rev': cur_doc.couch_rev, -                    'conflicts': conflicts, -                }, -                headers={'content-type': 'application/json'}) +            # the newer doc version does not supersede the one in the +            # database, so we will add a conflict to the database and copy +            # those over to the document the user has in her hands. +            self._add_conflict(cur_doc, new_rev, doc.get_json()) +            self._delete_conflicts(cur_doc, superseded_revs) +            self._put_doc(cur_doc, cur_doc)  # just update conflicts +            # backend has been updated with current conflicts, now copy them +            # to the current document. 
+            doc.set_conflicts(cur_doc.get_conflicts())      def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen,                            replica_trans_id=''): @@ -926,7 +1335,7 @@ class CouchDatabase(CommonBackend):          if cur_doc is not None:              doc.couch_rev = cur_doc.couch_rev          # fetch conflicts because we will eventually manipulate them -        doc.ensure_fetch_conflicts(self._get_conflicts) +        doc._ensure_fetch_conflicts(self._get_conflicts)          # from now on, it works just like u1db sqlite backend          doc_vcr = vectorclock.VectorClockRev(doc.rev)          if cur_doc is None: @@ -963,7 +1372,7 @@ class CouchDatabase(CommonBackend):              if save_conflict:                  self._force_doc_sync_conflict(doc)          if replica_uid is not None and replica_gen is not None: -            self._do_set_replica_gen_and_trans_id( +            self._set_replica_gen_and_trans_id(                  replica_uid, replica_gen, replica_trans_id)          # update info          old_doc.rev = doc.rev @@ -974,6 +1383,59 @@ class CouchDatabase(CommonBackend):          old_doc.has_conflicts = doc.has_conflicts          return state, self._get_generation() +    def get_docs(self, doc_ids, check_for_conflicts=True, +                 include_deleted=False): +        """ +        Get the JSON content for many documents. + +        :param doc_ids: A list of document identifiers. +        :type doc_ids: list +        :param check_for_conflicts: If set to False, then the conflict check +                                    will be skipped, and 'None' will be +                                    returned instead of True/False. +        :type check_for_conflictsa: bool +        :param include_deleted: If set to True, deleted documents will be +                                returned with empty content. Otherwise deleted +                                documents will not be included in the results. +        :return: iterable giving the Document object for each document id +                 in matching doc_ids order. +        :rtype: iterable +        """ +        # Workaround for: +        # +        #   http://bugs.python.org/issue7980 +        #   https://leap.se/code/issues/5449 +        # +        # python-couchdb uses time.strptime, which is not thread safe. In +        # order to avoid the problem described on the issues above, we preload +        # strptime here by evaluating the conversion of an arbitrary date. +        # This will not be needed when/if we switch from python-couchdb to +        # paisley. +        time.strptime('Mar 4 1917', '%b %d %Y') +        # spawn threads to retrieve docs +        threads = [] +        for doc_id in doc_ids: +            self._sem_pool.acquire() +            t = self._GetDocThread(self, doc_id, check_for_conflicts, +                                   self._sem_pool.release) +            t.start() +            threads.append(t) +        # join threads and yield docs +        for t in threads: +            t.join() +            if t._doc.is_tombstone() and not include_deleted: +                continue +            yield t._doc + +    def _renew_couch_session(self): +        """ +        Create a new couch connection session. + +        This is a workaround for #5448. Will not be needed once bigcouch is +        merged with couchdb. 
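The get_docs() fan-out above follows a bounded-thread pattern: a BoundedSemaphore caps how many fetches run at once, each worker releases its slot when done, and the caller joins the threads in submission order so results come back matching doc_ids order (the time.strptime call is only a preload to dodge the thread-safety bug linked above). A self-contained sketch of the same shape, with a dummy fetch standing in for _get_doc():

    import threading
    import time

    MAX_THREADS = 20
    sem = threading.BoundedSemaphore(value=MAX_THREADS)

    def fetch(doc_id, out, release):
        try:
            time.sleep(0.01)          # stand-in for the couch round trip
            out[doc_id] = 'content of %s' % doc_id
        finally:
            release()

    def get_many(doc_ids):
        out = {}
        threads = []
        for doc_id in doc_ids:
            sem.acquire()             # blocks while MAX_THREADS fetches are live
            t = threading.Thread(target=fetch, args=(doc_id, out, sem.release))
            t.start()
            threads.append((doc_id, t))
        for doc_id, t in threads:     # join and yield in submission order
            t.join()
            yield out[doc_id]

    # e.g. list(get_many(['doc-1', 'doc-2', 'doc-3']))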
+        """ +        self._database.resource.session = Session(timeout=COUCH_TIMEOUT) +  class CouchSyncTarget(CommonSyncTarget):      """ @@ -997,14 +1459,6 @@ class CouchSyncTarget(CommonSyncTarget):              source_replica_transaction_id) -class NotEnoughCouchPermissions(Exception): -    """ -    Raised when failing to assert for enough permissions on underlying Couch -    Database. -    """ -    pass - -  class CouchServerState(ServerState):      """      Inteface of the WSGI server with the CouchDB backend. @@ -1024,65 +1478,6 @@ class CouchServerState(ServerState):          self._couch_url = couch_url          self._shared_db_name = shared_db_name          self._tokens_db_name = tokens_db_name -        try: -            self._check_couch_permissions() -        except NotEnoughCouchPermissions: -            logger.error("Not enough permissions on underlying couch " -                         "database (%s)." % self._couch_url) -        except (socket.error, socket.gaierror, socket.herror, -                socket.timeout), e: -            logger.error("Socket problem while trying to reach underlying " -                         "couch database: (%s, %s)." % -                         (self._couch_url, e)) - -    def _check_couch_permissions(self): -        """ -        Assert that Soledad Server has enough permissions on the underlying -        couch database. - -        Soledad Server has to be able to do the following in the couch server: - -            * Create, read and write from/to 'shared' db. -            * Create, read and write from/to 'user-<anything>' dbs. -            * Read from 'tokens' db. - -        This function tries to perform the actions above using the "low level" -        couch library to ensure that Soledad Server can do everything it needs -        on the underlying couch database. - -        :param couch_url: The URL of the couch database. -        :type couch_url: str - -        @raise NotEnoughCouchPermissions: Raised in case there are not enough -            permissions to read/write/create the needed couch databases. -        :rtype: bool -        """ - -        def _open_couch_db(dbname): -            server = Server(url=self._couch_url) -            try: -                server[dbname] -            except ResourceNotFound: -                server.create(dbname) -            return server[dbname] - -        def _create_delete_test_doc(db): -            doc_id, _ = db.save({'test': 'document'}) -            doc = db[doc_id] -            db.delete(doc) - -        try: -            # test read/write auth for shared db -            _create_delete_test_doc( -                _open_couch_db(self._shared_db_name)) -            # test read/write auth for user-<something> db -            _create_delete_test_doc( -                _open_couch_db('%stest-db' % USER_DB_PREFIX)) -            # test read auth for tokens db -            tokensdb = _open_couch_db(self._tokens_db_name) -            tokensdb.info() -        except Unauthorized: -            raise NotEnoughCouchPermissions(self._couch_url)      def open_database(self, dbname):          """ @@ -1094,25 +1489,29 @@ class CouchServerState(ServerState):          :return: The CouchDatabase object.          
:rtype: CouchDatabase          """ -        # TODO: open couch          return CouchDatabase.open_database(              self._couch_url + '/' + dbname, -            create=False) +            create=False, +            ensure_ddocs=False)      def ensure_database(self, dbname):          """          Ensure couch database exists. +        Usually, this method is used by the server to ensure the existence of +        a database. In our setup, the Soledad user that accesses the underlying +        couch server should never have permission to create (or delete) +        databases. But, in case it ever does, by raising an exception here we +        have one more guarantee that no modified client will be able to +        enforce creation of a database when syncing. +          :param dbname: The name of the database to ensure.          :type dbname: str          :return: The CouchDatabase object and the replica uid.          :rtype: (CouchDatabase, str)          """ -        db = CouchDatabase.open_database( -            self._couch_url + '/' + dbname, -            create=True) -        return db, db._replica_uid +        raise Unauthorized()      def delete_database(self, dbname):          """ @@ -1121,7 +1520,7 @@ class CouchServerState(ServerState):          :param dbname: The name of the database to delete.          :type dbname: str          """ -        CouchDatabase.delete_database(self._couch_url + '/' + dbname) +        raise Unauthorized()      def _set_couch_url(self, url):          """ diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/put.js b/common/src/leap/soledad/common/ddocs/docs/updates/put.js deleted file mode 100644 index 5a4647de..00000000 --- a/common/src/leap/soledad/common/ddocs/docs/updates/put.js +++ /dev/null @@ -1,64 +0,0 @@ -function(doc, req){ -    /* we expect to receive the following in `req.body`: -     * { -     *     'couch_rev': '<couch_rev>', -     *     'u1db_rev': '<u1db_rev>', -     *     'content': '<base64 encoded content>', -     *     'trans_id': '<reansaction_id>' -     *     'conflicts': '<base64 encoded conflicts>', -     *     'update_conflicts': <boolean> -     * } -     */ -    var body = JSON.parse(req.body); - -    // create a new document document -    if (!doc) { -        doc = {} -        doc['_id'] = req['id']; -    } -    // or fail if couch revisions do not match -    else if (doc['_rev'] != body['couch_rev']) { -        // of fail if revisions do not match -        return [null, 'revision conflict'] -    } - -    // store u1db rev -    doc.u1db_rev = body['u1db_rev']; - -    // save content as attachment -    if (body['content'] != null) { -        // save u1db content as attachment -        if (!doc._attachments) -            doc._attachments = {}; -        doc._attachments.u1db_content =  { -            content_type: "application/octet-stream", -            data: body['content']  // should be base64 encoded -        }; -    } -    // or delete the attachment if document is tombstone -    else if (doc._attachments && -             doc._attachments.u1db_content) -        delete doc._attachments.u1db_content; - -    // store the transaction id -    if (!doc.u1db_transactions) -        doc.u1db_transactions = []; -    var d = new Date(); -    doc.u1db_transactions.push([d.getTime(), body['trans_id']]); - -    // save conflicts as attachment if they were sent -    if (body['update_conflicts']) -        if (body['conflicts'] != null) { -            if (!doc._attachments) -                doc._attachments = {}; -            
doc._attachments.u1db_conflicts = { -                content_type: "application/octet-stream", -                data: body['conflicts']  // should be base64 encoded -            } -        } else { -            if(doc._attachments && doc._attachments.u1db_conflicts) -                delete doc._attachments.u1db_conflicts -        } - -    return [doc, 'ok']; -} diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js b/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js deleted file mode 100644 index 7ba66cf8..00000000 --- a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js +++ /dev/null @@ -1,39 +0,0 @@ -function(doc, req){ -    /* we expect to receive the following in `req.body`: -     * { -     *     'couch_rev': '<couch_rev>', -     *     'conflicts': '<base64 encoded conflicts>', -     * } -     */ -    var body = JSON.parse(req.body); - -    // fail if no document was given -    if (!doc) { -        return [null, 'document does not exist'] -    }  - -    // fail if couch revisions do not match -    if (body['couch_rev'] != null -        && doc['_rev'] != body['couch_rev']) { -        return [null, 'revision conflict'] -    } - -    // fail if conflicts were not sent -    if (body['conflicts'] == null) -        return [null, 'missing conflicts'] - -    // save conflicts as attachment if they were sent -    if (body['conflicts'] != null) { -        if (!doc._attachments) -            doc._attachments = {}; -        doc._attachments.u1db_conflicts = { -            content_type: "application/octet-stream", -            data: body['conflicts']  // should be base64 encoded -        } -    } -    // or delete attachment if there are no conflicts -    else if (doc._attachments && doc._attachments.u1db_conflicts) -        delete doc._attachments.u1db_conflicts; - -    return [doc, 'ok']; -} diff --git a/common/src/leap/soledad/common/errors.py b/common/src/leap/soledad/common/errors.py index 7c2d7296..3a7eadd2 100644 --- a/common/src/leap/soledad/common/errors.py +++ b/common/src/leap/soledad/common/errors.py @@ -25,11 +25,49 @@ from u1db import errors  from u1db.remote import http_errors +def register_exception(cls): +    """ +    A small decorator that registers exceptions in u1db maps. +    """ +    # update u1db "wire description to status" and "wire description to +    # exception" maps. +    http_errors.wire_description_to_status.update({ +        cls.wire_description: cls.status}) +    errors.wire_description_to_exc.update({ +        cls.wire_description: cls}) +    # do not modify the exception +    return cls + + +class SoledadError(errors.U1DBError): +    """ +    Base Soledad HTTP errors. +    """ +    pass + +  # -# LockResource: a lock based on a document in the shared database. +# Authorization errors  # -class InvalidTokenError(errors.U1DBError): +@register_exception +class InvalidAuthTokenError(errors.Unauthorized): +    """ +    Exception raised when failing to get authorization for some action because +    the provided token either does not exist in the tokens database, has a +    distinct structure from the expected one, or is associated with a user +    with a distinct uuid than the one provided by the client. +    """ + +    wire_descrition = "invalid auth token" +    status = 401 + +# +# LockResource errors +# + +@register_exception +class InvalidTokenError(SoledadError):      """      Exception raised when trying to unlock shared database with invalid token.      
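The register_exception decorator defined above hooks each class into u1db's two lookup tables, keyed by wire_description, so the server side can map an error to an HTTP status and the client side can map the wire description back to an exception class. A hedged sketch of declaring a new error with it (SomeBackendError is a made-up example, not part of this diff):

    from u1db import errors
    from u1db.remote import http_errors
    from leap.soledad.common.errors import register_exception, SoledadError

    @register_exception
    class SomeBackendError(SoledadError):
        """Hypothetical error, for illustration only."""
        wire_description = "some backend error"
        status = 500

    # after decoration, both u1db maps know the new wire description
    assert http_errors.wire_description_to_status["some backend error"] == 500
    assert errors.wire_description_to_exc["some backend error"] is SomeBackendError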
""" @@ -38,7 +76,8 @@ class InvalidTokenError(errors.U1DBError):      status = 401 -class NotLockedError(errors.U1DBError): +@register_exception +class NotLockedError(SoledadError):      """      Exception raised when trying to unlock shared database when it is not      locked. @@ -48,7 +87,8 @@ class NotLockedError(errors.U1DBError):      status = 404 -class AlreadyLockedError(errors.U1DBError): +@register_exception +class AlreadyLockedError(SoledadError):      """      Exception raised when trying to lock shared database but it is already      locked. @@ -57,13 +97,83 @@ class AlreadyLockedError(errors.U1DBError):      wire_description = "lock is locked"      status = 403 -# update u1db "wire description to status" and "wire description to exception" -# maps. -for e in [InvalidTokenError, NotLockedError, AlreadyLockedError]: -    http_errors.wire_description_to_status.update({ -        e.wire_description: e.status}) -    errors.wire_description_to_exc.update({ -        e.wire_description: e}) + +@register_exception +class LockTimedOutError(SoledadError): +    """ +    Exception raised when timing out while trying to lock the shared database. +    """ + +    wire_description = "lock timed out" +    status = 408 + + +@register_exception +class CouldNotObtainLockError(SoledadError): +    """ +    Exception raised when timing out while trying to lock the shared database. +    """ + +    wire_description = "error obtaining lock" +    status = 500 + + +# +# CouchDatabase errors +# + +@register_exception +class MissingDesignDocError(SoledadError): +    """ +    Raised when trying to access a missing couch design document. +    """ + +    wire_description = "missing design document" +    status = 500 + + +@register_exception +class MissingDesignDocNamedViewError(SoledadError): +    """ +    Raised when trying to access a missing named view on a couch design +    document. +    """ + +    wire_description = "missing design document named function" +    status = 500 + + +@register_exception +class MissingDesignDocListFunctionError(SoledadError): +    """ +    Raised when trying to access a missing list function on a couch design +    document. +    """ + +    wire_description = "missing design document list function" +    status = 500 + + +@register_exception +class MissingDesignDocDeletedError(SoledadError): +    """ +    Raised when trying to access a deleted couch design document. +    """ + +    wire_description = "design document was deleted" +    status = 500 + + +@register_exception +class DesignDocUnknownError(SoledadError): +    """ +    Raised when trying to access a couch design document and getting an +    unknown error. +    """ + +    wire_description = "missing design document unknown error" +    status = 500 +  # u1db error statuses also have to be updated  http_errors.ERROR_STATUSES = set( diff --git a/common/src/leap/soledad/common/tests/couchdb.ini.template b/common/src/leap/soledad/common/tests/couchdb.ini.template index 217ae201..1fc2205b 100644 --- a/common/src/leap/soledad/common/tests/couchdb.ini.template +++ b/common/src/leap/soledad/common/tests/couchdb.ini.template @@ -6,7 +6,7 @@  database_dir = %(tempdir)s/lib  view_index_dir = %(tempdir)s/lib  max_document_size = 4294967296 ; 4 GB -os_process_timeout = 5000 ; 5 seconds. for view and external servers. +os_process_timeout = 120000 ; 120 seconds. for view and external servers.  
max_dbs_open = 100  delayed_commits = true ; set this to false to ensure an fsync before 201 Created is returned  uri_file = %(tempdir)s/lib/couch.uri diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index 72346333..86bb4b93 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -24,16 +24,17 @@ import re  import copy  import shutil  from base64 import b64decode +from mock import Mock  from couchdb.client import Server -from u1db import errors +from u1db import errors as u1db_errors  from leap.common.files import mkdir_p  from leap.soledad.common.tests import u1db_tests as tests  from leap.soledad.common.tests.u1db_tests import test_backends  from leap.soledad.common.tests.u1db_tests import test_sync -from leap.soledad.common import couch +from leap.soledad.common import couch, errors  import simplejson as json @@ -80,9 +81,10 @@ class CouchDBWrapper(object):          mkdir_p(os.path.join(self.tempdir, 'lib'))          mkdir_p(os.path.join(self.tempdir, 'log'))          args = ['couchdb', '-n', '-a', confPath] -        #null = open('/dev/null', 'w') +        null = open('/dev/null', 'w') +          self.process = subprocess.Popen( -            args, env=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE, +            args, env=None, stdout=null.fileno(), stderr=null.fileno(),              close_fds=True)          # find port          logPath = os.path.join(self.tempdir, 'log', 'couch.log') @@ -125,21 +127,21 @@ class CouchDBTestCase(unittest.TestCase):      TestCase base class for tests against a real CouchDB server.      """ -    def setUp(self): +    @classmethod +    def setUpClass(cls):          """          Make sure we have a CouchDB instance for a test.          """ -        self.wrapper = CouchDBWrapper() -        self.wrapper.start() +        cls.wrapper = CouchDBWrapper() +        cls.wrapper.start()          #self.db = self.wrapper.db -        unittest.TestCase.setUp(self) -    def tearDown(self): +    @classmethod +    def tearDownClass(cls):          """          Stop CouchDB instance for test.          
""" -        self.wrapper.stop() -        unittest.TestCase.tearDown(self) +        cls.wrapper.stop()  #----------------------------------------------------------------------------- @@ -356,7 +358,7 @@ class IndexedCouchDatabase(couch.CouchDatabase):      def __init__(self, url, dbname, replica_uid=None, full_commit=True,                       session=None, ensure_ddocs=True):          old_class.__init__(self, url, dbname, replica_uid, full_commit, -                           session, ensure_ddocs=True) +                           session, ensure_ddocs=ensure_ddocs)          self._indexes = {}      def _put_doc(self, old_doc, doc): @@ -372,7 +374,7 @@ class IndexedCouchDatabase(couch.CouchDatabase):              if self._indexes[index_name]._definition == list(                      index_expressions):                  return -            raise errors.IndexNameTakenError +            raise u1db_errors.IndexNameTakenError          index = InMemoryIndex(index_name, list(index_expressions))          _, all_docs = self.get_all_docs()          for doc in all_docs: @@ -392,7 +394,7 @@ class IndexedCouchDatabase(couch.CouchDatabase):          try:              index = self._indexes[index_name]          except KeyError: -            raise errors.IndexDoesNotExist +            raise u1db_errors.IndexDoesNotExist          doc_ids = index.lookup(key_values)          result = []          for doc_id in doc_ids: @@ -405,7 +407,7 @@ class IndexedCouchDatabase(couch.CouchDatabase):          try:              index = self._indexes[index_name]          except KeyError: -            raise errors.IndexDoesNotExist +            raise u1db_errors.IndexDoesNotExist          if isinstance(start_value, basestring):              start_value = (start_value,)          if isinstance(end_value, basestring): @@ -420,7 +422,7 @@ class IndexedCouchDatabase(couch.CouchDatabase):          try:              index = self._indexes[index_name]          except KeyError: -            raise errors.IndexDoesNotExist +            raise u1db_errors.IndexDoesNotExist          keys = index.keys()          # XXX inefficiency warning          return list(set([tuple(key.split('\x01')) for key in keys])) @@ -461,4 +463,184 @@ class CouchDatabaseSyncTests(test_sync.DatabaseSyncTests, CouchDBTestCase):          test_sync.DatabaseSyncTests.tearDown(self) +class CouchDatabaseExceptionsTests(CouchDBTestCase): + +    def setUp(self): +        CouchDBTestCase.setUp(self) +        self.db = couch.CouchDatabase( +            'http://127.0.0.1:%d' % self.wrapper.port, 'test', +            ensure_ddocs=False)  # note that we don't enforce ddocs here + +    def tearDown(self): +        self.db.delete_database() + +    def test_missing_design_doc_raises(self): +        """ +        Test that all methods that access design documents will raise if the +        design docs are not present. 
+        """ +        # _get_generation() +        self.assertRaises( +            errors.MissingDesignDocError, +            self.db._get_generation) +        # _get_generation_info() +        self.assertRaises( +            errors.MissingDesignDocError, +            self.db._get_generation_info) +        # _get_trans_id_for_gen() +        self.assertRaises( +            errors.MissingDesignDocError, +            self.db._get_trans_id_for_gen, 1) +        # _get_transaction_log() +        self.assertRaises( +            errors.MissingDesignDocError, +            self.db._get_transaction_log) +        # whats_changed() +        self.assertRaises( +            errors.MissingDesignDocError, +            self.db.whats_changed) +        # _do_set_replica_gen_and_trans_id() +        self.assertRaises( +            errors.MissingDesignDocError, +            self.db._do_set_replica_gen_and_trans_id, 1, 2, 3) + +    def test_missing_design_doc_functions_raises(self): +        """ +        Test that all methods that access design documents list functions +        will raise if the functions are not present. +        """ +        self.db = couch.CouchDatabase( +            'http://127.0.0.1:%d' % self.wrapper.port, 'test', +            ensure_ddocs=True) +        # erase views from _design/transactions +        transactions = self.db._database['_design/transactions'] +        transactions['lists'] = {} +        self.db._database.save(transactions) +        # _get_generation() +        self.assertRaises( +            errors.MissingDesignDocListFunctionError, +            self.db._get_generation) +        # _get_generation_info() +        self.assertRaises( +            errors.MissingDesignDocListFunctionError, +            self.db._get_generation_info) +        # _get_trans_id_for_gen() +        self.assertRaises( +            errors.MissingDesignDocListFunctionError, +            self.db._get_trans_id_for_gen, 1) +        # whats_changed() +        self.assertRaises( +            errors.MissingDesignDocListFunctionError, +            self.db.whats_changed) + +    def test_absent_design_doc_functions_raises(self): +        """ +        Test that all methods that access design documents list functions +        will raise if the functions are not present. +        """ +        self.db = couch.CouchDatabase( +            'http://127.0.0.1:%d' % self.wrapper.port, 'test', +            ensure_ddocs=True) +        # erase views from _design/transactions +        transactions = self.db._database['_design/transactions'] +        del transactions['lists'] +        self.db._database.save(transactions) +        # _get_generation() +        self.assertRaises( +            errors.MissingDesignDocListFunctionError, +            self.db._get_generation) +        # _get_generation_info() +        self.assertRaises( +            errors.MissingDesignDocListFunctionError, +            self.db._get_generation_info) +        # _get_trans_id_for_gen() +        self.assertRaises( +            errors.MissingDesignDocListFunctionError, +            self.db._get_trans_id_for_gen, 1) +        # whats_changed() +        self.assertRaises( +            errors.MissingDesignDocListFunctionError, +            self.db.whats_changed) + +    def test_missing_design_doc_named_views_raises(self): +        """ +        Test that all methods that access design documents' named views  will +        raise if the views are not present. 
+        """ +        self.db = couch.CouchDatabase( +            'http://127.0.0.1:%d' % self.wrapper.port, 'test', +            ensure_ddocs=True) +        # erase views from _design/docs +        docs = self.db._database['_design/docs'] +        del docs['views'] +        self.db._database.save(docs) +        # erase views from _design/syncs +        syncs = self.db._database['_design/syncs'] +        del syncs['views'] +        self.db._database.save(syncs) +        # erase views from _design/transactions +        transactions = self.db._database['_design/transactions'] +        del transactions['views'] +        self.db._database.save(transactions) +        # _get_generation() +        self.assertRaises( +            errors.MissingDesignDocNamedViewError, +            self.db._get_generation) +        # _get_generation_info() +        self.assertRaises( +            errors.MissingDesignDocNamedViewError, +            self.db._get_generation_info) +        # _get_trans_id_for_gen() +        self.assertRaises( +            errors.MissingDesignDocNamedViewError, +            self.db._get_trans_id_for_gen, 1) +        # _get_transaction_log() +        self.assertRaises( +            errors.MissingDesignDocNamedViewError, +            self.db._get_transaction_log) +        # whats_changed() +        self.assertRaises( +            errors.MissingDesignDocNamedViewError, +            self.db.whats_changed) + +    def test_deleted_design_doc_raises(self): +        """ +        Test that all methods that access design documents will raise if the +        design docs are not present. +        """ +        self.db = couch.CouchDatabase( +            'http://127.0.0.1:%d' % self.wrapper.port, 'test', +            ensure_ddocs=True) +        # delete _design/docs +        del self.db._database['_design/docs'] +        # delete _design/syncs +        del self.db._database['_design/syncs'] +        # delete _design/transactions +        del self.db._database['_design/transactions'] +        # _get_generation() +        self.assertRaises( +            errors.MissingDesignDocDeletedError, +            self.db._get_generation) +        # _get_generation_info() +        self.assertRaises( +            errors.MissingDesignDocDeletedError, +            self.db._get_generation_info) +        # _get_trans_id_for_gen() +        self.assertRaises( +            errors.MissingDesignDocDeletedError, +            self.db._get_trans_id_for_gen, 1) +        # _get_transaction_log() +        self.assertRaises( +            errors.MissingDesignDocDeletedError, +            self.db._get_transaction_log) +        # whats_changed() +        self.assertRaises( +            errors.MissingDesignDocDeletedError, +            self.db.whats_changed) +        # _do_set_replica_gen_and_trans_id() +        self.assertRaises( +            errors.MissingDesignDocDeletedError, +            self.db._do_set_replica_gen_and_trans_id, 1, 2, 3) + +  load_tests = tests.load_with_scenarios diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py index a0c473b1..3c457cc5 100644 --- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py +++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py @@ -33,11 +33,17 @@ from leap.soledad.common.tests.test_target import (      make_leap_document_for_test,      token_leap_sync_target,  ) +from leap.soledad.common.tests.test_server import _couch_ensure_database  
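The design-document tests above pin down a contract rather than an implementation: whenever a design document, one of its named views, or one of its list functions is unavailable, the backend is expected to surface one of the specific errors registered in errors.py. The sketch below shows that kind of mapping; the helper name and the exact probing logic are illustrative only and do not reproduce the couch.py code, which also distinguishes a deleted design document from one that was never created.

from couchdb.http import ResourceNotFound

from leap.soledad.common import errors


def check_design_doc(database, ddoc_id, view_name, list_name):
    # probe a design document and translate problems into Soledad errors
    try:
        ddoc = database[ddoc_id]
    except ResourceNotFound:
        # the design document is not present in the database at all
        raise errors.MissingDesignDocError(ddoc_id)
    if 'views' not in ddoc or view_name not in ddoc['views']:
        raise errors.MissingDesignDocNamedViewError(view_name)
    if 'lists' not in ddoc or list_name not in ddoc['lists']:
        raise errors.MissingDesignDocListFunctionError(list_name)
    return ddoc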
 REPEAT_TIMES = 20
 
 
+# monkey patch CouchServerState so it can ensure databases.
+
+CouchServerState.ensure_database = _couch_ensure_database
+
+
 class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
 
     @staticmethod
@@ -100,6 +106,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
         self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
 
     def tearDown(self):
+        self.db.delete_database()
         CouchDBTestCase.tearDown(self)
         TestCaseWithServer.tearDown(self)
 
@@ -337,3 +344,46 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
             self.assertEqual(
                 1,
                 len(filter(lambda t: t[0] == doc_id, transaction_log)))
+
+    def test_concurrent_syncs_do_not_fail(self):
+        """
+        Assert that concurrent attempts to sync end up being executed
+        sequentially and do not fail.
+        """
+        threads = []
+        docs = []
+        pool = threading.BoundedSemaphore(value=1)
+        self.startServer()
+        sol = self._soledad_instance(
+            auth_token='auth-token',
+            server_url=self.getURL())
+
+        def _run_method(self):
+            # create a document
+            doc = self._params['sol'].create_doc({})
+            # do the sync!
+            sol.sync()
+            pool.acquire()
+            docs.append(doc.doc_id)
+            pool.release()
+
+        # launch threads to create documents in parallel
+        for i in range(0, REPEAT_TIMES):
+            thread = self._WorkerThread(
+                {'sol': sol, 'syncs': i},
+                _run_method)
+            thread.start()
+            threads.append(thread)
+
+        # wait for threads to finish
+        for thread in threads:
+            thread.join()
+
+        transaction_log = self.db._get_transaction_log()
+        self.assertEqual(REPEAT_TIMES, len(transaction_log))
+        # assert all documents are in the remote log
+        self.assertEqual(REPEAT_TIMES, len(docs))
+        for doc_id in docs:
+            self.assertEqual(
+                1,
+                len(filter(lambda t: t[0] == doc_id, transaction_log)))
diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py
index 83df192b..f8d2a64f 100644
--- a/common/src/leap/soledad/common/tests/test_server.py
+++ b/common/src/leap/soledad/common/tests/test_server.py
@@ -25,6 +25,7 @@ import tempfile
 import simplejson as json
 import mock
 import time
+import binascii
 
 from leap.common.testing.basetest import BaseLeapTest
 
@@ -50,6 +51,17 @@ from leap.soledad.server import SoledadApp, LockResource
 from leap.soledad.server.auth import URLToAuthorization
 
 
+# monkey patch CouchServerState so it can ensure databases.
+
+def _couch_ensure_database(self, dbname):
+    db = CouchDatabase.open_database(
+        self._couch_url + '/' + dbname,
+        create=True)
+    return db, db._replica_uid
+
+CouchServerState.ensure_database = _couch_ensure_database
+
+
 class ServerAuthorizationTestCase(BaseLeapTest):
     """
     Tests related to Soledad server authorization.
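test_concurrent_syncs_do_not_fail above only checks the observable outcome: after REPEAT_TIMES parallel syncs the transaction log holds exactly one entry per created document. One way to obtain that behaviour is to serialize syncs per target database. The sketch below illustrates the property being tested with a plain thread lock keyed by replica uid; it is not the Soledad server's actual locking code, and the function name is hypothetical.

import threading
from collections import defaultdict

# one lock per replica uid, created on first use
_sync_locks = defaultdict(threading.Lock)


def serialized_sync(replica_uid, sync_fn, *args, **kwargs):
    # hold the per-database lock for the whole sync, so two concurrent
    # syncs against the same replica never interleave
    with _sync_locks[replica_uid]:
        return sync_fn(*args, **kwargs)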
@@ -339,15 +351,16 @@ class EncryptedSyncTestCase(          _, doclist = sol1.get_all_docs()          self.assertEqual([], doclist)          doc1 = sol1.create_doc(json.loads(simple_doc)) -        # sync with server -        sol1._server_url = self.getURL() -        sol1.sync() -        # assert doc was sent to couch db +        # ensure remote db exists before syncing          db = CouchDatabase(              self._couch_url,              # the name of the user database is "user-<uuid>".              'user-user-uuid',          ) +        # sync with server +        sol1._server_url = self.getURL() +        sol1.sync() +        # assert doc was sent to couch db          _, doclist = db.get_all_docs()          self.assertEqual(1, len(doclist))          couchdoc = doclist[0] @@ -376,6 +389,7 @@ class EncryptedSyncTestCase(          doc2 = doclist[0]          # assert incoming doc is equal to the first sent doc          self.assertEqual(doc1, doc2) +        db.delete_database()      def test_encrypted_sym_sync_with_unicode_passphrase(self):          """ @@ -393,15 +407,16 @@ class EncryptedSyncTestCase(          _, doclist = sol1.get_all_docs()          self.assertEqual([], doclist)          doc1 = sol1.create_doc(json.loads(simple_doc)) -        # sync with server -        sol1._server_url = self.getURL() -        sol1.sync() -        # assert doc was sent to couch db +        # ensure remote db exists before syncing          db = CouchDatabase(              self._couch_url,              # the name of the user database is "user-<uuid>".              'user-user-uuid',          ) +        # sync with server +        sol1._server_url = self.getURL() +        sol1.sync() +        # assert doc was sent to couch db          _, doclist = db.get_all_docs()          self.assertEqual(1, len(doclist))          couchdoc = doclist[0] @@ -434,7 +449,94 @@ class EncryptedSyncTestCase(          doc2 = doclist[0]          # assert incoming doc is equal to the first sent doc          self.assertEqual(doc1, doc2) +        db.delete_database() +    def test_sync_very_large_files(self): +        """ +        Test if Soledad can sync very large files. +        """ +        # define the size of the "very large file" +        length = 100*(10**6)  # 100 MB +        self.startServer() +        # instantiate soledad and create a document +        sol1 = self._soledad_instance( +            # token is verified in test_target.make_token_soledad_app +            auth_token='auth-token' +        ) +        _, doclist = sol1.get_all_docs() +        self.assertEqual([], doclist) +        content = binascii.hexlify(os.urandom(length/2))  # len() == length +        doc1 = sol1.create_doc({'data': content}) +        # ensure remote db exists before syncing +        db = CouchDatabase( +            self._couch_url, +            # the name of the user database is "user-<uuid>". 
'user-user-uuid',
+        )
+        # sync with server
+        sol1._server_url = self.getURL()
+        sol1.sync()
+        # instantiate soledad with empty db, but with same secrets path
+        sol2 = self._soledad_instance(prefix='x', auth_token='auth-token')
+        _, doclist = sol2.get_all_docs()
+        self.assertEqual([], doclist)
+        sol2._secrets_path = sol1.secrets_path
+        sol2._load_secrets()
+        sol2._set_secret_id(sol1._secret_id)
+        # sync the new instance
+        sol2._server_url = self.getURL()
+        sol2.sync()
+        _, doclist = sol2.get_all_docs()
+        self.assertEqual(1, len(doclist))
+        doc2 = doclist[0]
+        # assert incoming doc is equal to the first sent doc
+        self.assertEqual(doc1, doc2)
+        # delete remote database
+        db.delete_database()
+
+
+    def test_sync_many_small_files(self):
+        """
+        Test if Soledad can sync many small files.
+        """
+        number_of_docs = 100
+        self.startServer()
+        # instantiate soledad and create a document
+        sol1 = self._soledad_instance(
+            # token is verified in test_target.make_token_soledad_app
+            auth_token='auth-token'
+        )
+        _, doclist = sol1.get_all_docs()
+        self.assertEqual([], doclist)
+        # create many small files
+        for i in range(0, number_of_docs):
+            sol1.create_doc(json.loads(simple_doc))
+        # ensure remote db exists before syncing
+        db = CouchDatabase(
+            self._couch_url,
+            # the name of the user database is "user-<uuid>".
+            'user-user-uuid',
+        )
+        # sync with server
+        sol1._server_url = self.getURL()
+        sol1.sync()
+        # instantiate soledad with empty db, but with same secrets path
+        sol2 = self._soledad_instance(prefix='x', auth_token='auth-token')
+        _, doclist = sol2.get_all_docs()
+        self.assertEqual([], doclist)
+        sol2._secrets_path = sol1.secrets_path
+        sol2._load_secrets()
+        sol2._set_secret_id(sol1._secret_id)
+        # sync the new instance
+        sol2._server_url = self.getURL()
+        sol2.sync()
+        _, doclist = sol2.get_all_docs()
+        self.assertEqual(number_of_docs, len(doclist))
+        # assert incoming docs are equal to sent docs
+        for doc in doclist:
+            self.assertEqual(sol1.get_doc(doc.doc_id), doc)
+        # delete remote database
+        db.delete_database()
 
 
 class LockResourceTestCase(
         CouchDBTestCase, TestCaseWithServer):
@@ -455,12 +557,21 @@ class LockResourceTestCase(
         CouchDBTestCase.setUp(self)
         self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
         self._couch_url = 'http://localhost:' + str(self.wrapper.port)
+        # create the databases
+        CouchDatabase(self._couch_url, 'shared')
+        CouchDatabase(self._couch_url, 'tokens')
         self._state = CouchServerState(
             self._couch_url, 'shared', 'tokens')
 
     def tearDown(self):
         CouchDBTestCase.tearDown(self)
         TestCaseWithServer.tearDown(self)
+        # delete remote database
+        db = CouchDatabase(
+            self._couch_url,
+            'shared',
+        )
+        db.delete_database()
 
     def test__try_obtain_filesystem_lock(self):
         responder = mock.Mock()
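A short note on the payload built by test_sync_very_large_files above: binascii.hexlify() emits two hexadecimal characters per input byte, so asking os.urandom() for length/2 bytes (Python 2 integer division in the test) yields a document body of exactly `length` characters, as the inline comment in the test states. A small check of the same property:

import os
import binascii

# the test uses 100 * (10 ** 6); a small value demonstrates the same point
length = 1024
content = binascii.hexlify(os.urandom(length // 2))  # two hex chars per byte
assert len(content) == length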

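Finally, the point of the register_exception decorator introduced in errors.py is that a wire description sent by the server can be mapped straight back to an exception class on the client. The dictionaries below are the u1db structures the decorator updates; the translate helper is only a simplified stand-in for u1db's own error translation path, not the code it actually runs.

from u1db import errors as u1db_errors
from u1db.remote import http_errors

from leap.soledad.common import errors


# importing leap.soledad.common.errors runs the decorators, so the maps
# already know about the new wire descriptions:
assert http_errors.wire_description_to_status["lock timed out"] == 408
assert u1db_errors.wire_description_to_exc["lock timed out"] is \
    errors.LockTimedOutError


def translate_wire_error(response):
    # re-raise a server error dict as the registered exception class,
    # falling back to the generic U1DBError for unknown descriptions
    wire_description = response.get("error")
    exc_class = u1db_errors.wire_description_to_exc.get(
        wire_description, u1db_errors.U1DBError)
    raise exc_class(wire_description)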