|  |  |  |
|---|---|---|
| author | Tomás Touceda <chiiph@leap.se> | 2014-04-04 16:34:57 -0300 |
| committer | Tomás Touceda <chiiph@leap.se> | 2014-04-04 16:34:57 -0300 |
| commit | ce22976cc0e203e53799e771aa5e3717d498cc5c (patch) | |
| tree | 8c69733f1f8a83ac83e40bf7e522a5fe8eae9b50 /common/src | |
| parent | deb78a5f3502ece98ec3e0b70f93025c4a1b3da5 (diff) | |
| parent | a3fed4d42ab4a7be7bc7ebe86b35805ac73d62de (diff) | |
Diffstat (limited to 'common/src')
23 files changed, 2595 insertions, 922 deletions
diff --git a/common/src/leap/soledad/common/.gitignore b/common/src/leap/soledad/common/.gitignore
new file mode 100644
index 00000000..3378c78a
--- /dev/null
+++ b/common/src/leap/soledad/common/.gitignore
@@ -0,0 +1 @@
+ddocs.py
diff --git a/common/src/leap/soledad/common/README.txt b/common/src/leap/soledad/common/README.txt
new file mode 100644
index 00000000..106efb5e
--- /dev/null
+++ b/common/src/leap/soledad/common/README.txt
@@ -0,0 +1,79 @@
+Soledad common package
+======================
+
+This package contains Soledad bits used by both server and client.
+
+Couch U1DB Backend
+------------------
+
+U1DB backends rely on some atomic operations that modify document contents
+and metadata (conflicts, transaction ids and indexes). The only atomic
+operation in Couch is a document put, so every u1db atomic operation has to
+be mapped to a couch document put.
+
+The atomic operations in the U1DB SQLite reference backend implementation
+may be identified by the use of a context manager to access the underlying
+database. A listing of the methods involved in each atomic operation is
+depicted below. The top-level elements correspond to the atomic operations
+that have to be mapped, and items on deeper levels of the list have to be
+implemented in a way that all changes are pushed with just one operation.
+
+    * _set_replica_uid
+    * put_doc:
+        * _get_doc
+        * _put_and_update_indexes
+            * insert/update the document
+            * insert into transaction log
+    * delete_doc
+        * _get_doc
+        * _put_and_update_indexes
+    * get_doc_conflicts
+        * _get_conflicts
+    * _set_replica_gen_and_trans_id
+        * _do_set_replica_gen_and_trans_id
+    * _put_doc_if_newer
+        * _get_doc
+        * _validate_source (**)
+            * _get_replica_gen_and_trans_id
+        * cases:
+            * is newer:
+                * _prune_conflicts (**)
+                    * _has_conflicts
+                    * _delete_conflicts
+                * _put_and_update_indexes
+            * same content as:
+                * _put_and_update_indexes
+            * conflicted:
+                * _force_doc_sync_conflict
+                    * _prune_conflicts
+                    * _add_conflict
+                    * _put_and_update_indexes
+        * _do_set_replica_gen_and_trans_id
+    * resolve_doc
+        * _get_doc
+        * cases:
+            * doc is superseded
+                * _put_and_update_indexes
+            * else
+                * _add_conflict
+        * _delete_conflicts
+    * delete_index
+    * create_index
+
+Couch views and update functions are used to achieve atomicity on the Couch
+backend. Transactions are stored in the `u1db_transactions` field of the
+couch document. The document's content and its conflicted versions are
+stored as couch document attachments named, respectively, `u1db_content`
+and `u1db_conflicts`.
+
+A mapping of methods to couch query URIs can be found in the
+`./ddocs/README.txt` document.
+
+Notes:
+
+  * Currently, the couch backend does not implement indexing, so what is
+    depicted as `_put_and_update_indexes` above will be found as `_put_doc`
+    in the backend.
+
+  * Conflict updates are part of the document put (they use couch update
+    functions), and as such belong to the same atomic operation as the
+    document put.
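To make the README's mapping concrete, here is a minimal sketch of a u1db atomic operation expressed as a single couch document write: the document content and its conflicted versions travel as inline attachments, and the transaction log is embedded in the `u1db_transactions` field, so everything lands in one PUT. This is an illustration only (Python 2, matching the codebase), assuming a running CouchDB at localhost:5984, the python-couchdb package, and an existing `soledad-example` database; the `atomic_put` helper and its parameter layout are hypothetical, and the actual backend introduced in the couch.py diff below achieves the same effect with a multipart PUT and couch update functions.

```python
import base64
import json
import time
import uuid

from couchdb import Server


def atomic_put(db, doc_id, u1db_rev, content, conflicts, transactions):
    """
    Persist content, conflicts and the transaction log in one couch PUT.

    Everything u1db treats as a single atomic operation travels inside one
    document update, so no partial state is ever observable.
    """
    # allocate and timestamp a new transaction id so the transaction log
    # stays consistent when queried
    transactions = transactions + [(int(time.time() * 1000), uuid.uuid4().hex)]
    couch_doc = {
        '_id': doc_id,
        'u1db_rev': u1db_rev,
        'u1db_transactions': transactions,
        '_attachments': {
            # content and conflicted versions ride along as inline
            # (base64-encoded) attachments of the very same write
            'u1db_content': {
                'content_type': 'application/octet-stream',
                'data': base64.b64encode(json.dumps(content)),
            },
            'u1db_conflicts': {
                'content_type': 'application/octet-stream',
                'data': base64.b64encode(json.dumps(conflicts)),
            },
        },
    }
    db.save(couch_doc)  # a single couch PUT: the whole atomic operation


if __name__ == '__main__':
    db = Server('http://localhost:5984')['soledad-example']
    atomic_put(db, 'doc-1', 'replica-uid:1', {'key': 'value'}, [], [])
```

Updating an existing document would additionally require sending its current couch `_rev`; a mismatch there is how revision conflicts surface (couch's ResourceConflict, which the backend below maps to u1db's RevisionConflict).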
diff --git a/common/src/leap/soledad/common/_version.py b/common/src/leap/soledad/common/_version.py index 1d020a14..7d4262b5 100644 --- a/common/src/leap/soledad/common/_version.py +++ b/common/src/leap/soledad/common/_version.py @@ -17,6 +17,7 @@ git_full = "$Format:%H$"  import subprocess  import sys +  def run_command(args, cwd=None, verbose=False):      try:          # remember shell=False, so use git.cmd on windows, not just git @@ -36,11 +37,10 @@ def run_command(args, cwd=None, verbose=False):          return None      return stdout - -import sys  import re  import os.path +  def get_expanded_variables(versionfile_source):      # the code embedded in _version.py can just fetch the value of these      # variables. When used from setup.py, we don't want to import @@ -48,7 +48,7 @@ def get_expanded_variables(versionfile_source):      # used from _version.py.      variables = {}      try: -        f = open(versionfile_source,"r") +        f = open(versionfile_source, "r")          for line in f.readlines():              if line.strip().startswith("git_refnames ="):                  mo = re.search(r'=\s*"(.*)"', line) @@ -63,12 +63,13 @@ def get_expanded_variables(versionfile_source):          pass      return variables +  def versions_from_expanded_variables(variables, tag_prefix, verbose=False):      refnames = variables["refnames"].strip()      if refnames.startswith("$Format"):          if verbose:              print("variables are unexpanded, not using") -        return {} # unexpanded, so not in an unpacked git-archive tarball +        return {}  # unexpanded, so not in an unpacked git-archive tarball      refs = set([r.strip() for r in refnames.strip("()").split(",")])      # starting in git-1.8.3, tags are listed as "tag: foo-1.0" instead of      # just "foo-1.0". If we see a "tag: " prefix, prefer those. @@ -93,13 +94,14 @@ def versions_from_expanded_variables(variables, tag_prefix, verbose=False):              r = ref[len(tag_prefix):]              if verbose:                  print("picking %s" % r) -            return { "version": r, -                     "full": variables["full"].strip() } +            return {"version": r, +                    "full": variables["full"].strip()}      # no suitable tags, so we use the full revision id      if verbose:          print("no suitable tags, using full revision id") -    return { "version": variables["full"].strip(), -             "full": variables["full"].strip() } +    return {"version": variables["full"].strip(), +            "full": variables["full"].strip()} +  def versions_from_vcs(tag_prefix, versionfile_source, verbose=False):      # this runs 'git' from the root of the source tree. That either means @@ -116,7 +118,7 @@ def versions_from_vcs(tag_prefix, versionfile_source, verbose=False):          here = os.path.abspath(__file__)      except NameError:          # some py2exe/bbfreeze/non-CPython implementations don't do __file__ -        return {} # not always correct +        return {}  # not always correct      # versionfile_source is the relative path from the top of the source tree      # (where the .git directory might live) to this file. 
Invert this to find @@ -126,7 +128,16 @@ def versions_from_vcs(tag_prefix, versionfile_source, verbose=False):          for i in range(len(versionfile_source.split("/"))):              root = os.path.dirname(root)      else: -        root = os.path.dirname(here) +        root = os.path.dirname( +            os.path.join('..', here)) + +    ###################################################### +    # XXX patch for our specific configuration with +    # the three projects leap.soledad.{common, client, server} +    # inside the same repo. +    ###################################################### +    root = os.path.dirname(os.path.join('..', root)) +      if not os.path.exists(os.path.join(root, ".git")):          if verbose:              print("no .git in %s" % root) @@ -141,7 +152,8 @@ def versions_from_vcs(tag_prefix, versionfile_source, verbose=False):          return {}      if not stdout.startswith(tag_prefix):          if verbose: -            print("tag '%s' doesn't start with prefix '%s'" % (stdout, tag_prefix)) +            print("tag '%s' doesn't start with prefix '%s'" % ( +                stdout, tag_prefix))          return {}      tag = stdout[len(tag_prefix):]      stdout = run_command([GIT, "rev-parse", "HEAD"], cwd=root) @@ -153,7 +165,8 @@ def versions_from_vcs(tag_prefix, versionfile_source, verbose=False):      return {"version": tag, "full": full} -def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False): +def versions_from_parentdir(parentdir_prefix, versionfile_source, +                            verbose=False):      if IN_LONG_VERSION_PY:          # We're running from _version.py. If it's from a source tree          # (execute-in-place), we can work upwards to find the root of the @@ -163,7 +176,7 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False)              here = os.path.abspath(__file__)          except NameError:              # py2exe/bbfreeze/non-CPython don't have __file__ -            return {} # without __file__, we have no hope +            return {}  # without __file__, we have no hope          # versionfile_source is the relative path from the top of the source          # tree to _version.py. Invert this to find the root from __file__.          
root = here @@ -180,7 +193,8 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False)      dirname = os.path.basename(root)      if not dirname.startswith(parentdir_prefix):          if verbose: -            print("guessing rootdir is '%s', but '%s' doesn't start with prefix '%s'" % +            print("guessing rootdir is '%s', but '%s' doesn't start with " +                  "prefix '%s'" %                    (root, dirname, parentdir_prefix))          return None      return {"version": dirname[len(parentdir_prefix):], "full": ""} @@ -189,8 +203,9 @@ tag_prefix = ""  parentdir_prefix = "leap.soledad.common-"  versionfile_source = "src/leap/soledad/common/_version.py" +  def get_versions(default={"version": "unknown", "full": ""}, verbose=False): -    variables = { "refnames": git_refnames, "full": git_full } +    variables = {"refnames": git_refnames, "full": git_full}      ver = versions_from_expanded_variables(variables, tag_prefix, verbose)      if not ver:          ver = versions_from_vcs(tag_prefix, versionfile_source, verbose) @@ -200,4 +215,3 @@ def get_versions(default={"version": "unknown", "full": ""}, verbose=False):      if not ver:          ver = default      return ver - diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 1396f4d7..8e8613a1 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -18,189 +18,381 @@  """A U1DB backend that uses CouchDB as its persistence layer.""" -import re  import simplejson as json -import socket +import re +import uuid  import logging +import binascii +import socket +import time +import sys +import threading -from u1db import errors -from u1db.sync import Synchronizer -from u1db.backends.inmemory import InMemoryIndex -from u1db.remote.server_state import ServerState -from u1db.errors import DatabaseDoesNotExist -from couchdb.client import Server, Document as CouchDocument -from couchdb.http import ResourceNotFound, Unauthorized +from StringIO import StringIO +from collections import defaultdict -from leap.soledad.common import USER_DB_PREFIX -from leap.soledad.common.objectstore import ( -    ObjectStoreDatabase, -    ObjectStoreSyncTarget, +from couchdb.client import Server +from couchdb.http import ( +    ResourceConflict, +    ResourceNotFound, +    ServerError, +    Session, +) +from u1db import query_parser, vectorclock +from u1db.errors import ( +    DatabaseDoesNotExist, +    InvalidGeneration, +    RevisionConflict, +    InvalidDocId, +    ConflictedDoc, +    DocumentDoesNotExist, +    DocumentAlreadyDeleted, +    Unauthorized,  ) +from u1db.backends import CommonBackend, CommonSyncTarget +from u1db.remote import http_app +from u1db.remote.server_state import ServerState + + +from leap.soledad.common import USER_DB_PREFIX, ddocs, errors +from leap.soledad.common.document import SoledadDocument  logger = logging.getLogger(__name__) +COUCH_TIMEOUT = 120  # timeout for transfers between Soledad server and Couch + +  class InvalidURLError(Exception):      """      Exception raised when Soledad encounters a malformed URL.      """ -def persistent_class(cls): +class CouchDocument(SoledadDocument):      """ -    Decorator that modifies a class to ensure u1db metadata persists on -    underlying storage. +    This is the document used for maintaining the Couch backend. + +    A CouchDocument can fetch and manipulate conflicts and also holds a +    reference to the couch document revision. 
This data is used to ensure an +    atomic and consistent update of the database. +    """ + +    def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False, +                 syncable=True): +        """ +        Container for handling a document that is stored in couch backend. + +        :param doc_id: The unique document identifier. +        :type doc_id: str +        :param rev: The revision identifier of the document. +        :type rev: str +        :param json: The JSON string for this document. +        :type json: str +        :param has_conflicts: Boolean indicating if this document has conflicts +        :type has_conflicts: bool +        :param syncable: Should this document be synced with remote replicas? +        :type syncable: bool +        """ +        SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts) +        self._couch_rev = None +        self._conflicts = None +        self._transactions = None + +    def _ensure_fetch_conflicts(self, get_conflicts_fun): +        """ +        Ensure conflict data has been fetched from the server. + +        :param get_conflicts_fun: A function which, given the document id and +                                  the couch revision, return the conflicted +                                  versions of the current document. +        :type get_conflicts_fun: function +        """ +        if self._conflicts is None: +            self._conflicts = get_conflicts_fun(self.doc_id, +                                                couch_rev=self.couch_rev) +        self.has_conflicts = len(self._conflicts) > 0 + +    def get_conflicts(self): +        """ +        Get the conflicted versions of the document. + +        :return: The conflicted versions of the document. +        :rtype: [CouchDocument] +        """ +        return self._conflicts + +    def set_conflicts(self, conflicts): +        """ +        Set the conflicted versions of the document. + +        :param conflicts: The conflicted versions of the document. +        :type conflicts: list +        """ +        self._conflicts = conflicts +        self.has_conflicts = len(self._conflicts) > 0 -    @param cls: The class that will be modified. -    @type cls: type +    def add_conflict(self, doc): +        """ +        Add a conflict to this document. + +        :param doc: The conflicted version to be added. +        :type doc: CouchDocument +        """ +        if self._conflicts is None: +            raise Exception("Run self._ensure_fetch_conflicts first!") +        self._conflicts.append(doc) +        self.has_conflicts = len(self._conflicts) > 0 + +    def delete_conflicts(self, conflict_revs): +        """ +        Delete conflicted versions of this document. + +        :param conflict_revs: The conflicted revisions to be deleted. 
+        :type conflict_revs: [str]
+        """
+        if self._conflicts is None:
+            raise Exception("Run self._ensure_fetch_conflicts first!")
+        conflicts_len = len(self._conflicts)
+        self._conflicts = filter(
+            lambda doc: doc.rev not in conflict_revs,
+            self._conflicts)
+        self.has_conflicts = len(self._conflicts) > 0
+
+    def _get_couch_rev(self):
+        return self._couch_rev
+
+    def _set_couch_rev(self, rev):
+        self._couch_rev = rev
+
+    couch_rev = property(_get_couch_rev, _set_couch_rev)
+
+    def _get_transactions(self):
+        return self._transactions
+
+    def _set_transactions(self, rev):
+        self._transactions = rev
+
+    transactions = property(_get_transactions, _set_transactions)
+
+
+# monkey-patch the u1db http app to use CouchDocument
+http_app.Document = CouchDocument
+
+
+def raise_missing_design_doc_error(exc, ddoc_path):
+    """
+    Raise an appropriate exception when catching a ResourceNotFound while
+    accessing a design document.
+
+    :param exc: The exception caught.
+    :type exc: ResourceNotFound
+    :param ddoc_path: A list representing the requested path.
+    :type ddoc_path: list
+
+    :raise MissingDesignDocError: Raised when trying to access a missing
+                                  design document.
+    :raise MissingDesignDocListFunctionError: Raised when trying to access a
+                                              missing list function on a
+                                              design document.
+    :raise MissingDesignDocNamedViewError: Raised when trying to access a
+                                           missing named view on a design
+                                           document.
+    :raise MissingDesignDocDeletedError: Raised when trying to access a
+                                         deleted design document.
+    :raise MissingDesignDocUnknownError: Raised when access to a design
+                                         document fails for a yet unknown
+                                         reason.
+    """
+    path = "".join(ddoc_path)
+    if exc.message[1] == 'missing':
+        raise errors.MissingDesignDocError(path)
+    elif exc.message[1] == 'missing function' or \
+            exc.message[1].startswith('missing lists function'):
+        raise errors.MissingDesignDocListFunctionError(path)
+    elif exc.message[1] == 'missing_named_view':
+        raise errors.MissingDesignDocNamedViewError(path)
+    elif exc.message[1] == 'deleted':
+        raise errors.MissingDesignDocDeletedError(path)
+    # other errors are unknown for now
+    raise errors.DesignDocUnknownError("%s: %s" % (path, str(exc.message)))
+
+
+def raise_server_error(exc, ddoc_path):
+    """
+    Raise an appropriate exception when catching a ServerError while
+    accessing a design document.
+
+    :param exc: The exception caught.
+    :type exc: ServerError
+    :param ddoc_path: A list representing the requested path.
+    :type ddoc_path: list
+
+    :raise MissingDesignDocListFunctionError: Raised when trying to access a
+                                              missing list function on a
+                                              design document.
+    :raise MissingDesignDocUnknownError: Raised when access to a design
+                                         document fails for a yet unknown
+                                         reason.
+    """ +    path = "".join(ddoc_path) +    if exc.message[1][0] == 'unnamed_error': +        raise errors.MissingDesignDocListFunctionError(path) +    # other errors are unknown for now +    raise errors.DesignDocUnknownError(path) + -    def _create_persistent_method(old_method_name, key, load_method_name, -                                  dump_method_name, store): -        """ -        Create a persistent method to replace C{old_method_name}. - -        The new method will load C{key} using C{load_method_name} and stores -        it using C{dump_method_name} depending on the value of C{store}. -        """ -        # get methods -        old_method = getattr(cls, old_method_name) -        load_method = getattr(cls, load_method_name) \ -            if load_method_name is not None \ -            else lambda self, data: setattr(self, key, data) -        dump_method = getattr(cls, dump_method_name) \ -            if dump_method_name is not None \ -            else lambda self: getattr(self, key) - -        def _new_method(self, *args, **kwargs): -            # get u1db data from couch db -            doc = self._get_doc('%s%s' % -                                (self.U1DB_DATA_DOC_ID_PREFIX, key)) -            load_method(self, doc.content['content']) -            # run old method -            retval = old_method(self, *args, **kwargs) -            # store u1db data on couch -            if store: -                doc.content = {'content': dump_method(self)} -                self._put_doc(doc) -            return retval - -        return _new_method - -    # ensure the class has a persistency map -    if not hasattr(cls, 'PERSISTENCY_MAP'): -        logger.error('Class %s has no PERSISTENCY_MAP attribute, skipping ' -                     'persistent methods substitution.' % cls) -        return cls -    # replace old methods with new persistent ones -    for key, ((load_method_name, dump_method_name), -              persistent_methods) in cls.PERSISTENCY_MAP.iteritems(): -        for (method_name, store) in persistent_methods: -            setattr(cls, method_name, -                    _create_persistent_method( -                        method_name, -                        key, -                        load_method_name, -                        dump_method_name, -                        store)) -    return cls - - -@persistent_class -class CouchDatabase(ObjectStoreDatabase): +class MultipartWriter(object):      """ -    A U1DB backend that uses Couch as its persistence layer. +    A multipart writer adapted from python-couchdb's one so we can PUT +    documents using couch's multipart PUT. + +    This stripped down version does not allow for nested structures, and +    contains only the essential things we need to PUT SoledadDocuments to the +    couch backend.      """ -    U1DB_TRANSACTION_LOG_KEY = '_transaction_log' -    U1DB_CONFLICTS_KEY = '_conflicts' -    U1DB_OTHER_GENERATIONS_KEY = '_other_generations' -    U1DB_INDEXES_KEY = '_indexes' -    U1DB_REPLICA_UID_KEY = '_replica_uid' - -    U1DB_DATA_KEYS = [ -        U1DB_TRANSACTION_LOG_KEY, -        U1DB_CONFLICTS_KEY, -        U1DB_OTHER_GENERATIONS_KEY, -        U1DB_INDEXES_KEY, -        U1DB_REPLICA_UID_KEY, -    ] - -    COUCH_ID_KEY = '_id' -    COUCH_REV_KEY = '_rev' -    COUCH_U1DB_ATTACHMENT_KEY = 'u1db_json' -    COUCH_U1DB_REV_KEY = 'u1db_rev' - -    # the following map describes information about methods usage of -    # properties that have to persist on the underlying database. 
The format -    # of the map is assumed to be: -    # -    #     { -    #         'property_name': [ -    #             ('property_load_method_name', 'property_dump_method_name'), -    #             [('method_1_name', bool), -    #              ... -    #              ('method_N_name', bool)]], -    #         ... -    #     } -    # -    # where the booleans indicate if the property should be stored after -    # each method execution (i.e. if the method alters the property). Property -    # load/dump methods will be run after/before properties are read/written -    # to the underlying db. -    PERSISTENCY_MAP = { -        U1DB_TRANSACTION_LOG_KEY: [ -            ('_load_transaction_log_from_json', None), -            [('_get_transaction_log', False), -             ('_get_generation', False), -             ('_get_generation_info', False), -             ('_get_trans_id_for_gen', False), -             ('whats_changed', False), -             ('_put_and_update_indexes', True)]], -        U1DB_CONFLICTS_KEY: [ -            (None, None), -            [('_has_conflicts', False), -             ('get_doc_conflicts', False), -             ('_prune_conflicts', False), -             ('resolve_doc', False), -             ('_replace_conflicts', True), -             ('_force_doc_sync_conflict', True)]], -        U1DB_OTHER_GENERATIONS_KEY: [ -            ('_load_other_generations_from_json', None), -            [('_get_replica_gen_and_trans_id', False), -             ('_do_set_replica_gen_and_trans_id', True)]], -        U1DB_INDEXES_KEY: [ -            ('_load_indexes_from_json', '_dump_indexes_as_json'), -            [('list_indexes', False), -             ('get_from_index', False), -             ('get_range_from_index', False), -             ('get_index_keys', False), -             ('_put_and_update_indexes', True), -             ('create_index', True), -             ('delete_index', True)]], -        U1DB_REPLICA_UID_KEY: [ -            (None, None), -            [('_allocate_doc_rev', False), -             ('_put_doc_if_newer', False), -             ('_ensure_maximal_rev', False), -             ('_prune_conflicts', False), -             ('_set_replica_uid', True)]]} +    CRLF = '\r\n' + +    def __init__(self, fileobj, headers=None, boundary=None): +        """ +        Initialize the multipart writer. +        """ +        self.fileobj = fileobj +        if boundary is None: +            boundary = self._make_boundary() +        self._boundary = boundary +        self._build_headers('related', headers) + +    def add(self, mimetype, content, headers={}): +        """ +        Add a part to the multipart stream. +        """ +        self.fileobj.write('--') +        self.fileobj.write(self._boundary) +        self.fileobj.write(self.CRLF) +        headers['Content-Type'] = mimetype +        self._write_headers(headers) +        if content: +            # XXX: throw an exception if a boundary appears in the content?? +            self.fileobj.write(content) +            self.fileobj.write(self.CRLF) + +    def close(self): +        """ +        Close the multipart stream. +        """ +        self.fileobj.write('--') +        self.fileobj.write(self._boundary) +        # be careful not to have anything after '--', otherwise old couch +        # versions (including bigcouch) will fail. +        self.fileobj.write('--') + +    def _make_boundary(self): +        """ +        Create a boundary to discern multi parts. 
+        """ +        try: +            from uuid import uuid4 +            return '==' + uuid4().hex + '==' +        except ImportError: +            from random import randrange +            token = randrange(sys.maxint) +            format = '%%0%dd' % len(repr(sys.maxint - 1)) +            return '===============' + (format % token) + '==' + +    def _write_headers(self, headers): +        """ +        Write a part header in the buffer stream. +        """ +        if headers: +            for name in sorted(headers.keys()): +                value = headers[name] +                self.fileobj.write(name) +                self.fileobj.write(': ') +                self.fileobj.write(value) +                self.fileobj.write(self.CRLF) +        self.fileobj.write(self.CRLF) + +    def _build_headers(self, subtype, headers): +        """ +        Build the main headers of the multipart stream. + +        This is here so we can send headers separete from content using +        python-couchdb API. +        """ +        self.headers = {} +        self.headers['Content-Type'] = 'multipart/%s; boundary="%s"' % \ +                                       (subtype, self._boundary) +        if headers: +            for name in sorted(headers.keys()): +                value = headers[name] +                self.headers[name] = value + + +class CouchDatabase(CommonBackend): +    """ +    A U1DB implementation that uses CouchDB as its persistence layer. +    """ + +    # We spawn threads to parallelize the CouchDatabase.get_docs() method +    MAX_GET_DOCS_THREADS = 20 + +    update_handler_lock = defaultdict(threading.Lock) + +    class _GetDocThread(threading.Thread): +        """ +        A thread that gets a document from a database. + +        TODO: switch this for a twisted deferred to thread. This depends on +        replacing python-couchdb for paisley in this module. +        """ + +        def __init__(self, db, doc_id, check_for_conflicts, +                     release_fun): +            """ +            :param db: The database from where to get the document. +            :type db: u1db.Database +            :param doc_id: The doc_id of the document to be retrieved. +            :type doc_id: str +            :param check_for_conflicts: Whether the get_doc() method should +                                        check for existing conflicts. +            :type check_for_conflicts: bool +            :param release_fun: A function that releases a semaphore, to be +                                called after the document is fetched. +            :type release_fun: function +            """ +            threading.Thread.__init__(self) +            self._db = db +            self._doc_id = doc_id +            self._check_for_conflicts = check_for_conflicts +            self._release_fun = release_fun +            self._doc = None + +        def run(self): +            """ +            Fetch the document, store it as a property, and call the release +            function. +            """ +            self._doc = self._db._get_doc( +                self._doc_id, self._check_for_conflicts) +            self._release_fun()      @classmethod -    def open_database(cls, url, create): +    def open_database(cls, url, create, ensure_ddocs=False):          """          Open a U1DB database using CouchDB as backend. -        @param url: the url of the database replica -        @type url: str -        @param create: should the replica be created if it does not exist? 
-        @type create: bool +        :param url: the url of the database replica +        :type url: str +        :param create: should the replica be created if it does not exist? +        :type create: bool +        :param ensure_ddocs: Ensure that the design docs exist on server. +        :type ensure_ddocs: bool -        @return: the database instance -        @rtype: CouchDatabase +        :return: the database instance +        :rtype: CouchDatabase          """          # get database from url          m = re.match('(^https?://[^/]+)/(.+)$', url) @@ -214,308 +406,1057 @@ class CouchDatabase(ObjectStoreDatabase):          except ResourceNotFound:              if not create:                  raise DatabaseDoesNotExist() -        return cls(url, dbname) +        return cls(url, dbname, ensure_ddocs=ensure_ddocs)      def __init__(self, url, dbname, replica_uid=None, full_commit=True, -                 session=None): +                 session=None, ensure_ddocs=True):          """          Create a new Couch data container. -        @param url: the url of the couch database -        @type url: str -        @param dbname: the database name -        @type dbname: str -        @param replica_uid: an optional unique replica identifier -        @type replica_uid: str -        @param full_commit: turn on the X-Couch-Full-Commit header -        @type full_commit: bool -        @param session: an http.Session instance or None for a default session -        @type session: http.Session +        :param url: the url of the couch database +        :type url: str +        :param dbname: the database name +        :type dbname: str +        :param replica_uid: an optional unique replica identifier +        :type replica_uid: str +        :param full_commit: turn on the X-Couch-Full-Commit header +        :type full_commit: bool +        :param session: an http.Session instance or None for a default session +        :type session: http.Session +        :param ensure_ddocs: Ensure that the design docs exist on server. +        :type ensure_ddocs: bool          """          # save params          self._url = url          self._full_commit = full_commit +        if session is None: +            session = Session(timeout=COUCH_TIMEOUT)          self._session = session +        self._factory = CouchDocument +        self._real_replica_uid = None          # configure couch          self._server = Server(url=self._url,                                full_commit=self._full_commit,                                session=self._session)          self._dbname = dbname -        # this will ensure that transaction and sync logs exist and are -        # up-to-date.          try:              self._database = self._server[self._dbname]          except ResourceNotFound:              self._server.create(self._dbname)              self._database = self._server[self._dbname] -        ObjectStoreDatabase.__init__(self, replica_uid=replica_uid) +        if replica_uid is not None: +            self._set_replica_uid(replica_uid) +        if ensure_ddocs: +            self.ensure_ddocs_on_db() +        # initialize a thread pool for parallelizing get_docs() +        self._sem_pool = threading.BoundedSemaphore( +            value=self.MAX_GET_DOCS_THREADS) + +    def ensure_ddocs_on_db(self): +        """ +        Ensure that the design documents used by the backend exist on the +        couch database. 
+        """ +        # we check for existence of one of the files, and put all of them if +        # that one does not exist +        try: +            self._database['_design/docs'] +            return +        except ResourceNotFound: +            for ddoc_name in ['docs', 'syncs', 'transactions']: +                ddoc = json.loads( +                    binascii.a2b_base64( +                        getattr(ddocs, ddoc_name))) +                self._database.save(ddoc) + +    def get_sync_target(self): +        """ +        Return a SyncTarget object, for another u1db to synchronize with. + +        :return: The sync target. +        :rtype: CouchSyncTarget +        """ +        return CouchSyncTarget(self) -    #------------------------------------------------------------------------- -    # methods from Database -    #------------------------------------------------------------------------- +    def delete_database(self): +        """ +        Delete a U1DB CouchDB database. +        """ +        del(self._server[self._dbname]) + +    def close(self): +        """ +        Release any resources associated with this database. + +        :return: True if db was succesfully closed. +        :rtype: bool +        """ +        self._url = None +        self._full_commit = None +        self._session = None +        self._server = None +        self._database = None +        return True + +    def _set_replica_uid(self, replica_uid): +        """ +        Force the replica uid to be set. + +        :param replica_uid: The new replica uid. +        :type replica_uid: str +        """ +        try: +            # set on existent config document +            doc = self._database['u1db_config'] +            doc['replica_uid'] = replica_uid +        except ResourceNotFound: +            # or create the config document +            doc = { +                '_id': 'u1db_config', +                'replica_uid': replica_uid, +            } +        self._database.save(doc) +        self._real_replica_uid = replica_uid + +    def _get_replica_uid(self): +        """ +        Get the replica uid. + +        :return: The replica uid. +        :rtype: str +        """ +        if self._real_replica_uid is not None: +            return self._real_replica_uid +        try: +            # grab replica_uid from server +            doc = self._database['u1db_config'] +            self._real_replica_uid = doc['replica_uid'] +            return self._real_replica_uid +        except ResourceNotFound: +            # create a unique replica_uid +            self._real_replica_uid = uuid.uuid4().hex +            self._set_replica_uid(self._real_replica_uid) +            return self._real_replica_uid + +    _replica_uid = property(_get_replica_uid, _set_replica_uid) + +    def _get_generation(self): +        """ +        Return the current generation. + +        :return: The current generation. +        :rtype: int + +        :raise MissingDesignDocError: Raised when tried to access a missing +                                      design document. +        :raise MissingDesignDocListFunctionError: Raised when trying to access +                                                  a missing list function on a +                                                  design document. +        :raise MissingDesignDocNamedViewError: Raised when trying to access a +                                               missing named view on a design +                                               document. 
+        :raise MissingDesignDocDeletedError: Raised when trying to access a +                                             deleted design document. +        :raise MissingDesignDocUnknownError: Raised when failed to access a +                                             design document for an yet +                                             unknown reason. +        """ +        # query a couch list function +        ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log'] +        res = self._database.resource(*ddoc_path) +        try: +            response = res.get_json() +            return response[2]['generation'] +        except ResourceNotFound as e: +            raise_missing_design_doc_error(e, ddoc_path) +        except ServerError as e: +            raise_server_error(e, ddoc_path) + +    def _get_generation_info(self): +        """ +        Return the current generation. + +        :return: A tuple containing the current generation and transaction id. +        :rtype: (int, str) + +        :raise MissingDesignDocError: Raised when tried to access a missing +                                      design document. +        :raise MissingDesignDocListFunctionError: Raised when trying to access +                                                  a missing list function on a +                                                  design document. +        :raise MissingDesignDocNamedViewError: Raised when trying to access a +                                               missing named view on a design +                                               document. +        :raise MissingDesignDocDeletedError: Raised when trying to access a +                                             deleted design document. +        :raise MissingDesignDocUnknownError: Raised when failed to access a +                                             design document for an yet +                                             unknown reason. +        """ +        # query a couch list function +        ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log'] +        res = self._database.resource(*ddoc_path) +        try: +            response = res.get_json() +            return (response[2]['generation'], response[2]['transaction_id']) +        except ResourceNotFound as e: +            raise_missing_design_doc_error(e, ddoc_path) +        except ServerError as e: +            raise_server_error(e, ddoc_path) + +    def _get_trans_id_for_gen(self, generation): +        """ +        Get the transaction id corresponding to a particular generation. + +        :param generation: The generation for which to get the transaction id. +        :type generation: int + +        :return: The transaction id for C{generation}. +        :rtype: str + +        :raise InvalidGeneration: Raised when the generation does not exist. +        :raise MissingDesignDocError: Raised when tried to access a missing +                                      design document. +        :raise MissingDesignDocListFunctionError: Raised when trying to access +                                                  a missing list function on a +                                                  design document. +        :raise MissingDesignDocNamedViewError: Raised when trying to access a +                                               missing named view on a design +                                               document. 
+        :raise MissingDesignDocDeletedError: Raised when trying to access a +                                             deleted design document. +        :raise MissingDesignDocUnknownError: Raised when failed to access a +                                             design document for an yet +                                             unknown reason. +        """ +        if generation == 0: +            return '' +        # query a couch list function +        ddoc_path = [ +            '_design', 'transactions', '_list', 'trans_id_for_gen', 'log' +        ] +        res = self._database.resource(*ddoc_path) +        try: +            response = res.get_json(gen=generation) +            if response[2] == {}: +                raise InvalidGeneration +            return response[2]['transaction_id'] +        except ResourceNotFound as e: +            raise_missing_design_doc_error(e, ddoc_path) +        except ServerError as e: +            raise_server_error(e, ddoc_path) + +    def _get_transaction_log(self): +        """ +        This is only for the test suite, it is not part of the api. + +        :return: The complete transaction log. +        :rtype: [(str, str)] + +        :raise MissingDesignDocError: Raised when tried to access a missing +                                      design document. +        :raise MissingDesignDocListFunctionError: Raised when trying to access +                                                  a missing list function on a +                                                  design document. +        :raise MissingDesignDocNamedViewError: Raised when trying to access a +                                               missing named view on a design +                                               document. +        :raise MissingDesignDocDeletedError: Raised when trying to access a +                                             deleted design document. +        :raise MissingDesignDocUnknownError: Raised when failed to access a +                                             design document for an yet +                                             unknown reason. +        """ +        # query a couch view +        ddoc_path = ['_design', 'transactions', '_view', 'log'] +        res = self._database.resource(*ddoc_path) +        try: +            response = res.get_json() +            return map( +                lambda row: (row['id'], row['value']), +                response[2]['rows']) +        except ResourceNotFound as e: +            raise_missing_design_doc_error(e, ddoc_path)      def _get_doc(self, doc_id, check_for_conflicts=False):          """ -        Get just the document content, without fancy handling. +        Extract the document from storage. + +        This can return None if the document doesn't exist. -        @param doc_id: The unique document identifier -        @type doc_id: str -        @param include_deleted: If set to True, deleted documents will be +        :param doc_id: The unique document identifier +        :type doc_id: str +        :param check_for_conflicts: If set to False, then the conflict check +                                    will be skipped. +        :type check_for_conflicts: bool + +        :return: The document. 
+        :rtype: CouchDocument +        """ +        # get document with all attachments (u1db content and eventual +        # conflicts) +        try: +            result = \ +                self._database.resource(doc_id).get_json( +                    attachments=True)[2] +        except ResourceNotFound: +            return None +        # restrict to u1db documents +        if 'u1db_rev' not in result: +            return None +        doc = self._factory(doc_id, result['u1db_rev']) +        # set contents or make tombstone +        if '_attachments' not in result \ +                or 'u1db_content' not in result['_attachments']: +            doc.make_tombstone() +        else: +            doc.content = json.loads( +                binascii.a2b_base64( +                    result['_attachments']['u1db_content']['data'])) +        # determine if there are conflicts +        if check_for_conflicts \ +                and '_attachments' in result \ +                and 'u1db_conflicts' in result['_attachments']: +            doc.has_conflicts = True +            doc.set_conflicts( +                self._build_conflicts( +                    doc.doc_id, +                    json.loads(binascii.a2b_base64( +                        result['_attachments']['u1db_conflicts']['data'])))) +        # store couch revision +        doc.couch_rev = result['_rev'] +        # store transactions +        doc.transactions = result['u1db_transactions'] +        return doc + +    def get_doc(self, doc_id, include_deleted=False): +        """ +        Get the JSON string for the given document. + +        :param doc_id: The unique document identifier +        :type doc_id: str +        :param include_deleted: If set to True, deleted documents will be              returned with empty content. Otherwise asking for a deleted              document will return None. -        @type include_deleted: bool +        :type include_deleted: bool -        @return: a Document object. -        @type: u1db.Document +        :return: A document object. +        :rtype: CouchDocument.          """ -        cdoc = self._database.get(doc_id) -        if cdoc is None: +        doc = self._get_doc(doc_id, check_for_conflicts=True) +        if doc is None: +            return None +        if doc.is_tombstone() and not include_deleted:              return None -        has_conflicts = False -        if check_for_conflicts: -            has_conflicts = self._has_conflicts(doc_id) -        doc = self._factory( -            doc_id=doc_id, -            rev=cdoc[self.COUCH_U1DB_REV_KEY], -            has_conflicts=has_conflicts) -        contents = self._database.get_attachment( -            cdoc, -            self.COUCH_U1DB_ATTACHMENT_KEY) -        if contents: -            doc.content = json.loads(contents.read()) -        else: -            doc.make_tombstone()          return doc      def get_all_docs(self, include_deleted=False):          """          Get the JSON content for all documents in the database. -        @param include_deleted: If set to True, deleted documents will be -            returned with empty content. Otherwise deleted documents will not -            be included in the results. -        @type include_deleted: bool +        :param include_deleted: If set to True, deleted documents will be +                                returned with empty content. Otherwise deleted +                                documents will not be included in the results. 
+        :type include_deleted: bool -        @return: (generation, [Document]) +        :return: (generation, [CouchDocument])              The current generation of the database, followed by a list of all              the documents in the database. -        @rtype: tuple +        :rtype: (int, [CouchDocument])          """ +          generation = self._get_generation()          results = [] -        for doc_id in self._database: -            if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): -                continue -            doc = self._get_doc(doc_id, check_for_conflicts=True) -            if doc.content is None and not include_deleted: -                continue -            results.append(doc) +        for row in self._database.view('_all_docs'): +            doc = self.get_doc(row.id, include_deleted=include_deleted) +            if doc is not None: +                results.append(doc)          return (generation, results) -    def _put_doc(self, doc): +    def _put_doc(self, old_doc, doc): +        """ +        Put the document in the Couch backend database. + +        Note that C{old_doc} must have been fetched with the parameter +        C{check_for_conflicts} equal to True, so we can properly update the +        new document using the conflict information from the old one. + +        :param old_doc: The old document version. +        :type old_doc: CouchDocument +        :param doc: The document to be put. +        :type doc: CouchDocument + +        :raise RevisionConflict: Raised when trying to update a document but +                                 couch revisions mismatch. +        :raise MissingDesignDocError: Raised when tried to access a missing +                                      design document. +        :raise MissingDesignDocListFunctionError: Raised when trying to access +                                                  a missing list function on a +                                                  design document. +        :raise MissingDesignDocNamedViewError: Raised when trying to access a +                                               missing named view on a design +                                               document. +        :raise MissingDesignDocDeletedError: Raised when trying to access a +                                             deleted design document. +        :raise MissingDesignDocUnknownError: Raised when failed to access a +                                             design document for an yet +                                             unknown reason. 
+        """ +        attachments = {}  # we save content and conflicts as attachments +        parts = []  # and we put it using couch's multipart PUT +        # save content as attachment +        if doc.is_tombstone() is False: +            content = doc.get_json() +            attachments['u1db_content'] = { +                'follows': True, +                'content_type': 'application/octet-stream', +                'length': len(content), +            } +            parts.append(content) +        # save conflicts as attachment +        if doc.has_conflicts is True: +            conflicts = json.dumps( +                map(lambda cdoc: (cdoc.rev, cdoc.content), +                    doc.get_conflicts())) +            attachments['u1db_conflicts'] = { +                'follows': True, +                'content_type': 'application/octet-stream', +                'length': len(conflicts), +            } +            parts.append(conflicts) +        # store old transactions, if any +        transactions = old_doc.transactions[:] if old_doc is not None else [] +        # create a new transaction id and timestamp it so the transaction log +        # is consistent when querying the database. +        transactions.append( +            # here we store milliseconds to keep consistent with javascript +            # Date.prototype.getTime() which was used before inside a couchdb +            # update handler. +            (int(time.time() * 1000), +            self._allocate_transaction_id())) +        # build the couch document +        couch_doc = { +            '_id': doc.doc_id, +            'u1db_rev': doc.rev, +            'u1db_transactions': transactions, +            '_attachments': attachments, +        } +        # if we are updating a doc we have to add the couch doc revision +        if old_doc is not None: +            couch_doc['_rev'] = old_doc.couch_rev +        # prepare the multipart PUT +        buf = StringIO() +        envelope = MultipartWriter(buf) +        envelope.add('application/json', json.dumps(couch_doc)) +        for part in parts: +            envelope.add('application/octet-stream', part) +        envelope.close() +        # try to save and fail if there's a revision conflict +        try: +            self._database.resource.put_json( +                doc.doc_id, body=buf.getvalue(), headers=envelope.headers) +            self._renew_couch_session() +        except ResourceConflict: +            raise RevisionConflict() + +    def put_doc(self, doc):          """          Update a document. -        This is called everytime we just want to do a raw put on the db (i.e. -        without index updates, document constraint checks, and conflict -        checks). - -        @param doc: The document to update. -        @type doc: u1db.Document - -        @return: The new revision identifier for the document. 
-        @rtype: str -        """ -        # prepare couch's Document -        cdoc = CouchDocument() -        cdoc[self.COUCH_ID_KEY] = doc.doc_id -        # we have to guarantee that couch's _rev is consistent -        old_cdoc = self._database.get(doc.doc_id) -        if old_cdoc is not None: -            cdoc[self.COUCH_REV_KEY] = old_cdoc[self.COUCH_REV_KEY] -        # store u1db's rev -        cdoc[self.COUCH_U1DB_REV_KEY] = doc.rev -        # save doc in db -        self._database.save(cdoc) -        # store u1db's content as json string -        if not doc.is_tombstone(): -            self._database.put_attachment( -                cdoc, doc.get_json(), -                filename=self.COUCH_U1DB_ATTACHMENT_KEY) -        else: -            self._database.delete_attachment( -                cdoc, -                self.COUCH_U1DB_ATTACHMENT_KEY) +        If the document currently has conflicts, put will fail. +        If the database specifies a maximum document size and the document +        exceeds it, put will fail and raise a DocumentTooBig exception. -    def get_sync_target(self): -        """ -        Return a SyncTarget object, for another u1db to synchronize with. +        :param doc: A Document with new content. +        :return: new_doc_rev - The new revision identifier for the document. +            The Document object will also be updated. -        @return: The sync target. -        @rtype: CouchSyncTarget +        :raise InvalidDocId: Raised if the document's id is invalid. +        :raise DocumentTooBig: Raised if the document size is too big. +        :raise ConflictedDoc: Raised if the document has conflicts.          """ -        return CouchSyncTarget(self) - -    def create_index(self, index_name, *index_expressions): +        if doc.doc_id is None: +            raise InvalidDocId() +        self._check_doc_id(doc.doc_id) +        self._check_doc_size(doc) +        old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) +        if old_doc and old_doc.has_conflicts: +            raise ConflictedDoc() +        if old_doc and doc.rev is None and old_doc.is_tombstone(): +            new_rev = self._allocate_doc_rev(old_doc.rev) +        else: +            if old_doc is not None: +                    if old_doc.rev != doc.rev: +                        raise RevisionConflict() +            else: +                if doc.rev is not None: +                    raise RevisionConflict() +            new_rev = self._allocate_doc_rev(doc.rev) +        doc.rev = new_rev +        self._put_doc(old_doc, doc) +        return new_rev + +    def whats_changed(self, old_generation=0):          """ -        Create a named index, which can then be queried for future lookups. - -        @param index_name: A unique name which can be used as a key prefix. -        @param index_expressions: Index expressions defining the index -            information. +        Return a list of documents that have changed since old_generation. + +        :param old_generation: The generation of the database in the old +                               state. 
+        :type old_generation: int + +        :return: (generation, trans_id, [(doc_id, generation, trans_id),...]) +                 The current generation of the database, its associated +                 transaction id, and a list of of changed documents since +                 old_generation, represented by tuples with for each document +                 its doc_id and the generation and transaction id corresponding +                 to the last intervening change and sorted by generation (old +                 changes first) +        :rtype: (int, str, [(str, int, str)]) + +        :raise MissingDesignDocError: Raised when tried to access a missing +                                      design document. +        :raise MissingDesignDocListFunctionError: Raised when trying to access +                                                  a missing list function on a +                                                  design document. +        :raise MissingDesignDocNamedViewError: Raised when trying to access a +                                               missing named view on a design +                                               document. +        :raise MissingDesignDocDeletedError: Raised when trying to access a +                                             deleted design document. +        :raise MissingDesignDocUnknownError: Raised when failed to access a +                                             design document for an yet +                                             unknown reason.          """ -        if index_name in self._indexes: -            if self._indexes[index_name]._definition == list( -                    index_expressions): -                return -            raise errors.IndexNameTakenError -        index = InMemoryIndex(index_name, list(index_expressions)) -        for doc_id in self._database: -            if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): -                continue  # skip special files -            doc = self._get_doc(doc_id) -            if doc.content is not None: -                index.add_json(doc_id, doc.get_json()) -        self._indexes[index_name] = index - -    def close(self): +        # query a couch list function +        ddoc_path = [ +            '_design', 'transactions', '_list', 'whats_changed', 'log' +        ] +        res = self._database.resource(*ddoc_path) +        try: +            response = res.get_json(old_gen=old_generation) +            results = map( +                lambda row: +                    (row['generation'], row['doc_id'], row['transaction_id']), +                response[2]['transactions']) +            results.reverse() +            cur_gen = old_generation +            seen = set() +            changes = [] +            newest_trans_id = '' +            for generation, doc_id, trans_id in results: +                if doc_id not in seen: +                    changes.append((doc_id, generation, trans_id)) +                    seen.add(doc_id) +            if changes: +                cur_gen = changes[0][1]  # max generation +                newest_trans_id = changes[0][2] +                changes.reverse() +            else: +                cur_gen, newest_trans_id = self._get_generation_info() + +            return cur_gen, newest_trans_id, changes +        except ResourceNotFound as e: +            raise_missing_design_doc_error(e, ddoc_path) +        except ServerError as e: +            raise_server_error(e, ddoc_path) + +    def delete_doc(self, doc):          """ -        Release any 
+    def delete_doc(self, doc):
         """
-        Release any resources associated with this database.
+        Mark a document as deleted.
+
+        Will abort if the current revision doesn't match doc.rev.
+        This will also set doc.content to None.
+
+        :param doc: The document to mark as deleted.
+        :type doc: CouchDocument

-        @return: True if db was succesfully closed.
-        @rtype: bool
+        :raise DocumentDoesNotExist: Raised if the document does not exist.
+        :raise RevisionConflict: Raised if the revisions do not match.
+        :raise DocumentAlreadyDeleted: Raised if the document is already
+                                       deleted.
+        :raise ConflictedDoc: Raised if the doc has conflicts.
         """
-        # TODO: fix this method so the connection is properly closed and
-        # test_close (+tearDown, which deletes the db) works without problems.
-        self._url = None
-        self._full_commit = None
-        self._session = None
-        #self._server = None
-        self._database = None
-        return True
+        old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
+        if old_doc is None:
+            raise DocumentDoesNotExist
+        if old_doc.rev != doc.rev:
+            raise RevisionConflict()
+        if old_doc.is_tombstone():
+            raise DocumentAlreadyDeleted
+        if old_doc.has_conflicts:
+            raise ConflictedDoc()
+        new_rev = self._allocate_doc_rev(doc.rev)
+        doc.rev = new_rev
+        doc.make_tombstone()
+        self._put_doc(old_doc, doc)
+        return new_rev
+
+    def _build_conflicts(self, doc_id, attached_conflicts):
+        """
+        Build the conflicted documents list from the conflicts attachment
+        fetched from a couch document.

-    def sync(self, url, creds=None, autocreate=True):
+        :param doc_id: The document id.
+        :type doc_id: str
+        :param attached_conflicts: The document's conflicts as fetched from a
+                                   couch document attachment.
+        :type attached_conflicts: dict
+
+        :return: A list of conflicted versions of the document.
+        :rtype: list
+        """
+        conflicts = []
+        for doc_rev, content in attached_conflicts:
+            doc = self._factory(doc_id, doc_rev)
+            if content is None:
+                doc.make_tombstone()
+            else:
+                doc.content = content
+            conflicts.append(doc)
+        return conflicts
+
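For reference, the `u1db_conflicts` attachment that feeds `_build_conflicts`
is a JSON list of `[doc_rev, content]` pairs, where a null content stands for
a tombstoned version. An illustrative payload (revisions and contents are made
up) and the loop it goes through:

    import simplejson as json

    # illustrative attachment body
    attached_conflicts = json.loads(
        '[["test:2|other:1", {"key": "theirs"}], ["test:1", null]]')
    for doc_rev, content in attached_conflicts:
        if content is None:
            print doc_rev, '(tombstone)'
        else:
            print doc_rev, content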
+    def _get_conflicts(self, doc_id, couch_rev=None):
         """
-        Synchronize documents with remote replica exposed at url.
+        Get the conflicted versions of a document.

-        @param url: The url of the target replica to sync with.
-        @type url: str
-        @param creds: optional dictionary giving credentials.
-            to authorize the operation with the server.
-        @type creds: dict
-        @param autocreate: Ask the target to create the db if non-existent.
-        @type autocreate: bool
+        If the couch_rev parameter is not None, conflicts for a specific
+        document's couch revision are returned.

-        @return: The local generation before the synchronisation was performed.
-        @rtype: int
-        """
-        return Synchronizer(self, CouchSyncTarget(url, creds=creds)).sync(
-            autocreate=autocreate)
+        :param doc_id: The document id.
+        :type doc_id: str
+        :param couch_rev: The couch document revision.
+        :type couch_rev: str

-    #-------------------------------------------------------------------------
-    # methods from ObjectStoreDatabase
-    #-------------------------------------------------------------------------
+        :return: A list of conflicted versions of the document.
+        :rtype: list
+        """
+        # request conflicts attachment from server
+        params = {}
+        if couch_rev is not None:
+            params['rev'] = couch_rev  # restrict document's couch revision
+        resource = self._database.resource(doc_id, 'u1db_conflicts')
+        try:
+            response = resource.get_json(**params)
+            return self._build_conflicts(
+                doc_id, json.loads(response[2].read()))
+        except ResourceNotFound:
+            return []

-    def _init_u1db_data(self):
+    def get_doc_conflicts(self, doc_id):
         """
-        Initialize u1db configuration data on backend storage.
+        Get the list of conflicts for the given document.

-        A U1DB database needs to keep track of all database transactions,
-        document conflicts, the generation of other replicas it has seen,
-        indexes created by users and so on.
+        The order of the conflicts is such that the first entry is the value
+        that would be returned by "get_doc".

-        In this implementation, all this information is stored in special
-        documents stored in the underlying with doc_id prefix equal to
-        U1DB_DATA_DOC_ID_PREFIX. Those documents ids are reserved: put_doc(),
-        get_doc() and delete_doc() will not allow documents with a doc_id with
-        that prefix to be accessed or modified.
+        :param doc_id: The document id.
+        :type doc_id: str
+
+        :return: A list of the document entries that are conflicted.
+        :rtype: [CouchDocument]
         """
-        for key in self.U1DB_DATA_KEYS:
-            doc_id = '%s%s' % (self.U1DB_DATA_DOC_ID_PREFIX, key)
-            doc = self._get_doc(doc_id)
-            if doc is None:
-                doc = self._factory(doc_id)
-                doc.content = {'content': getattr(self, key)}
-                self._put_doc(doc)
-
-    #-------------------------------------------------------------------------
-    # Couch specific methods
-    #-------------------------------------------------------------------------
+        conflict_docs = self._get_conflicts(doc_id)
+        if len(conflict_docs) == 0:
+            return []
+        this_doc = self._get_doc(doc_id, check_for_conflicts=True)
+        return [this_doc] + conflict_docs
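Because the current version is prepended to the conflicted versions, the
first entry returned by `get_doc_conflicts` is the same revision that
`get_doc` would hand back. A sketch with a hypothetical document id, reusing
the `db` object from the sketches above:

    conflicts = db.get_doc_conflicts('my-doc-id')    # hypothetical id
    if conflicts:
        assert conflicts[0].rev == db.get_doc('my-doc-id').rev
        conflicted_revs = [d.rev for d in conflicts[1:]]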

-    INDEX_NAME_KEY = 'name'
-    INDEX_DEFINITION_KEY = 'definition'
-    INDEX_VALUES_KEY = 'values'
+    def _get_replica_gen_and_trans_id(self, other_replica_uid):
+        """
+        Return the last known generation and transaction id for the other db
+        replica.
+
+        When you do a synchronization with another replica, the Database keeps
+        track of what generation the other database replica was at, and what
+        the associated transaction id was.  This is used to determine what data
+        needs to be sent, and if two databases are claiming to be the same
+        replica.
+
+        :param other_replica_uid: The identifier for the other replica.
+        :type other_replica_uid: str
+
+        :return: A tuple containing the generation and transaction id we
+                 encountered during synchronization. If we've never
+                 synchronized with the replica, this is (0, '').
+        :rtype: (int, str)
+        """
+        # query a couch view
+        result = self._database.view('syncs/log')
+        if len(result[other_replica_uid].rows) == 0:
+            return (0, '')
+        return (
+            result[other_replica_uid].rows[0]['value']['known_generation'],
+            result[other_replica_uid].rows[0]['value']['known_transaction_id']
+        )
+
+    def _set_replica_gen_and_trans_id(self, other_replica_uid,
+                                      other_generation, other_transaction_id):
+        """
+        Set the last-known generation and transaction id for the other
+        database replica.
+
+        We have just performed some synchronization, and we want to track what
+        generation the other replica was at. See also
+        _get_replica_gen_and_trans_id.
+
+        :param other_replica_uid: The U1DB identifier for the other replica.
+        :type other_replica_uid: str
+        :param other_generation: The generation number for the other replica.
+        :type other_generation: int
+        :param other_transaction_id: The transaction id associated with the
+                                     generation.
+        :type other_transaction_id: str
+        """
+        self._do_set_replica_gen_and_trans_id(
+            other_replica_uid, other_generation, other_transaction_id)
+
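The `syncs/log` view keyed by replica uid can also be inspected directly.
Sketched below with python-couchdb, mirroring the call above; the URL and
replica uid are placeholders:

    from couchdb import Database

    db = Database('http://localhost:5984/user-db')   # assumed URL
    result = db.view('syncs/log')
    rows = result['other-replica-uid'].rows          # placeholder uid
    if len(rows) == 0:
        print (0, '')
    else:
        value = rows[0]['value']
        print value['known_generation'], value['known_transaction_id']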
""" -        del(self._server[self._dbname]) +        # query a couch update function +        ddoc_path = ['_design', 'syncs', '_update', 'put', 'u1db_sync_log'] +        res = self._database.resource(*ddoc_path) +        try: +            with CouchDatabase.update_handler_lock[self._get_replica_uid()]: +                res.put_json( +                    body={ +                        'other_replica_uid': other_replica_uid, +                        'other_generation': other_generation, +                        'other_transaction_id': other_transaction_id, +                    }, +                    headers={'content-type': 'application/json'}) +        except ResourceNotFound as e: +            raise_missing_design_doc_error(e, ddoc_path) + +    def _add_conflict(self, doc, my_doc_rev, my_content): +        """ +        Add a conflict to the document. + +        Note that this method does not actually update the backend; rather, it +        updates the CouchDocument object which will provide the conflict data +        when the atomic document update is made. + +        :param doc: The document to have conflicts added to. +        :type doc: CouchDocument +        :param my_doc_rev: The revision of the conflicted document. +        :type my_doc_rev: str +        :param my_content: The content of the conflicted document as a JSON +                           serialized string. +        :type my_content: str +        """ +        doc._ensure_fetch_conflicts(self._get_conflicts) +        doc.add_conflict( +            self._factory(doc_id=doc.doc_id, rev=my_doc_rev, +                          json=my_content)) -    def _dump_indexes_as_json(self): +    def _delete_conflicts(self, doc, conflict_revs):          """ -        Dump index definitions as JSON. +        Delete the conflicted revisions from the list of conflicts of C{doc}. + +        Note that this method does not actually update the backend; rather, it +        updates the CouchDocument object which will provide the conflict data +        when the atomic document update is made. + +        :param doc: The document to have conflicts deleted. +        :type doc: CouchDocument +        :param conflict_revs: A list of the revisions to be deleted. +        :param conflict_revs: [str]          """ -        indexes = {} -        for name, idx in self._indexes.iteritems(): -            indexes[name] = {} -            for attr in [self.INDEX_NAME_KEY, self.INDEX_DEFINITION_KEY, -                         self.INDEX_VALUES_KEY]: -                indexes[name][attr] = getattr(idx, '_' + attr) -        return indexes +        doc._ensure_fetch_conflicts(self._get_conflicts) +        doc.delete_conflicts(conflict_revs) -    def _load_indexes_from_json(self, indexes): +    def _prune_conflicts(self, doc, doc_vcr):          """ -        Load index definitions from stored JSON. +        Prune conflicts that are older then the current document's revision, or +        whose content match to the current document's content. + +        :param doc: The document to have conflicts pruned. +        :type doc: CouchDocument +        :param doc_vcr: A vector clock representing the current document's +                        revision. 
-    def _load_indexes_from_json(self, indexes):
+    def _prune_conflicts(self, doc, doc_vcr):
         """
-        Load index definitions from stored JSON.
+        Prune conflicts that are older than the current document's revision,
+        or whose content matches the current document's content.
+
+        :param doc: The document to have conflicts pruned.
+        :type doc: CouchDocument
+        :param doc_vcr: A vector clock representing the current document's
+                        revision.
+        :type doc_vcr: u1db.vectorclock.VectorClock
+        """
+        if doc.has_conflicts is True:
+            autoresolved = False
+            c_revs_to_prune = []
+            for c_doc in doc.get_conflicts():
+                c_vcr = vectorclock.VectorClockRev(c_doc.rev)
+                if doc_vcr.is_newer(c_vcr):
+                    c_revs_to_prune.append(c_doc.rev)
+                elif doc.same_content_as(c_doc):
+                    c_revs_to_prune.append(c_doc.rev)
+                    doc_vcr.maximize(c_vcr)
+                    autoresolved = True
+            if autoresolved:
+                doc_vcr.increment(self._replica_uid)
+                doc.rev = doc_vcr.as_str()
+            self._delete_conflicts(doc, c_revs_to_prune)
+
+    def _force_doc_sync_conflict(self, doc):
+        """
+        Add a conflict and force a document put.

-        @param indexes: A JSON representation of indexes as
-            [('index-name', ['field', 'field2', ...]), ...].
-        @type indexes: str
+        :param doc: The document to be put.
+        :type doc: CouchDocument
         """
-        self._indexes = {}
-        for name, idx_dict in indexes.iteritems():
-            idx = InMemoryIndex(name, idx_dict[self.INDEX_DEFINITION_KEY])
-            idx._values = idx_dict[self.INDEX_VALUES_KEY]
-            self._indexes[name] = idx
+        my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
+        self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev))
+        self._add_conflict(doc, my_doc.rev, my_doc.get_json())
+        doc.has_conflicts = True
+        self._put_doc(my_doc, doc)
+
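The pruning rule above is plain vector-clock arithmetic, and u1db's
`VectorClockRev` can be used to reproduce both branches (older revision vs.
equal content) in isolation; the revisions below are illustrative:

    from u1db import vectorclock

    doc_vcr = vectorclock.VectorClockRev('test:2|other:1')
    c_vcr = vectorclock.VectorClockRev('test:1')
    print doc_vcr.is_newer(c_vcr)    # True: this conflict would be pruned

    # content-equal branch: absorb the conflict's clock and bump our counter
    doc_vcr.maximize(vectorclock.VectorClockRev('third:1'))
    doc_vcr.increment('test')
    print doc_vcr.as_str()           # e.g. 'other:1|test:3|third:1'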
+        """ +        cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) +        new_rev = self._ensure_maximal_rev(cur_doc.rev, +                                           conflicted_doc_revs) +        superseded_revs = set(conflicted_doc_revs) +        doc.rev = new_rev +        # this backend stores conflicts as properties of the documents, so we +        # have to copy these conflicts over to the document being updated. +        if cur_doc.rev in superseded_revs: +            # the newer doc version will supersede the one in the database, so +            # we copy conflicts before updating the backend. +            doc.set_conflicts(cur_doc.get_conflicts())  # copy conflicts over. +            self._delete_conflicts(doc, superseded_revs) +            self._put_doc(cur_doc, doc) +        else: +            # the newer doc version does not supersede the one in the +            # database, so we will add a conflict to the database and copy +            # those over to the document the user has in her hands. +            self._add_conflict(cur_doc, new_rev, doc.get_json()) +            self._delete_conflicts(cur_doc, superseded_revs) +            self._put_doc(cur_doc, cur_doc)  # just update conflicts +            # backend has been updated with current conflicts, now copy them +            # to the current document. +            doc.set_conflicts(cur_doc.get_conflicts()) + +    def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, +                          replica_trans_id=''): +        """ +        Insert/update document into the database with a given revision. + +        This api is used during synchronization operations. + +        If a document would conflict and save_conflict is set to True, the +        content will be selected as the 'current' content for doc.doc_id, +        even though doc.rev doesn't supersede the currently stored revision. +        The currently stored document will be added to the list of conflict +        alternatives for the given doc_id. + +        This forces the new content to be 'current' so that we get convergence +        after synchronizing, even if people don't resolve conflicts. Users can +        then notice that their content is out of date, update it, and +        synchronize again. (The alternative is that users could synchronize and +        think the data has propagated, but their local copy looks fine, and the +        remote copy is never updated again.) + +        :param doc: A document object +        :type doc: CouchDocument +        :param save_conflict: If this document is a conflict, do you want to +                              save it as a conflict, or just ignore it. +        :type save_conflict: bool +        :param replica_uid: A unique replica identifier. +        :type replica_uid: str +        :param replica_gen: The generation of the replica corresponding to the +                            this document. The replica arguments are optional, +                            but are used during synchronization. +        :type replica_gen: int +        :param replica_trans_id: The transaction_id associated with the +                                 generation. +        :type replica_trans_id: str + +        :return: (state, at_gen) -  If we don't have doc_id already, or if +                 doc_rev supersedes the existing document revision, then the +                 content will be inserted, and state is 'inserted'.  
+    def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen,
+                          replica_trans_id=''):
+        """
+        Insert/update document into the database with a given revision.
+
+        This API is used during synchronization operations.
+
+        If a document would conflict and save_conflict is set to True, the
+        content will be selected as the 'current' content for doc.doc_id,
+        even though doc.rev doesn't supersede the currently stored revision.
+        The currently stored document will be added to the list of conflict
+        alternatives for the given doc_id.
+
+        This forces the new content to be 'current' so that we get convergence
+        after synchronizing, even if people don't resolve conflicts. Users can
+        then notice that their content is out of date, update it, and
+        synchronize again. (The alternative is that users could synchronize and
+        think the data has propagated, but their local copy looks fine, and the
+        remote copy is never updated again.)
+
+        :param doc: A document object
+        :type doc: CouchDocument
+        :param save_conflict: If this document is a conflict, do you want to
+                              save it as a conflict, or just ignore it.
+        :type save_conflict: bool
+        :param replica_uid: A unique replica identifier.
+        :type replica_uid: str
+        :param replica_gen: The generation of the replica corresponding to
+                            this document. The replica arguments are optional,
+                            but are used during synchronization.
+        :type replica_gen: int
+        :param replica_trans_id: The transaction_id associated with the
+                                 generation.
+        :type replica_trans_id: str
+
+        :return: (state, at_gen) - If we don't have doc_id already, or if
+                 doc_rev supersedes the existing document revision, then the
+                 content will be inserted, and state is 'inserted'.  If
+                 doc_rev is less than or equal to the existing revision, then
+                 the put is ignored and state is respectively 'superseded' or
+                 'converged'.  If doc_rev is not strictly superseded or
+                 supersedes, then state is 'conflicted'. The document will not
+                 be inserted if save_conflict is False.  For 'inserted' or
+                 'converged', at_gen is the insertion/current generation.
+        :rtype: (str, int)
+        """
+        cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
+        # at this point, `doc` has arrived from the other syncing party, and
+        # we will decide what to do with it.
+        # First, we prepare the arriving doc to update couch database.
+        old_doc = doc
+        doc = self._factory(doc.doc_id, doc.rev, doc.get_json())
+        if cur_doc is not None:
+            doc.couch_rev = cur_doc.couch_rev
+        # fetch conflicts because we will eventually manipulate them
+        doc._ensure_fetch_conflicts(self._get_conflicts)
+        # from now on, it works just like u1db sqlite backend
+        doc_vcr = vectorclock.VectorClockRev(doc.rev)
+        if cur_doc is None:
+            cur_vcr = vectorclock.VectorClockRev(None)
+        else:
+            cur_vcr = vectorclock.VectorClockRev(cur_doc.rev)
+        self._validate_source(replica_uid, replica_gen, replica_trans_id)
+        if doc_vcr.is_newer(cur_vcr):
+            rev = doc.rev
+            self._prune_conflicts(doc, doc_vcr)
+            if doc.rev != rev:
+                # conflicts have been autoresolved
+                state = 'superseded'
+            else:
+                state = 'inserted'
+            self._put_doc(cur_doc, doc)
+        elif doc.rev == cur_doc.rev:
+            # magical convergence
+            state = 'converged'
+        elif cur_vcr.is_newer(doc_vcr):
+            # Don't add this to seen_ids, because we have something newer,
+            # so we should send it back, and we should not generate a
+            # conflict
+            state = 'superseded'
+        elif cur_doc.same_content_as(doc):
+            # the documents have been edited to the same thing at both ends
+            doc_vcr.maximize(cur_vcr)
+            doc_vcr.increment(self._replica_uid)
+            doc.rev = doc_vcr.as_str()
+            self._put_doc(cur_doc, doc)
+            state = 'superseded'
+        else:
+            state = 'conflicted'
+            if save_conflict:
+                self._force_doc_sync_conflict(doc)
+        if replica_uid is not None and replica_gen is not None:
+            self._set_replica_gen_and_trans_id(
+                replica_uid, replica_gen, replica_trans_id)
+        # update info
+        old_doc.rev = doc.rev
+        if doc.is_tombstone():
+            old_doc.make_tombstone()
+        else:
+            old_doc.content = doc.content
+        old_doc.has_conflicts = doc.has_conflicts
+        return state, self._get_generation()

-        @param transaction_log: A JSON representation of transaction_log as
-            [('generation', 'transaction_id'), ...].
-        @type transaction_log: list
+    def get_docs(self, doc_ids, check_for_conflicts=True,
+                 include_deleted=False):
         """
-        self._transaction_log = []
-        for gen, trans_id in transaction_log:
-            self._transaction_log.append((gen, trans_id))
+        Get the JSON content for many documents.
+
+        :param doc_ids: A list of document identifiers.
+        :type doc_ids: list
+        :param check_for_conflicts: If set to False, then the conflict check
+                                    will be skipped, and 'None' will be
+                                    returned instead of True/False.
+        :type check_for_conflicts: bool
+        :param include_deleted: If set to True, deleted documents will be
+                                returned with empty content. Otherwise deleted
+                                documents will not be included in the results.
+        :type include_deleted: bool
+
+        :return: iterable giving the Document object for each document id
+                 in matching doc_ids order.
+        :rtype: iterable
+        """
+        # Workaround for:
+        #
+        #   http://bugs.python.org/issue7980
+        #   https://leap.se/code/issues/5449
+        #
+        # python-couchdb uses time.strptime, which is not thread safe. In
+        # order to avoid the problem described on the issues above, we preload
+        # strptime here by evaluating the conversion of an arbitrary date.
+        # This will not be needed when/if we switch from python-couchdb to
+        # paisley.
+        time.strptime('Mar 4 1917', '%b %d %Y')
+        # spawn threads to retrieve docs
+        threads = []
+        for doc_id in doc_ids:
+            self._sem_pool.acquire()
+            t = self._GetDocThread(self, doc_id, check_for_conflicts,
+                                   self._sem_pool.release)
+            t.start()
+            threads.append(t)
+        # join threads and yield docs
+        for t in threads:
+            t.join()
+            if t._doc.is_tombstone() and not include_deleted:
+                continue
+            yield t._doc
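The `strptime` preload above is easy to reproduce in isolation: the first
`time.strptime` call imports the `_strptime` module behind the scenes, and
doing that concurrently from several threads is what triggers issue 7980.
Preloading once before spawning threads, as `get_docs` does, sidesteps it:

    import threading
    import time

    time.strptime('Mar 4 1917', '%b %d %Y')    # preload on the main thread

    def parse_date():
        # safe now: the _strptime module is already imported
        time.strptime('Tue, 01 Apr 2014 12:00:00 GMT',
                      '%a, %d %b %Y %H:%M:%S GMT')

    threads = [threading.Thread(target=parse_date) for _ in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()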

-    def _load_other_generations_from_json(self, other_generations):
+    def _renew_couch_session(self):
         """
-        Load other generations from stored JSON.
+        Create a new couch connection session.

-        @param other_generations: A JSON representation of other_generations
-            as {'replica_uid': ('generation', 'transaction_id'), ...}.
-        @type other_generations: dict
+        This is a workaround for #5448. Will not be needed once bigcouch is
+        merged with couchdb.
         """
-        self._other_generations = {}
-        for replica_uid, [gen, trans_id] in other_generations.iteritems():
-            self._other_generations[replica_uid] = (gen, trans_id)
+        self._database.resource.session = Session(timeout=COUCH_TIMEOUT)


-class CouchSyncTarget(ObjectStoreSyncTarget):
+class CouchSyncTarget(CommonSyncTarget):
     """
     Functionality for using a CouchDatabase as a synchronization target.
     """
-    pass
+    def get_sync_info(self, source_replica_uid):
+        source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id(
+            source_replica_uid)
+        my_gen, my_trans_id = self._db._get_generation_info()
+        return (
+            self._db._replica_uid, my_gen, my_trans_id, source_gen,
+            source_trans_id)

-class NotEnoughCouchPermissions(Exception):
-    """
-    Raised when failing to assert for enough permissions on underlying Couch
-    Database.
-    """ -    pass +    def record_sync_info(self, source_replica_uid, source_replica_generation, +                         source_replica_transaction_id): +        if self._trace_hook: +            self._trace_hook('record_sync_info') +        self._db._set_replica_gen_and_trans_id( +            source_replica_uid, source_replica_generation, +            source_replica_transaction_id)  class CouchServerState(ServerState): @@ -527,121 +1468,66 @@ class CouchServerState(ServerState):          """          Initialize the couch server state. -        @param couch_url: The URL for the couch database. -        @type couch_url: str -        @param shared_db_name: The name of the shared database. -        @type shared_db_name: str -        @param tokens_db_name: The name of the tokens database. -        @type tokens_db_name: str +        :param couch_url: The URL for the couch database. +        :type couch_url: str +        :param shared_db_name: The name of the shared database. +        :type shared_db_name: str +        :param tokens_db_name: The name of the tokens database. +        :type tokens_db_name: str          """          self._couch_url = couch_url          self._shared_db_name = shared_db_name          self._tokens_db_name = tokens_db_name -        try: -            self._check_couch_permissions() -        except NotEnoughCouchPermissions: -            logger.error("Not enough permissions on underlying couch " -                         "database (%s)." % self._couch_url) -        except (socket.error, socket.gaierror, socket.herror, -                socket.timeout), e: -            logger.error("Socket problem while trying to reach underlying " -                         "couch database: (%s, %s)." % -                         (self._couch_url, e)) - -    def _check_couch_permissions(self): -        """ -        Assert that Soledad Server has enough permissions on the underlying -        couch database. - -        Soledad Server has to be able to do the following in the couch server: - -            * Create, read and write from/to 'shared' db. -            * Create, read and write from/to 'user-<anything>' dbs. -            * Read from 'tokens' db. - -        This function tries to perform the actions above using the "low level" -        couch library to ensure that Soledad Server can do everything it needs -        on the underlying couch database. - -        @param couch_url: The URL of the couch database. -        @type couch_url: str - -        @raise NotEnoughCouchPermissions: Raised in case there are not enough -            permissions to read/write/create the needed couch databases. 
-        @rtype: bool
-        """
-
-        def _open_couch_db(dbname):
-            server = Server(url=self._couch_url)
-            try:
-                server[dbname]
-            except ResourceNotFound:
-                server.create(dbname)
-            return server[dbname]
-
-        def _create_delete_test_doc(db):
-            doc_id, _ = db.save({'test': 'document'})
-            doc = db[doc_id]
-            db.delete(doc)
-
-        try:
-            # test read/write auth for shared db
-            _create_delete_test_doc(
-                _open_couch_db(self._shared_db_name))
-            # test read/write auth for user-<something> db
-            _create_delete_test_doc(
-                _open_couch_db('%stest-db' % USER_DB_PREFIX))
-            # test read auth for tokens db
-            tokensdb = _open_couch_db(self._tokens_db_name)
-            tokensdb.info()
-        except Unauthorized:
-            raise NotEnoughCouchPermissions(self._couch_url)

     def open_database(self, dbname):
         """
         Open a couch database.

-        @param dbname: The name of the database to open.
-        @type dbname: str
+        :param dbname: The name of the database to open.
+        :type dbname: str

-        @return: The CouchDatabase object.
-        @rtype: CouchDatabase
+        :return: The CouchDatabase object.
+        :rtype: CouchDatabase
         """
-        # TODO: open couch
         return CouchDatabase.open_database(
             self._couch_url + '/' + dbname,
-            create=False)
+            create=False,
+            ensure_ddocs=False)

     def ensure_database(self, dbname):
         """
         Ensure couch database exists.

-        @param dbname: The name of the database to ensure.
-        @type dbname: str
+        Usually, this method is used by the server to ensure the existence of
+        a database. In our setup, the Soledad user that accesses the underlying
+        couch server should never have permission to create (or delete)
+        databases. But, in case it ever does, by raising an exception here we
+        have one more guarantee that no modified client will be able to force
+        creation of a database when syncing.
+
+        :param dbname: The name of the database to ensure.
+        :type dbname: str

-        @return: The CouchDatabase object and the replica uid.
-        @rtype: (CouchDatabase, str)
+        :return: The CouchDatabase object and the replica uid.
+        :rtype: (CouchDatabase, str)
         """
-        db = CouchDatabase.open_database(
-            self._couch_url + '/' + dbname,
-            create=True)
-        return db, db._replica_uid
+        raise Unauthorized()

     def delete_database(self, dbname):
         """
         Delete couch database.

-        @param dbname: The name of the database to delete.
+        :type dbname: str          """ -        CouchDatabase.delete_database(self._couch_url + '/' + dbname) +        raise Unauthorized()      def _set_couch_url(self, url):          """          Set the couchdb URL -        @param url: CouchDB URL -        @type url: str +        :param url: CouchDB URL +        :type url: str          """          self._couch_url = url @@ -649,7 +1535,7 @@ class CouchServerState(ServerState):          """          Return CouchDB URL -        @rtype: str +        :rtype: str          """          return self._couch_url diff --git a/common/src/leap/soledad/common/ddocs/README.txt b/common/src/leap/soledad/common/ddocs/README.txt new file mode 100644 index 00000000..5569d929 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/README.txt @@ -0,0 +1,34 @@ +This directory holds a folder structure containing javascript files that +represent the design documents needed by the CouchDB U1DB backend. These files +are compiled into the `../ddocs.py` file by setuptools when creating the +source distribution. + +The following table depicts the U1DB CouchDB backend method and the URI that +is queried to obtain/update data from/to the server. + +   +----------------------------------+------------------------------------------------------------------+ +   | u1db backend method              | URI                                                              | +   |----------------------------------+------------------------------------------------------------------| +   | _get_generation                  | _design/transactions/_list/generation/log                        | +   | _get_generation_info             | _design/transactions/_list/generation/log                        | +   | _get_trans_id_for_gen            | _design/transactions/_list/trans_id_for_gen/log                  | +   | _get_transaction_log             | _design/transactions/_view/log                                   | +   | _get_doc (*)                     | _design/docs/_view/get?key=<doc_id>                              | +   | _has_conflicts                   | _design/docs/_view/get?key=<doc_id>                              | +   | get_all_docs                     | _design/docs/_view/get                                           | +   | _put_doc                         | _design/docs/_update/put/<doc_id>                                | +   | _whats_changed                   | _design/transactions/_list/whats_changed/log?old_gen=<gen>       | +   | _get_conflicts (*)               | _design/docs/_view/conflicts?key=<doc_id>                        | +   | _get_replica_gen_and_trans_id    | _design/syncs/_view/log?other_replica_uid=<uid>                  | +   | _do_set_replica_gen_and_trans_id | _design/syncs/_update/put/u1db_sync_log                          | +   | _add_conflict                    | _design/docs/_update/add_conflict/<doc_id>                       | +   | _delete_conflicts                | _design/docs/_update/delete_conflicts/<doc_id>?doc_rev=<doc_rev> | +   | list_indexes                     | not implemented                                                  | +   | _get_index_definition            | not implemented                                                  | +   | delete_index                     | not implemented                                                  | +   | _get_indexed_fields              | not implemented                                                  | +   | _put_and_update_indexes          | not implemented                                        
          | +   +----------------------------------+------------------------------------------------------------------+ + +(*) These methods also request CouchDB document attachments that store U1DB +    document contents. diff --git a/common/src/leap/soledad/common/ddocs/docs/views/get/map.js b/common/src/leap/soledad/common/ddocs/docs/views/get/map.js new file mode 100644 index 00000000..ae08d9e9 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/docs/views/get/map.js @@ -0,0 +1,20 @@ +function(doc) { +    if (doc.u1db_rev) { +        var is_tombstone = true; +        var has_conflicts = false; +        if (doc._attachments) { +            if (doc._attachments.u1db_content) +                is_tombstone = false; +            if (doc._attachments.u1db_conflicts) +                has_conflicts = true; +        } +        emit(doc._id, +            { +                "couch_rev": doc._rev, +                "u1db_rev": doc.u1db_rev, +                "is_tombstone": is_tombstone, +                "has_conflicts": has_conflicts, +            } +        ); +    } +} diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js new file mode 100644 index 00000000..722f695a --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js @@ -0,0 +1,22 @@ +function(doc, req){ +    if (!doc) { +        doc = {} +        doc['_id'] = 'u1db_sync_log'; +        doc['syncs'] = []; +    } +    body = JSON.parse(req.body); +    // remove outdated info +    doc['syncs'] = doc['syncs'].filter( +        function (entry) { +            return entry[0] != body['other_replica_uid']; +        } +    ); +    // store u1db rev +    doc['syncs'].push([ +        body['other_replica_uid'], +        body['other_generation'], +        body['other_transaction_id'] +    ]); +    return [doc, 'ok']; +} + diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js new file mode 100644 index 00000000..a63c7cf4 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js @@ -0,0 +1,12 @@ +function(doc) { +    if (doc._id == 'u1db_sync_log') { +        if (doc.syncs) +            doc.syncs.forEach(function (entry) { +                emit(entry[0], +                    { +                        'known_generation': entry[1], +                        'known_transaction_id': entry[2] +                    }); +            }); +    } +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js b/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js new file mode 100644 index 00000000..dbdfff0d --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js @@ -0,0 +1,20 @@ +function(head, req) { +    var row; +    var rows=[]; +    // fetch all rows +    while(row = getRow()) { +        rows.push(row); +    } +    if (rows.length > 0) +        send(JSON.stringify({ +            "generation": rows.length, +            "doc_id": rows[rows.length-1]['id'], +            "transaction_id": rows[rows.length-1]['value'] +        })); +    else +        send(JSON.stringify({ +            "generation": 0, +            "doc_id": "", +            "transaction_id": "", +        })); +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js b/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js new file mode 100644 index 00000000..2ec91794 --- 
/dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js @@ -0,0 +1,19 @@ +function(head, req) { +    var row; +    var rows=[]; +    var i = 1; +    var gen = 1; +    if (req.query.gen) +        gen = parseInt(req.query['gen']); +    // fetch all rows +    while(row = getRow()) +        rows.push(row); +    if (gen <= rows.length) +        send(JSON.stringify({ +            "generation": gen, +            "doc_id": rows[gen-1]['id'], +            "transaction_id": rows[gen-1]['value'], +        })); +    else +        send('{}'); +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js b/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js new file mode 100644 index 00000000..b35cdf51 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js @@ -0,0 +1,22 @@ +function(head, req) { +    var row; +    var gen = 1; +    var old_gen = 0; +    if (req.query.old_gen) +        old_gen = parseInt(req.query['old_gen']); +    send('{"transactions":[\n'); +    // fetch all rows +    while(row = getRow()) { +        if (gen > old_gen) { +            if (gen > old_gen+1) +                send(',\n'); +            send(JSON.stringify({ +                "generation": gen, +                "doc_id": row["id"], +                "transaction_id": row["value"] +            })); +        } +        gen++; +    } +    send('\n]}'); +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js b/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js new file mode 100644 index 00000000..94ef63ca --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js @@ -0,0 +1,7 @@ +function(doc) { +    if (doc.u1db_transactions) +        doc.u1db_transactions.forEach(function(t) { +            emit(t[0],  // use timestamp as key so the results are ordered +                 t[1]); // value is the transaction_id +        }); +} diff --git a/common/src/leap/soledad/common/document.py b/common/src/leap/soledad/common/document.py index cc24b53a..919ade12 100644 --- a/common/src/leap/soledad/common/document.py +++ b/common/src/leap/soledad/common/document.py @@ -29,6 +29,7 @@ from u1db import Document  #  class SoledadDocument(Document): +      """      Encryptable and syncable document. @@ -107,5 +108,3 @@ class SoledadDocument(Document):          _get_rev,          _set_rev,          doc="Wrapper to ensure `doc.rev` is always returned as bytes.") - - diff --git a/common/src/leap/soledad/common/errors.py b/common/src/leap/soledad/common/errors.py index 7c2d7296..3a7eadd2 100644 --- a/common/src/leap/soledad/common/errors.py +++ b/common/src/leap/soledad/common/errors.py @@ -25,11 +25,49 @@ from u1db import errors  from u1db.remote import http_errors +def register_exception(cls): +    """ +    A small decorator that registers exceptions in u1db maps. +    """ +    # update u1db "wire description to status" and "wire description to +    # exception" maps. +    http_errors.wire_description_to_status.update({ +        cls.wire_description: cls.status}) +    errors.wire_description_to_exc.update({ +        cls.wire_description: cls}) +    # do not modify the exception +    return cls + + +class SoledadError(errors.U1DBError): +    """ +    Base Soledad HTTP errors. +    """ +    pass + +  # -# LockResource: a lock based on a document in the shared database. 
+# Authorization errors
 #

-class InvalidTokenError(errors.U1DBError):
+@register_exception
+class InvalidAuthTokenError(errors.Unauthorized):
+    """
+    Exception raised when failing to get authorization for some action because
+    the provided token either does not exist in the tokens database, has a
+    structure distinct from the expected one, or is associated with a user
+    whose uuid differs from the one provided by the client.
+    """
+
+    wire_description = "invalid auth token"
+    status = 401
+
+#
+# LockResource errors
+#
+
+@register_exception
+class InvalidTokenError(SoledadError):
     """
     Exception raised when trying to unlock shared database with invalid token.
     """
@@ -38,7 +76,8 @@ class InvalidTokenError(errors.U1DBError):
     status = 401


-class NotLockedError(errors.U1DBError):
+@register_exception
+class NotLockedError(SoledadError):
     """
     Exception raised when trying to unlock shared database when it is not
     locked.
@@ -48,7 +87,8 @@ class NotLockedError(errors.U1DBError):
     status = 404


-class AlreadyLockedError(errors.U1DBError):
+@register_exception
+class AlreadyLockedError(SoledadError):
     """
     Exception raised when trying to lock shared database but it is already
     locked.
@@ -57,13 +97,83 @@ class AlreadyLockedError(errors.U1DBError):
     wire_description = "lock is locked"
     status = 403

-# update u1db "wire description to status" and "wire description to exception"
-# maps.
-for e in [InvalidTokenError, NotLockedError, AlreadyLockedError]:
-    http_errors.wire_description_to_status.update({
-        e.wire_description: e.status})
-    errors.wire_description_to_exc.update({
-        e.wire_description: e})
+
+@register_exception
+class LockTimedOutError(SoledadError):
+    """
+    Exception raised when timing out while trying to lock the shared database.
+    """
+
+    wire_description = "lock timed out"
+    status = 408
+
+
+@register_exception
+class CouldNotObtainLockError(SoledadError):
+    """
+    Exception raised when failing to obtain a lock on the shared database.
+    """
+
+    wire_description = "error obtaining lock"
+    status = 500
+
+
+#
+# CouchDatabase errors
+#
+
+@register_exception
+class MissingDesignDocError(SoledadError):
+    """
+    Raised when trying to access a missing couch design document.
+    """
+
+    wire_description = "missing design document"
+    status = 500
+
+
+@register_exception
+class MissingDesignDocNamedViewError(SoledadError):
+    """
+    Raised when trying to access a missing named view on a couch design
+    document.
+    """
+
+    wire_description = "missing design document named function"
+    status = 500
+
+
+@register_exception
+class MissingDesignDocListFunctionError(SoledadError):
+    """
+    Raised when trying to access a missing list function on a couch design
+    document.
+    """
+
+    wire_description = "missing design document list function"
+    status = 500
+
+
+@register_exception
+class MissingDesignDocDeletedError(SoledadError):
+    """
+    Raised when trying to access a deleted couch design document.
+    """
+
+    wire_description = "design document was deleted"
+    status = 500
+
+
+@register_exception
+class DesignDocUnknownError(SoledadError):
+    """
+    Raised when trying to access a couch design document and getting an
+    unknown error.
+    """ + +    wire_description = "missing design document unknown error" +    status = 500 +  # u1db error statuses also have to be updated  http_errors.ERROR_STATUSES = set( diff --git a/common/src/leap/soledad/common/objectstore.py b/common/src/leap/soledad/common/objectstore.py deleted file mode 100644 index 7aff3e32..00000000 --- a/common/src/leap/soledad/common/objectstore.py +++ /dev/null @@ -1,282 +0,0 @@ -# -*- coding: utf-8 -*- -# objectstore.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -""" -Abstract U1DB backend to handle storage using object stores (like CouchDB, for -example). - -This backend uses special documents to store all U1DB data (replica uid, -indexes, transaction logs and info about other dbs). The id of these documents -are reserved and have prefix equal to ObjectStore.U1DB_DATA_DOC_ID_PREFIX. - -Right now, this is only used by CouchDatabase backend, but can also be -extended to implement OpenStack or Amazon S3 storage, for example. - -See U1DB documentation for more information on how to use databases. -""" - - -from base64 import b64encode, b64decode - - -import uuid -import simplejson as json - - -from u1db import errors -from u1db.backends.inmemory import ( -    InMemoryDatabase, -    InMemorySyncTarget, -) - - -class ObjectStoreDatabase(InMemoryDatabase): -    """ -    A backend for storing u1db data in an object store. -    """ - -    U1DB_DATA_DOC_ID_PREFIX = 'u1db/' - -    @classmethod -    def open_database(cls, url, create, document_factory=None): -        """ -        Open a U1DB database using an object store as backend. - -        @param url: the url of the database replica -        @type url: str -        @param create: should the replica be created if it does not exist? -        @type create: bool -        @param document_factory: A function that will be called with the same -            parameters as Document.__init__. -        @type document_factory: callable - -        @return: the database instance -        @rtype: CouchDatabase -        """ -        raise NotImplementedError(cls.open_database) - -    def __init__(self, replica_uid=None, document_factory=None): -        """ -        Initialize the object store database. - -        @param replica_uid: an optional unique replica identifier -        @type replica_uid: str -        @param document_factory: A function that will be called with the same -            parameters as Document.__init__. -        @type document_factory: callable -        """ -        InMemoryDatabase.__init__( -            self, -            replica_uid, -            document_factory=document_factory) -        if self._replica_uid is None: -            self._replica_uid = uuid.uuid4().hex -        self._init_u1db_data() - -    def _init_u1db_data(self): -        """ -        Initialize u1db configuration data on backend storage. 
- -        A U1DB database needs to keep track of all database transactions, -        document conflicts, the generation of other replicas it has seen, -        indexes created by users and so on. - -        In this implementation, all this information is stored in special -        documents stored in the couch db with id prefix equal to -        U1DB_DATA_DOC_ID_PREFIX.  Those documents ids are reserved: -        put_doc(), get_doc() and delete_doc() will not allow documents with -        a doc_id with that prefix to be accessed or modified. -        """ -        raise NotImplementedError(self._init_u1db_data) - -    #------------------------------------------------------------------------- -    # methods from Database -    #------------------------------------------------------------------------- - -    def put_doc(self, doc): -        """ -        Update a document. - -        If the document currently has conflicts, put will fail. -        If the database specifies a maximum document size and the document -        exceeds it, put will fail and raise a DocumentTooBig exception. - -        This method prevents from updating the document with doc_id equals to -        self.U1DB_DATA_DOC_ID, which contains U1DB data. - -        @param doc: A Document with new content. -        @type doc: Document - -        @return: new_doc_rev - The new revision identifier for the document. -            The Document object will also be updated. -        @rtype: str -        """ -        if doc.doc_id is not None and \ -                doc.doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): -            raise errors.InvalidDocId() -        return InMemoryDatabase.put_doc(self, doc) - -    def _put_doc(self, doc): -        """ -        Update a document. - -        This is called everytime we just want to do a raw put on the db (i.e. -        without index updates, document constraint checks, and conflict -        checks). - -        @param doc: The document to update. -        @type doc: u1db.Document - -        @return: The new revision identifier for the document. -        @rtype: str -        """ -        raise NotImplementedError(self._put_doc) - -    def get_doc(self, doc_id, include_deleted=False): -        """ -        Get the JSON string for the given document. - -        This method prevents from getting the document with doc_id equals to -        self.U1DB_DATA_DOC_ID, which contains U1DB data. - -        @param doc_id: The unique document identifier -        @type doc_id: str -        @param include_deleted: If set to True, deleted documents will be -            returned with empty content. Otherwise asking for a deleted -            document will return None. -        @type include_deleted: bool - -        @return: a Document object. -        @rtype: Document -        """ -        if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): -            raise errors.InvalidDocId() -        return InMemoryDatabase.get_doc(self, doc_id, include_deleted) - -    def _get_doc(self, doc_id): -        """ -        Get just the document content, without fancy handling. - -        @param doc_id: The unique document identifier -        @type doc_id: str -        @param include_deleted: If set to True, deleted documents will be -            returned with empty content. Otherwise asking for a deleted -            document will return None. -        @type include_deleted: bool - -        @return: a Document object. 
-        @type: u1db.Document -        """ -        raise NotImplementedError(self._get_doc) - -    def get_all_docs(self, include_deleted=False): -        """ -        Get the JSON content for all documents in the database. - -        @param include_deleted: If set to True, deleted documents will be -            returned with empty content. Otherwise deleted documents will not -            be included in the results. -        @type include_deleted: bool - -        @return: (generation, [Document]) -            The current generation of the database, followed by a list of all -            the documents in the database. -        @rtype: tuple -        """ -        generation = self._get_generation() -        results = [] -        for doc_id in self._database: -            if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): -                continue -            doc = self._get_doc(doc_id, check_for_conflicts=True) -            if doc.content is None and not include_deleted: -                continue -            results.append(doc) -        return (generation, results) - -    def delete_doc(self, doc): -        """ -        Mark a document as deleted. - -        This method prevents from deleting the document with doc_id equals to -        self.U1DB_DATA_DOC_ID, which contains U1DB data. - -        @param doc: The document to mark as deleted. -        @type doc: u1db.Document - -        @return: The new revision id of the document. -        @type: str -        """ -        if doc.doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): -            raise errors.InvalidDocId() -        old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) -        if old_doc is None: -            raise errors.DocumentDoesNotExist -        if old_doc.rev != doc.rev: -            raise errors.RevisionConflict() -        if old_doc.is_tombstone(): -            raise errors.DocumentAlreadyDeleted -        if old_doc.has_conflicts: -            raise errors.ConflictedDoc() -        new_rev = self._allocate_doc_rev(doc.rev) -        doc.rev = new_rev -        doc.make_tombstone() -        self._put_and_update_indexes(old_doc, doc) -        return new_rev - -    # index-related methods - -    def create_index(self, index_name, *index_expressions): -        """ -        Create a named index, which can then be queried for future lookups. - -        See U1DB documentation for more information. - -        @param index_name: A unique name which can be used as a key prefix. -        @param index_expressions: Index expressions defining the index -            information. -        """ -        raise NotImplementedError(self.create_index) - -    #------------------------------------------------------------------------- -    # implemented methods from CommonBackend -    #------------------------------------------------------------------------- - -    def _put_and_update_indexes(self, old_doc, doc): -        """ -        Update a document and all indexes related to it. - -        @param old_doc: The old version of the document. -        @type old_doc: u1db.Document -        @param doc: The new version of the document. 
-        @type doc: u1db.Document -        """ -        for index in self._indexes.itervalues(): -            if old_doc is not None and not old_doc.is_tombstone(): -                index.remove_json(old_doc.doc_id, old_doc.get_json()) -            if not doc.is_tombstone(): -                index.add_json(doc.doc_id, doc.get_json()) -        trans_id = self._allocate_transaction_id() -        self._put_doc(doc) -        self._transaction_log.append((doc.doc_id, trans_id)) - - -class ObjectStoreSyncTarget(InMemorySyncTarget): -    """ -    Functionality for using an ObjectStore as a synchronization target. -    """ diff --git a/common/src/leap/soledad/common/tests/couchdb.ini.template b/common/src/leap/soledad/common/tests/couchdb.ini.template index 7d0316f0..1fc2205b 100644 --- a/common/src/leap/soledad/common/tests/couchdb.ini.template +++ b/common/src/leap/soledad/common/tests/couchdb.ini.template @@ -6,7 +6,7 @@  database_dir = %(tempdir)s/lib  view_index_dir = %(tempdir)s/lib  max_document_size = 4294967296 ; 4 GB -os_process_timeout = 5000 ; 5 seconds. for view and external servers. +os_process_timeout = 120000 ; 120 seconds. for view and external servers.  max_dbs_open = 100  delayed_commits = true ; set this to false to ensure an fsync before 201 Created is returned  uri_file = %(tempdir)s/lib/couch.uri @@ -74,6 +74,8 @@ use_users_db = false  [query_servers]  ; javascript = %(tempdir)s/server/main.js +javascript = /usr/bin/couchjs /usr/share/couchdb/server/main.js +coffeescript = /usr/bin/couchjs /usr/share/couchdb/server/main-coffee.js  ; Changing reduce_limit to false will disable reduce_limit. @@ -219,4 +221,4 @@ min_file_size = 131072  ;[admins]  ;testuser = -hashed-f50a252c12615697c5ed24ec5cd56b05d66fe91e,b05471ba260132953930cf9f97f327f5 -; pass for above user is 'testpass'
\ No newline at end of file
+; pass for above user is 'testpass'
diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py
index 42edf9fe..86bb4b93 100644
--- a/common/src/leap/soledad/common/tests/test_couch.py
+++ b/common/src/leap/soledad/common/tests/test_couch.py
@@ -24,14 +24,17 @@ import re
 import copy
 import shutil
 from base64 import b64decode
+from mock import Mock
+
+from couchdb.client import Server
+from u1db import errors as u1db_errors

 from leap.common.files import mkdir_p

-from leap.soledad.common.document import SoledadDocument
 from leap.soledad.common.tests import u1db_tests as tests
 from leap.soledad.common.tests.u1db_tests import test_backends
 from leap.soledad.common.tests.u1db_tests import test_sync
-from leap.soledad.common import couch
+from leap.soledad.common import couch, errors

 import simplejson as json
@@ -78,9 +81,10 @@ class CouchDBWrapper(object):
         mkdir_p(os.path.join(self.tempdir, 'lib'))
         mkdir_p(os.path.join(self.tempdir, 'log'))
         args = ['couchdb', '-n', '-a', confPath]
-        #null = open('/dev/null', 'w')
+        null = open('/dev/null', 'w')
+
         self.process = subprocess.Popen(
-            args, env=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+            args, env=None, stdout=null.fileno(), stderr=null.fileno(),
             close_fds=True)
         # find port
         logPath = os.path.join(self.tempdir, 'log', 'couch.log')
@@ -123,21 +127,21 @@ class CouchDBTestCase(unittest.TestCase):
     TestCase base class for tests against a real CouchDB server.
     """

-    def setUp(self):
+    @classmethod
+    def setUpClass(cls):
         """
         Make sure we have a CouchDB instance for a test.
         """
-        self.wrapper = CouchDBWrapper()
-        self.wrapper.start()
+        cls.wrapper = CouchDBWrapper()
+        cls.wrapper.start()
         #self.db = self.wrapper.db
-        unittest.TestCase.setUp(self)

-    def tearDown(self):
+    @classmethod
+    def tearDownClass(cls):
         """
         Stop CouchDB instance for test.
""" -        self.wrapper.stop() -        unittest.TestCase.tearDown(self) +        cls.wrapper.stop()  #----------------------------------------------------------------------------- @@ -148,7 +152,7 @@ class TestCouchBackendImpl(CouchDBTestCase):      def test__allocate_doc_id(self):          db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), -                                 'u1db_tests') +                                 'u1db_tests', ensure_ddocs=True)          doc_id1 = db._allocate_doc_id()          self.assertTrue(doc_id1.startswith('D-'))          self.assertEqual(34, len(doc_id1)) @@ -163,32 +167,51 @@ class TestCouchBackendImpl(CouchDBTestCase):  def make_couch_database_for_test(test, replica_uid):      port = str(test.wrapper.port)      return couch.CouchDatabase('http://localhost:' + port, replica_uid, -                               replica_uid=replica_uid or 'test') +                               replica_uid=replica_uid or 'test', +                               ensure_ddocs=True)  def copy_couch_database_for_test(test, db):      port = str(test.wrapper.port) -    new_db = couch.CouchDatabase('http://localhost:' + port, -                                 db._replica_uid + '_copy', +    couch_url = 'http://localhost:' + port +    new_dbname = db._replica_uid + '_copy' +    new_db = couch.CouchDatabase(couch_url, +                                 new_dbname,                                   replica_uid=db._replica_uid or 'test') -    gen, docs = db.get_all_docs(include_deleted=True) -    for doc in docs: -        new_db._put_doc(doc) -    new_db._transaction_log = copy.deepcopy(db._transaction_log) -    new_db._conflicts = copy.deepcopy(db._conflicts) -    new_db._other_generations = copy.deepcopy(db._other_generations) -    new_db._indexes = copy.deepcopy(db._indexes) -    # save u1db data on couch -    for key in new_db.U1DB_DATA_KEYS: -        doc_id = '%s%s' % (new_db.U1DB_DATA_DOC_ID_PREFIX, key) -        doc = new_db._get_doc(doc_id) -        doc.content = {'content': getattr(new_db, key)} -        new_db._put_doc(doc) +    # copy all docs +    old_couch_db = Server(couch_url)[db._replica_uid] +    new_couch_db = Server(couch_url)[new_dbname] +    for doc_id in old_couch_db: +        doc = old_couch_db.get(doc_id) +        # copy design docs +        if ('u1db_rev' not in doc): +            new_couch_db.save(doc) +        # copy u1db docs +        else: +            new_doc = { +                '_id': doc['_id'], +                'u1db_transactions': doc['u1db_transactions'], +                'u1db_rev': doc['u1db_rev'] +            } +            attachments = [] +            if ('u1db_conflicts' in doc): +                new_doc['u1db_conflicts'] = doc['u1db_conflicts'] +                for c_rev in doc['u1db_conflicts']: +                    attachments.append('u1db_conflict_%s' % c_rev) +            new_couch_db.save(new_doc) +            # save conflict data +            attachments.append('u1db_content') +            for att_name in attachments: +                att = old_couch_db.get_attachment(doc_id, att_name) +                if (att is not None): +                    new_couch_db.put_attachment(new_doc, att, +                                                filename=att_name)      return new_db  def make_document_for_test(test, doc_id, rev, content, has_conflicts=False): -    return SoledadDocument(doc_id, rev, content, has_conflicts=has_conflicts) +    return couch.CouchDocument( +        doc_id, rev, content, has_conflicts=has_conflicts)  
COUCH_SCENARIOS = [
@@ -202,8 +225,22 @@ class CouchTests(test_backends.AllDatabaseTests, CouchDBTestCase):

     scenarios = COUCH_SCENARIOS

+    def setUp(self):
+        test_backends.AllDatabaseTests.setUp(self)
+        # save db info because of test_close
+        self._server = self.db._server
+        self._dbname = self.db._dbname
+
     def tearDown(self):
-        self.db.delete_database()
+        # if current test is `test_close` we have to use saved objects to
+        # delete the database because the close() method will have removed the
+        # references needed to do it using the CouchDatabase.
+        if self.id() == \
+                'leap.soledad.common.tests.test_couch.CouchTests.' \
+                'test_close(couch)':
+            del(self._server[self._dbname])
+        else:
+            self.db.delete_database()
         test_backends.AllDatabaseTests.tearDown(self)
@@ -246,17 +283,16 @@ class CouchWithConflictsTests(
         test_backends.LocalDatabaseWithConflictsTests.tearDown(self)

-# Notice: the CouchDB backend is currently used for storing encrypted data in
-# the server, so indexing makes no sense. Thus, we ignore index testing for
-# now.
-
-class CouchIndexTests(test_backends.DatabaseIndexTests, CouchDBTestCase):
-
-    scenarios = COUCH_SCENARIOS
+# Notice: the CouchDB backend does not have indexing capabilities, so we do
+# not test indexing for now.

-    def tearDown(self):
-        self.db.delete_database()
-        test_backends.DatabaseIndexTests.tearDown(self)
+#class CouchIndexTests(test_backends.DatabaseIndexTests, CouchDBTestCase):
+#
+#    scenarios = COUCH_SCENARIOS
+#
+#    def tearDown(self):
+#        self.db.delete_database()
+#        test_backends.DatabaseIndexTests.tearDown(self)


#-----------------------------------------------------------------------------
@@ -311,6 +347,89 @@ class CouchDatabaseSyncTargetTests(test_sync.DatabaseSyncTargetTests,
                 [(doc.doc_id, doc.rev), (doc2.doc_id, doc2.rev)]})


+# The following tests need the database to have an index, so we fake one.
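
The wrapper below fakes indexing on top of u1db's InMemoryIndex. A minimal
sketch of that API, assuming it behaves as in the u1db reference backend (the
doc ids and values are illustrative):

    from u1db.backends.inmemory import InMemoryIndex

    # an index named 'by-name' over the 'name' field
    index = InMemoryIndex('by-name', ['name'])
    index.add_json('doc-1', '{"name": "john"}')
    index.add_json('doc-2', '{"name": "mary"}')
    # lookup() takes a tuple of key values and returns matching doc ids
    assert index.lookup(('john',)) == ['doc-1']
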
+old_class = couch.CouchDatabase
+
+from u1db.backends.inmemory import InMemoryIndex
+
+
+class IndexedCouchDatabase(couch.CouchDatabase):
+
+    def __init__(self, url, dbname, replica_uid=None, full_commit=True,
+                 session=None, ensure_ddocs=True):
+        old_class.__init__(self, url, dbname, replica_uid, full_commit,
+                           session, ensure_ddocs=ensure_ddocs)
+        self._indexes = {}
+
+    def _put_doc(self, old_doc, doc):
+        for index in self._indexes.itervalues():
+            if old_doc is not None and not old_doc.is_tombstone():
+                index.remove_json(old_doc.doc_id, old_doc.get_json())
+            if not doc.is_tombstone():
+                index.add_json(doc.doc_id, doc.get_json())
+        old_class._put_doc(self, old_doc, doc)
+
+    def create_index(self, index_name, *index_expressions):
+        if index_name in self._indexes:
+            if self._indexes[index_name]._definition == list(
+                    index_expressions):
+                return
+            raise u1db_errors.IndexNameTakenError
+        index = InMemoryIndex(index_name, list(index_expressions))
+        _, all_docs = self.get_all_docs()
+        for doc in all_docs:
+            index.add_json(doc.doc_id, doc.get_json())
+        self._indexes[index_name] = index
+
+    def delete_index(self, index_name):
+        del self._indexes[index_name]
+
+    def list_indexes(self):
+        definitions = []
+        for idx in self._indexes.itervalues():
+            definitions.append((idx._name, idx._definition))
+        return definitions
+
+    def get_from_index(self, index_name, *key_values):
+        try:
+            index = self._indexes[index_name]
+        except KeyError:
+            raise u1db_errors.IndexDoesNotExist
+        doc_ids = index.lookup(key_values)
+        result = []
+        for doc_id in doc_ids:
+            result.append(self._get_doc(doc_id, check_for_conflicts=True))
+        return result
+
+    def get_range_from_index(self, index_name, start_value=None,
+                             end_value=None):
+        """Return all documents with key values in the specified range."""
+        try:
+            index = self._indexes[index_name]
+        except KeyError:
+            raise u1db_errors.IndexDoesNotExist
+        if isinstance(start_value, basestring):
+            start_value = (start_value,)
+        if isinstance(end_value, basestring):
+            end_value = (end_value,)
+        doc_ids = index.lookup_range(start_value, end_value)
+        result = []
+        for doc_id in doc_ids:
+            result.append(self._get_doc(doc_id, check_for_conflicts=True))
+        return result
+
+    def get_index_keys(self, index_name):
+        try:
+            index = self._indexes[index_name]
+        except KeyError:
+            raise u1db_errors.IndexDoesNotExist
+        keys = index.keys()
+        # XXX inefficiency warning
+        return list(set([tuple(key.split('\x01')) for key in keys]))
+
+
+couch.CouchDatabase = IndexedCouchDatabase
+
 sync_scenarios = []
 for name, scenario in COUCH_SCENARIOS:
     scenario = dict(scenario)
@@ -344,98 +463,184 @@ class CouchDatabaseSyncTests(test_sync.DatabaseSyncTests, CouchDBTestCase):
         test_sync.DatabaseSyncTests.tearDown(self)

-#-----------------------------------------------------------------------------
-# The following tests test extra functionality introduced by our backends
-#-----------------------------------------------------------------------------
-
-class CouchDatabaseStorageTests(CouchDBTestCase):
-
-    def _listify(self, l):
-        if type(l) is dict:
-            return {
-                self._listify(a): self._listify(b) for a, b in l.iteritems()}
-        if hasattr(l, '__iter__'):
-            return [self._listify(i) for i in l]
-        return l
-
-    def _fetch_u1db_data(self, db, key):
-        doc = db._get_doc("%s%s" % (db.U1DB_DATA_DOC_ID_PREFIX, key))
-        return doc.content['content']
-
-    def test_transaction_log_storage_after_put(self):
-        db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port),
-                                 'u1db_tests')
-        db.create_doc({'simple': 'doc'})
-        content = self._fetch_u1db_data(db, db.U1DB_TRANSACTION_LOG_KEY)
-        self.assertEqual(
-            self._listify(db._transaction_log),
-            self._listify(content))
-
-    def test_conflict_log_storage_after_put_if_newer(self):
-        db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port),
-                                 'u1db_tests')
-        doc = db.create_doc({'simple': 'doc'})
-        doc.set_json(nested_doc)
-        doc.rev = db._replica_uid + ':2'
-        db._force_doc_sync_conflict(doc)
-        content = self._fetch_u1db_data(db, db.U1DB_CONFLICTS_KEY)
-        self.assertEqual(
-            self._listify(db._conflicts),
-            self._listify(content))
-
-    def test_other_gens_storage_after_set(self):
-        db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port),
-                                 'u1db_tests')
-        doc = db.create_doc({'simple': 'doc'})
-        db._set_replica_gen_and_trans_id('a', 'b', 'c')
-        content = self._fetch_u1db_data(db, db.U1DB_OTHER_GENERATIONS_KEY)
-        self.assertEqual(
-            self._listify(db._other_generations),
-            self._listify(content))
+class CouchDatabaseExceptionsTests(CouchDBTestCase):

-    def test_index_storage_after_create(self):
-        db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port),
-                                 'u1db_tests')
-        doc = db.create_doc({'name': 'john'})
-        db.create_index('myindex', 'name')
-        content = self._fetch_u1db_data(db, db.U1DB_INDEXES_KEY)
-        myind = db._indexes['myindex']
-        index = {
-            'myindex': {
-                'definition': myind._definition,
-                'name': myind._name,
-                'values': myind._values,
-            }
-        }
-        self.assertEqual(
-            self._listify(index),
-            self._listify(content))
+    def setUp(self):
+        CouchDBTestCase.setUp(self)
+        self.db = couch.CouchDatabase(
+            'http://127.0.0.1:%d' % self.wrapper.port, 'test',
+            ensure_ddocs=False)  # note that we don't enforce ddocs here

-    def test_index_storage_after_delete(self):
-        db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port),
-                                 'u1db_tests')
-        doc = db.create_doc({'name': 'john'})
-        db.create_index('myindex', 'name')
-        db.create_index('myindex2', 'name')
-        db.delete_index('myindex')
-        content = self._fetch_u1db_data(db, db.U1DB_INDEXES_KEY)
-        myind = db._indexes['myindex2']
-        index = {
-            'myindex2': {
-                'definition': myind._definition,
-                'name': myind._name,
-                'values': myind._values,
-            }
-        }
-        self.assertEqual(
-            self._listify(index),
-            self._listify(content))
+    def tearDown(self):
+        self.db.delete_database()

-    def test_replica_uid_storage_after_db_creation(self):
-        db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port),
-                                 'u1db_tests')
-        content = self._fetch_u1db_data(db, db.U1DB_REPLICA_UID_KEY)
-        self.assertEqual(db._replica_uid, content)
+    def test_missing_design_doc_raises(self):
+        """
+        Test that all methods that access design documents will raise if the
+        design docs are not present.
+        """
+        # _get_generation()
+        self.assertRaises(
+            errors.MissingDesignDocError,
+            self.db._get_generation)
+        # _get_generation_info()
+        self.assertRaises(
+            errors.MissingDesignDocError,
+            self.db._get_generation_info)
+        # _get_trans_id_for_gen()
+        self.assertRaises(
+            errors.MissingDesignDocError,
+            self.db._get_trans_id_for_gen, 1)
+        # _get_transaction_log()
+        self.assertRaises(
+            errors.MissingDesignDocError,
+            self.db._get_transaction_log)
+        # whats_changed()
+        self.assertRaises(
+            errors.MissingDesignDocError,
+            self.db.whats_changed)
+        # _do_set_replica_gen_and_trans_id()
+        self.assertRaises(
+            errors.MissingDesignDocError,
+            self.db._do_set_replica_gen_and_trans_id, 1, 2, 3)
+
+    def test_missing_design_doc_functions_raises(self):
+        """
+        Test that all methods that access design document list functions
+        will raise if those functions are empty.
+        """
+        self.db = couch.CouchDatabase(
+            'http://127.0.0.1:%d' % self.wrapper.port, 'test',
+            ensure_ddocs=True)
+        # empty the list functions of _design/transactions
+        transactions = self.db._database['_design/transactions']
+        transactions['lists'] = {}
+        self.db._database.save(transactions)
+        # _get_generation()
+        self.assertRaises(
+            errors.MissingDesignDocListFunctionError,
+            self.db._get_generation)
+        # _get_generation_info()
+        self.assertRaises(
+            errors.MissingDesignDocListFunctionError,
+            self.db._get_generation_info)
+        # _get_trans_id_for_gen()
+        self.assertRaises(
+            errors.MissingDesignDocListFunctionError,
+            self.db._get_trans_id_for_gen, 1)
+        # whats_changed()
+        self.assertRaises(
+            errors.MissingDesignDocListFunctionError,
+            self.db.whats_changed)
+
+    def test_absent_design_doc_functions_raises(self):
+        """
+        Test that all methods that access design document list functions
+        will raise if those functions are absent.
+        """ +        self.db = couch.CouchDatabase( +            'http://127.0.0.1:%d' % self.wrapper.port, 'test', +            ensure_ddocs=True) +        # erase views from _design/transactions +        transactions = self.db._database['_design/transactions'] +        del transactions['lists'] +        self.db._database.save(transactions) +        # _get_generation() +        self.assertRaises( +            errors.MissingDesignDocListFunctionError, +            self.db._get_generation) +        # _get_generation_info() +        self.assertRaises( +            errors.MissingDesignDocListFunctionError, +            self.db._get_generation_info) +        # _get_trans_id_for_gen() +        self.assertRaises( +            errors.MissingDesignDocListFunctionError, +            self.db._get_trans_id_for_gen, 1) +        # whats_changed() +        self.assertRaises( +            errors.MissingDesignDocListFunctionError, +            self.db.whats_changed) + +    def test_missing_design_doc_named_views_raises(self): +        """ +        Test that all methods that access design documents' named views  will +        raise if the views are not present. +        """ +        self.db = couch.CouchDatabase( +            'http://127.0.0.1:%d' % self.wrapper.port, 'test', +            ensure_ddocs=True) +        # erase views from _design/docs +        docs = self.db._database['_design/docs'] +        del docs['views'] +        self.db._database.save(docs) +        # erase views from _design/syncs +        syncs = self.db._database['_design/syncs'] +        del syncs['views'] +        self.db._database.save(syncs) +        # erase views from _design/transactions +        transactions = self.db._database['_design/transactions'] +        del transactions['views'] +        self.db._database.save(transactions) +        # _get_generation() +        self.assertRaises( +            errors.MissingDesignDocNamedViewError, +            self.db._get_generation) +        # _get_generation_info() +        self.assertRaises( +            errors.MissingDesignDocNamedViewError, +            self.db._get_generation_info) +        # _get_trans_id_for_gen() +        self.assertRaises( +            errors.MissingDesignDocNamedViewError, +            self.db._get_trans_id_for_gen, 1) +        # _get_transaction_log() +        self.assertRaises( +            errors.MissingDesignDocNamedViewError, +            self.db._get_transaction_log) +        # whats_changed() +        self.assertRaises( +            errors.MissingDesignDocNamedViewError, +            self.db.whats_changed) + +    def test_deleted_design_doc_raises(self): +        """ +        Test that all methods that access design documents will raise if the +        design docs are not present. 
+        """ +        self.db = couch.CouchDatabase( +            'http://127.0.0.1:%d' % self.wrapper.port, 'test', +            ensure_ddocs=True) +        # delete _design/docs +        del self.db._database['_design/docs'] +        # delete _design/syncs +        del self.db._database['_design/syncs'] +        # delete _design/transactions +        del self.db._database['_design/transactions'] +        # _get_generation() +        self.assertRaises( +            errors.MissingDesignDocDeletedError, +            self.db._get_generation) +        # _get_generation_info() +        self.assertRaises( +            errors.MissingDesignDocDeletedError, +            self.db._get_generation_info) +        # _get_trans_id_for_gen() +        self.assertRaises( +            errors.MissingDesignDocDeletedError, +            self.db._get_trans_id_for_gen, 1) +        # _get_transaction_log() +        self.assertRaises( +            errors.MissingDesignDocDeletedError, +            self.db._get_transaction_log) +        # whats_changed() +        self.assertRaises( +            errors.MissingDesignDocDeletedError, +            self.db.whats_changed) +        # _do_set_replica_gen_and_trans_id() +        self.assertRaises( +            errors.MissingDesignDocDeletedError, +            self.db._do_set_replica_gen_and_trans_id, 1, 2, 3)  load_tests = tests.load_with_scenarios diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py new file mode 100644 index 00000000..3c457cc5 --- /dev/null +++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py @@ -0,0 +1,389 @@ +# -*- coding: utf-8 -*- +# test_soledad.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +""" + +import os +import mock +import tempfile +import threading + +from leap.soledad.client import Soledad +from leap.soledad.common.couch import CouchDatabase, CouchServerState +from leap.soledad.common.tests.test_couch import CouchDBTestCase +from leap.soledad.common.tests.u1db_tests import TestCaseWithServer +from leap.soledad.common.tests.test_target import ( +    make_token_soledad_app, +    make_leap_document_for_test, +    token_leap_sync_target, +) +from leap.soledad.common.tests.test_server import _couch_ensure_database + + +REPEAT_TIMES = 20 + + +# monkey path CouchServerState so it can ensure databases. 
+# monkey patch CouchServerState so it can ensure databases.
+
+CouchServerState.ensure_database = _couch_ensure_database
+
+
+class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
+
+    @staticmethod
+    def make_app_after_state(state):
+        return make_token_soledad_app(state)
+
+    make_document_for_test = make_leap_document_for_test
+
+    sync_target = token_leap_sync_target
+
+    def _soledad_instance(self, user='user-uuid', passphrase=u'123',
+                          prefix='',
+                          secrets_path=Soledad.STORAGE_SECRETS_FILE_NAME,
+                          local_db_path='soledad.u1db', server_url='',
+                          cert_file=None, auth_token=None, secret_id=None):
+        """
+        Instantiate Soledad.
+        """
+
+        # this callback ensures we save a document which is sent to the shared
+        # db.
+        def _put_doc_side_effect(doc):
+            self._doc_put = doc
+
+        # we need a mocked shared db or else Soledad will try to access the
+        # network to find if there are uploaded secrets.
+        class MockSharedDB(object):
+
+            get_doc = mock.Mock(return_value=None)
+            put_doc = mock.Mock(side_effect=_put_doc_side_effect)
+            lock = mock.Mock(return_value=('atoken', 300))
+            unlock = mock.Mock()
+
+            def __call__(self):
+                return self
+
+        Soledad._shared_db = MockSharedDB()
+        return Soledad(
+            user,
+            passphrase,
+            secrets_path=os.path.join(self.tempdir, prefix, secrets_path),
+            local_db_path=os.path.join(
+                self.tempdir, prefix, local_db_path),
+            server_url=server_url,
+            cert_file=cert_file,
+            auth_token=auth_token,
+            secret_id=secret_id)
+
+    def make_app(self):
+        self.request_state = CouchServerState(self._couch_url, 'shared',
+                                              'tokens')
+        return self.make_app_after_state(self.request_state)
+
+    def setUp(self):
+        TestCaseWithServer.setUp(self)
+        CouchDBTestCase.setUp(self)
+        self._couch_url = 'http://localhost:' + str(self.wrapper.port)
+        self.db = CouchDatabase(
+            self._couch_url, 'user-user-uuid', replica_uid='replica')
+        self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
+
+    def tearDown(self):
+        self.db.delete_database()
+        CouchDBTestCase.tearDown(self)
+        TestCaseWithServer.tearDown(self)
+
+    #
+    # Sequential tests
+    #
+
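
For reference, every atomic u1db operation appends exactly one entry to the
transaction log, which is what the sequential tests below count. A condensed
sketch of the bookkeeping they assert (the database name is illustrative):

    db = CouchDatabase(couch_url, 'user-db')
    doc = db.create_doc({'ops': 0})    # log has 1 entry
    doc.content['ops'] += 1
    db.put_doc(doc)                    # log has 2 entries
    db.delete_doc(doc)                 # log has 3 entries
    assert len(db._get_transaction_log()) == 3
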
+        """ +        doc = self.db.create_doc({'ops': 0}) +        ops = 1 +        docs = [doc.doc_id] +        for i in range(0, REPEAT_TIMES): +            self.assertEqual( +                i+1, len(self.db._get_transaction_log())) +            doc.content['ops'] += 1 +            self.db.put_doc(doc) +            docs.append(doc.doc_id) + +        # assert length of transaction_log +        transaction_log = self.db._get_transaction_log() +        self.assertEqual( +            REPEAT_TIMES+1, len(transaction_log)) + +        # assert that all entries in the log belong to the same doc +        self.assertEqual(REPEAT_TIMES+1, len(docs)) +        for doc_id in docs: +            self.assertEqual( +                REPEAT_TIMES+1, +                len(filter(lambda t: t[0] == doc_id, transaction_log))) + +    def test_correct_transaction_log_after_sequential_deletes(self): +        """ +        Assert that the transaction_log increases accordingly with sequential +        puts and deletes. +        """ +        docs = [] +        for i in range(0, REPEAT_TIMES): +            doc = self.db.create_doc({'ops': 0}) +            self.assertEqual( +                2*i+1, len(self.db._get_transaction_log())) +            docs.append(doc.doc_id) +            self.db.delete_doc(doc) +            self.assertEqual( +                2*i+2, len(self.db._get_transaction_log())) + +        # assert length of transaction_log +        transaction_log = self.db._get_transaction_log() +        self.assertEqual( +            2*REPEAT_TIMES, len(transaction_log)) + +        # assert that each doc appears twice in the transaction_log +        self.assertEqual(REPEAT_TIMES, len(docs)) +        for doc_id in docs: +            self.assertEqual( +                2, +                len(filter(lambda t: t[0] == doc_id, transaction_log))) + +    def test_correct_sync_log_after_sequential_syncs(self): +        """ +        Assert that the sync_log increases accordingly with sequential syncs. 
+        """ +        self.startServer() +        sol = self._soledad_instance( +            auth_token='auth-token', +            server_url=self.getURL()) + +        def _create_docs_and_sync(sol, syncs): +            # create a lot of documents +            for i in range(0, REPEAT_TIMES): +                sol.create_doc({}) +            # assert sizes of transaction and sync logs +            self.assertEqual( +                syncs*REPEAT_TIMES, +                len(self.db._get_transaction_log())) +            self.assertEqual( +                1 if syncs > 0 else 0, +                len(self.db._database.view('syncs/log').rows)) +            # sync to the remote db +            sol.sync() +            gen, docs = self.db.get_all_docs() +            self.assertEqual((syncs+1)*REPEAT_TIMES, gen) +            self.assertEqual((syncs+1)*REPEAT_TIMES, len(docs)) +            # assert sizes of transaction and sync logs +            self.assertEqual((syncs+1)*REPEAT_TIMES, +                             len(self.db._get_transaction_log())) +            sync_log_rows = self.db._database.view('syncs/log').rows +            sync_log = sync_log_rows[0].value +            replica_uid = sync_log_rows[0].key +            known_gen = sync_log['known_generation'] +            known_trans_id = sync_log['known_transaction_id'] +            # assert sync_log has exactly 1 row +            self.assertEqual(1, len(sync_log_rows)) +            # assert it has the correct replica_uid, gen and trans_id +            self.assertEqual(sol._db._replica_uid, replica_uid) +            sol_gen, sol_trans_id = sol._db._get_generation_info() +            self.assertEqual(sol_gen, known_gen) +            self.assertEqual(sol_trans_id, known_trans_id) + +        _create_docs_and_sync(sol, 0) +        _create_docs_and_sync(sol, 1) + +    # +    # Concurrency tests +    # +     +    class _WorkerThread(threading.Thread): +         +        def __init__(self, params, run_method): +            threading.Thread.__init__(self) +            self._params = params +            self._run_method = run_method + +        def run(self): +            self._run_method(self) + +    def test_correct_transaction_log_after_concurrent_puts(self): +        """ +        Assert that the transaction_log increases accordingly with concurrent +        puts. 
+        """ +        pool = threading.BoundedSemaphore(value=1) +        threads = [] +        docs = [] + +        def _run_method(self): +            doc = self._params['db'].create_doc({}) +            pool.acquire() +            self._params['docs'].append(doc.doc_id) +            pool.release() + + +        for i in range(0, REPEAT_TIMES): +            thread = self._WorkerThread( +                {'docs': docs, 'db': self.db}, +                _run_method) +            thread.start() +            threads.append(thread) + +        for thread in threads: +            thread.join() +         +        # assert length of transaction_log +        transaction_log = self.db._get_transaction_log() +        self.assertEqual( +            REPEAT_TIMES, len(transaction_log)) + +        # assert all documents are in the log +        self.assertEqual(REPEAT_TIMES, len(docs)) +        for doc_id in docs: +            self.assertEqual( +                1, +                len(filter(lambda t: t[0] == doc_id, transaction_log))) + +    def test_correct_transaction_log_after_concurrent_deletes(self): +        """ +        Assert that the transaction_log increases accordingly with concurrent +        puts and deletes. +        """ +        threads = [] +        docs = [] +        pool = threading.BoundedSemaphore(value=1) + +        # create/delete method that will be run concurrently +        def _run_method(self): +            doc = self._params['db'].create_doc({}) +            pool.acquire() +            docs.append(doc.doc_id) +            pool.release() +            self._params['db'].delete_doc(doc) + +        # launch concurrent threads +        for i in range(0, REPEAT_TIMES): +            thread = self._WorkerThread({'db': self.db}, _run_method) +            thread.start() +            threads.append(thread) + +        # wait for threads to finish +        for thread in threads: +            thread.join() + +        # assert transaction log +        transaction_log = self.db._get_transaction_log() +        self.assertEqual( +            2*REPEAT_TIMES, len(transaction_log)) +        # assert that each doc appears twice in the transaction_log +        self.assertEqual(REPEAT_TIMES, len(docs)) +        for doc_id in docs: +            self.assertEqual( +                2, +                len(filter(lambda t: t[0] == doc_id, transaction_log))) + +    def test_correct_sync_log_after_concurrent_puts_and_sync(self): +        """ +        Assert that the sync_log is correct after concurrent syncs. +        """ +        threads = [] +        docs = [] +        pool = threading.BoundedSemaphore(value=1) +        self.startServer() +        sol = self._soledad_instance( +            auth_token='auth-token', +            server_url=self.getURL()) + +        def _run_method(self): +            # create a lot of documents +            doc = self._params['sol'].create_doc({}) +            pool.acquire() +            docs.append(doc.doc_id) +            pool.release() + +        # launch threads to create documents in parallel +        for i in range(0, REPEAT_TIMES): +            thread = self._WorkerThread( +                {'sol': sol, 'syncs': i}, +                _run_method) +            thread.start() +            threads.append(thread) + +        # wait for threads to finish +        for thread in threads: +            thread.join() +         +        # do the sync! 
+        sol.sync()
+
+        transaction_log = self.db._get_transaction_log()
+        self.assertEqual(REPEAT_TIMES, len(transaction_log))
+        # assert all documents are in the remote log
+        self.assertEqual(REPEAT_TIMES, len(docs))
+        for doc_id in docs:
+            self.assertEqual(
+                1,
+                len(filter(lambda t: t[0] == doc_id, transaction_log)))
+
+    def test_concurrent_syncs_do_not_fail(self):
+        """
+        Assert that concurrent attempts to sync end up being executed
+        sequentially and do not fail.
+        """
+        threads = []
+        docs = []
+        pool = threading.BoundedSemaphore(value=1)
+        self.startServer()
+        sol = self._soledad_instance(
+            auth_token='auth-token',
+            server_url=self.getURL())
+
+        def _run_method(self):
+            # create a lot of documents
+            doc = self._params['sol'].create_doc({})
+            # do the sync!
+            sol.sync()
+            pool.acquire()
+            docs.append(doc.doc_id)
+            pool.release()
+
+        # launch threads to create documents in parallel
+        for i in range(0, REPEAT_TIMES):
+            thread = self._WorkerThread(
+                {'sol': sol, 'syncs': i},
+                _run_method)
+            thread.start()
+            threads.append(thread)
+
+        # wait for threads to finish
+        for thread in threads:
+            thread.join()
+
+        transaction_log = self.db._get_transaction_log()
+        self.assertEqual(REPEAT_TIMES, len(transaction_log))
+        # assert all documents are in the remote log
+        self.assertEqual(REPEAT_TIMES, len(docs))
+        for doc_id in docs:
+            self.assertEqual(
+                1,
+                len(filter(lambda t: t[0] == doc_id, transaction_log)))
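
A note on the concurrency tests above: a BoundedSemaphore with value 1 is
used as a mutex around the shared docs list. The same guard can be written
with threading.Lock and a context manager; a minimal sketch, where db stands
in for the CouchDatabase the tests use:

    import threading

    lock = threading.Lock()
    doc_ids = []

    def worker(db):
        doc = db.create_doc({})
        with lock:  # serialize mutation of the shared list
            doc_ids.append(doc.doc_id)

    threads = [threading.Thread(target=worker, args=(db,))
               for _ in range(20)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
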
diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py
index 83df192b..f8d2a64f 100644
--- a/common/src/leap/soledad/common/tests/test_server.py
+++ b/common/src/leap/soledad/common/tests/test_server.py
@@ -25,6 +25,7 @@ import tempfile
 import simplejson as json
 import mock
 import time
+import binascii

 from leap.common.testing.basetest import BaseLeapTest
@@ -50,6 +51,17 @@ from leap.soledad.server import SoledadApp, LockResource
 from leap.soledad.server.auth import URLToAuthorization

+# monkey patch CouchServerState so it can ensure databases.
+
+def _couch_ensure_database(self, dbname):
+    db = CouchDatabase.open_database(
+        self._couch_url + '/' + dbname,
+        create=True)
+    return db, db._replica_uid
+
+CouchServerState.ensure_database = _couch_ensure_database
+
+
 class ServerAuthorizationTestCase(BaseLeapTest):
     """
     Tests related to Soledad server authorization.
@@ -339,15 +351,16 @@ class EncryptedSyncTestCase(
         _, doclist = sol1.get_all_docs()
         self.assertEqual([], doclist)
         doc1 = sol1.create_doc(json.loads(simple_doc))
-        # sync with server
-        sol1._server_url = self.getURL()
-        sol1.sync()
-        # assert doc was sent to couch db
+        # ensure remote db exists before syncing
         db = CouchDatabase(
             self._couch_url,
             # the name of the user database is "user-<uuid>".
             'user-user-uuid',
         )
+        # sync with server
+        sol1._server_url = self.getURL()
+        sol1.sync()
+        # assert doc was sent to couch db
         _, doclist = db.get_all_docs()
         self.assertEqual(1, len(doclist))
         couchdoc = doclist[0]
@@ -376,6 +389,7 @@ class EncryptedSyncTestCase(
         doc2 = doclist[0]
         # assert incoming doc is equal to the first sent doc
         self.assertEqual(doc1, doc2)
+        db.delete_database()

     def test_encrypted_sym_sync_with_unicode_passphrase(self):
         """
@@ -393,15 +407,16 @@ class EncryptedSyncTestCase(
         _, doclist = sol1.get_all_docs()
         self.assertEqual([], doclist)
         doc1 = sol1.create_doc(json.loads(simple_doc))
-        # sync with server
-        sol1._server_url = self.getURL()
-        sol1.sync()
-        # assert doc was sent to couch db
+        # ensure remote db exists before syncing
         db = CouchDatabase(
             self._couch_url,
             # the name of the user database is "user-<uuid>".
             'user-user-uuid',
         )
+        # sync with server
+        sol1._server_url = self.getURL()
+        sol1.sync()
+        # assert doc was sent to couch db
         _, doclist = db.get_all_docs()
         self.assertEqual(1, len(doclist))
         couchdoc = doclist[0]
@@ -434,7 +449,94 @@ class EncryptedSyncTestCase(
         doc2 = doclist[0]
         # assert incoming doc is equal to the first sent doc
         self.assertEqual(doc1, doc2)
+        db.delete_database()

+    def test_sync_very_large_files(self):
+        """
+        Test if Soledad can sync very large files.
+        """
+        # define the size of the "very large file"
+        length = 100*(10**6)  # 100 MB
+        self.startServer()
+        # instantiate soledad and create a document
+        sol1 = self._soledad_instance(
+            # token is verified in test_target.make_token_soledad_app
+            auth_token='auth-token'
+        )
+        _, doclist = sol1.get_all_docs()
+        self.assertEqual([], doclist)
+        content = binascii.hexlify(os.urandom(length/2))  # len() == length
+        doc1 = sol1.create_doc({'data': content})
+        # ensure remote db exists before syncing
+        db = CouchDatabase(
+            self._couch_url,
+            # the name of the user database is "user-<uuid>".
+            'user-user-uuid',
+        )
+        # sync with server
+        sol1._server_url = self.getURL()
+        sol1.sync()
+        # instantiate soledad with empty db, but with same secrets path
+        sol2 = self._soledad_instance(prefix='x', auth_token='auth-token')
+        _, doclist = sol2.get_all_docs()
+        self.assertEqual([], doclist)
+        sol2._secrets_path = sol1.secrets_path
+        sol2._load_secrets()
+        sol2._set_secret_id(sol1._secret_id)
+        # sync the new instance
+        sol2._server_url = self.getURL()
+        sol2.sync()
+        _, doclist = sol2.get_all_docs()
+        self.assertEqual(1, len(doclist))
+        doc2 = doclist[0]
+        # assert incoming doc is equal to the first sent doc
+        self.assertEqual(doc1, doc2)
+        # delete remote database
+        db.delete_database()
+
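
In test_sync_very_large_files above, os.urandom(n) returns n random bytes and
binascii.hexlify encodes each byte as two hex characters, doubling the
length, so urandom(length/2) yields a string of exactly `length` characters.
A quick check:

    import binascii
    import os

    length = 16
    content = binascii.hexlify(os.urandom(length / 2))  # Python 2 division
    assert len(content) == length
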
+        """ +        number_of_docs = 100 +        self.startServer() +        # instantiate soledad and create a document +        sol1 = self._soledad_instance( +            # token is verified in test_target.make_token_soledad_app +            auth_token='auth-token' +        ) +        _, doclist = sol1.get_all_docs() +        self.assertEqual([], doclist) +        # create many small files +        for i in range(0, number_of_docs): +            sol1.create_doc(json.loads(simple_doc)) +        # ensure remote db exists before syncing +        db = CouchDatabase( +            self._couch_url, +            # the name of the user database is "user-<uuid>". +            'user-user-uuid', +        ) +        # sync with server +        sol1._server_url = self.getURL() +        sol1.sync() +        # instantiate soledad with empty db, but with same secrets path +        sol2 = self._soledad_instance(prefix='x', auth_token='auth-token') +        _, doclist = sol2.get_all_docs() +        self.assertEqual([], doclist) +        sol2._secrets_path = sol1.secrets_path +        sol2._load_secrets() +        sol2._set_secret_id(sol1._secret_id) +        # sync the new instance +        sol2._server_url = self.getURL() +        sol2.sync() +        _, doclist = sol2.get_all_docs() +        self.assertEqual(number_of_docs, len(doclist)) +        # assert incoming docs are equal to sent docs +        for doc in doclist: +            self.assertEqual(sol1.get_doc(doc.doc_id), doc) +        # delete remote database +        db.delete_database()  class LockResourceTestCase(          CouchDBTestCase, TestCaseWithServer): @@ -455,12 +557,21 @@ class LockResourceTestCase(          CouchDBTestCase.setUp(self)          self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")          self._couch_url = 'http://localhost:' + str(self.wrapper.port) +        # create the databases +        CouchDatabase(self._couch_url, 'shared') +        CouchDatabase(self._couch_url, 'tokens')          self._state = CouchServerState(              self._couch_url, 'shared', 'tokens')      def tearDown(self):          CouchDBTestCase.tearDown(self)          TestCaseWithServer.tearDown(self) +        # delete remote database +        db = CouchDatabase( +            self._couch_url, +            'shared', +        ) +        db.delete_database()      def test__try_obtain_filesystem_lock(self):          responder = mock.Mock() diff --git a/common/src/leap/soledad/common/tests/u1db_tests/test_http_database.py b/common/src/leap/soledad/common/tests/u1db_tests/test_http_database.py index 3f3c7bba..9251000e 100644 --- a/common/src/leap/soledad/common/tests/u1db_tests/test_http_database.py +++ b/common/src/leap/soledad/common/tests/u1db_tests/test_http_database.py @@ -133,12 +133,11 @@ class TestHTTPDatabaseSimpleOperations(tests.TestCase):               None), self.got)      def test_get_doc_deleted_include_deleted(self): -        self.response_val = errors.HTTPError(404, -                                             json.dumps( -                                             {"error": errors.DOCUMENT_DELETED} -                                             ), -                                             {'x-u1db-rev': 'doc-rev-gone', -                                              'x-u1db-has-conflicts': 'false'}) +        self.response_val = errors.HTTPError( +            404, +            json.dumps({"error": errors.DOCUMENT_DELETED}), +            {'x-u1db-rev': 'doc-rev-gone', +             'x-u1db-has-conflicts': 'false'})          doc 
         doc = self.db.get_doc('deleted', include_deleted=True)
         self.assertEqual('deleted', doc.doc_id)
         self.assertEqual('doc-rev-gone', doc.rev)
diff --git a/common/src/leap/soledad/common/tests/u1db_tests/test_open.py b/common/src/leap/soledad/common/tests/u1db_tests/test_open.py
index 13425b4f..63406245 100644
--- a/common/src/leap/soledad/common/tests/u1db_tests/test_open.py
+++ b/common/src/leap/soledad/common/tests/u1db_tests/test_open.py
@@ -24,7 +24,8 @@ from u1db import (
 )
 from leap.soledad.common.tests import u1db_tests as tests
 from u1db.backends import sqlite_backend
-from leap.soledad.common.tests.u1db_tests.test_backends import TestAlternativeDocument
+from leap.soledad.common.tests.u1db_tests.test_backends \
+    import TestAlternativeDocument


 class TestU1DBOpen(tests.TestCase):
diff --git a/common/src/leap/soledad/common/tests/u1db_tests/test_sqlite_backend.py b/common/src/leap/soledad/common/tests/u1db_tests/test_sqlite_backend.py
index a53ea6cc..8292dd07 100644
--- a/common/src/leap/soledad/common/tests/u1db_tests/test_sqlite_backend.py
+++ b/common/src/leap/soledad/common/tests/u1db_tests/test_sqlite_backend.py
@@ -30,7 +30,8 @@ from u1db import (
 )
 from leap.soledad.common.tests import u1db_tests as tests
 from u1db.backends import sqlite_backend
-from leap.soledad.common.tests.u1db_tests.test_backends import TestAlternativeDocument
+from leap.soledad.common.tests.u1db_tests.test_backends \
+    import TestAlternativeDocument


 simple_doc = '{"key": "value"}'
diff --git a/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py b/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py
index 5346d540..1f78f912 100644
--- a/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py
+++ b/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py
@@ -760,7 +760,7 @@ class DatabaseSyncTests(tests.DatabaseBaseTests,
                                        {'docs': [], 'last_known_gen': 0},
                                     'return':
                                        {'docs': [(doc.doc_id, doc.rev)],
-                                    'last_gen': 1}})
+                                        'last_gen': 1}})
         self.assertEqual([doc], self.db1.get_from_index('test-idx', 'value'))

     def test_sync_pulling_doesnt_update_other_if_changed(self):
@@ -785,7 +785,7 @@
                                        {'docs': [], 'last_known_gen': 0},
                                     'return':
                                        {'docs': [(doc.doc_id, doc.rev)],
-                                    'last_gen': 1}})
+                                        'last_gen': 1}})
         self.assertEqual(1, self.db1._get_replica_gen_and_trans_id('test2')[0])
         # c2 should not have gotten a '_record_sync_info' call, because the
         # local database had been updated more than just by the messages
@@ -819,8 +819,8 @@
         self.assertLastExchangeLog(self.db2,
                                    {'receive':
                                        {'docs': [(doc.doc_id, doc.rev)],
-                                    'source_uid': 'test1',
-                                    'source_gen': 1, 'last_known_gen': 0},
+                                        'source_uid': 'test1',
+                                        'source_gen': 1, 'last_known_gen': 0},
                                     'return': {'docs': [], 'last_gen': 1}})
     def test_sync_ignores_superseded(self):
@@ -839,11 +839,11 @@
         self.assertLastExchangeLog(self.db1,
                                    {'receive':
                                        {'docs': [(doc.doc_id, doc_rev1)],
-                                    'source_uid': 'test2',
-                                    'source_gen': 1, 'last_known_gen': 0},
+                                        'source_uid': 'test2',
+                                        'source_gen': 1, 'last_known_gen': 0},
                                     'return':
                                        {'docs': [(doc.doc_id, doc_rev2)],
-                                    'last_gen': 2}})
+                                        'last_gen': 2}})
         self.assertGetDoc(self.db1, doc.doc_id, doc_rev2, new_content, False)

     def test_sync_sees_remote_conflicted(self):
@@ -861,11 +861,11 @@
         self.assertLastExchangeLog(self.db2,
                                    {'receive':
                                        {'docs': [(doc_id, doc1_rev)],
-                                    'source_uid': 'test1',
-                                    'source_gen': 1, 'last_known_gen': 0},
+                                        'source_uid': 'test1',
+                                        'source_gen': 1, 'last_known_gen': 0},
                                     'return':
                                        {'docs': [(doc_id, doc2_rev)],
-                                    'last_gen': 1}})
+                                        'last_gen': 1}})
         self.assertTransactionLog([doc_id, doc_id], self.db1)
         self.assertGetDoc(self.db1, doc_id, doc2_rev, new_doc, True)
         self.assertGetDoc(self.db2, doc_id, doc2_rev, new_doc, False)
@@ -892,10 +892,10 @@
         self.assertLastExchangeLog(self.db2,
                                    {'receive':
                                        {'docs': [(doc_id, doc1.rev)],
-                                    'source_uid': 'test1',
-                                    'source_gen': 2, 'last_known_gen': 1},
+                                        'source_uid': 'test1',
+                                        'source_gen': 2, 'last_known_gen': 1},
                                     'return': {'docs': [(doc_id, doc2.rev)],
-                                    'last_gen': 2}})
+                                               'last_gen': 2}})
         self.assertTransactionLog([doc_id, doc_id, doc_id], self.db1)
         self.assertGetDocIncludeDeleted(self.db1, doc_id, doc2.rev, None, True)
         self.assertGetDocIncludeDeleted(
@@ -950,8 +950,8 @@
         self.assertLastExchangeLog(self.db2,
                                    {'receive':
                                        {'docs': [(doc_id, deleted_rev)],
-                                    'source_uid': 'test1',
-                                    'source_gen': 2, 'last_known_gen': 1},
+                                        'source_uid': 'test1',
+                                        'source_gen': 2, 'last_known_gen': 1},
                                     'return': {'docs': [], 'last_gen': 2}})
         self.assertGetDocIncludeDeleted(
             self.db1, doc_id, deleted_rev, None, False)
@@ -1121,6 +1121,7 @@
 class TestDbSync(tests.TestCaseWithServer):
+
     """Test db.sync remote sync shortcut"""

     scenarios = [
@@ -1189,6 +1190,7 @@ class TestDbSync(tests.TestCaseWithServer):

 class TestRemoteSyncIntegration(tests.TestCaseWithServer):
+
     """Integration tests for the most common sync scenario local -> remote"""

     make_app_with_state = staticmethod(make_http_app)
@@ -1204,7 +1206,7 @@ class TestRemoteSyncIntegration(tests.TestCaseWithServer):
         doc12 = self.db1.create_doc_from_json('{"a": 2}')
         doc21 = self.db2.create_doc_from_json('{"b": 1}')
         doc22 = self.db2.create_doc_from_json('{"b": 2}')
-        #sanity
+        # sanity
         self.assertEqual(2, len(self.db1._get_transaction_log()))
         self.assertEqual(2, len(self.db2._get_transaction_log()))
         progress1 = []
