21 files changed, 2141 insertions, 789 deletions
@@ -10,5 +10,3 @@ MANIFEST  *.pyc  *.log  *.*~ - - diff --git a/common/changes/bug_4475_remodel-couch-backend b/common/changes/bug_4475_remodel-couch-backend new file mode 100644 index 00000000..13a1b121 --- /dev/null +++ b/common/changes/bug_4475_remodel-couch-backend @@ -0,0 +1,2 @@ +  o Remodel couch backend to fix concurrency and scalability. Closes #4475, +    #4682, #4683 and #4680. diff --git a/common/setup.py b/common/setup.py index bcc2b4b3..e142d958 100644 --- a/common/setup.py +++ b/common/setup.py @@ -103,7 +103,163 @@ def get_versions(default={}, verbose=False):              f.write(subst_template) +# +# Couch backend design docs file generation. +# + +from os import listdir +from os.path import realpath, dirname, isdir, join, isfile, basename +import json +import binascii + + +old_cmd_sdist = cmdclass["sdist"] + + +def build_ddocs_py(basedir=None, with_src=True): +    """ +    Build the `ddocs.py` file. + +    For ease of development, couch backend design documents are stored as +    `.js` files in subdirectories of `src/leap/soledad/common/ddocs`. This +    function scans that directory for javascript files, builds the design +    document structures, and encodes them in the `ddocs.py` file. + +    This function is used when installing in develop mode, building, or +    generating source distributions (see the classes below and the `cmdclass` +    setuptools parameter). + +    This function uses the following conventions to generate design documents: + +      - Design documents are represented by directories in the form +        `<prefix>/<ddoc>`, where `<prefix>` is the `src/leap/soledad/common/ddocs` +        directory. +      - Design document directories might contain `views`, `lists` and +        `updates` subdirectories. +      - View subdirectories must contain a `map.js` file and may contain a +        `reduce.js` file. +      - List and update subdirectories may contain any number of javascript +        files (i.e. ending in `.js`) whose names will be mapped to the +        corresponding list or update function name.
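+ +    The generated `ddocs.py` holds one module-level attribute per design +    document, whose value is that design document's JSON structure encoded as +    base64. A minimal decoding sketch (mirroring what `ensure_ddocs_on_db()` +    does in `couch.py`): + +        import json, binascii +        from leap.soledad.common import ddocs +        design_doc = json.loads(binascii.a2b_base64(ddocs.docs))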
+    """ +    cur_pwd = dirname(realpath(__file__)) +    common_path = ('src', 'leap', 'soledad', 'common') +    dest_common_path = common_path +    if not with_src: +        dest_common_path = common_path[1:] +    prefix = join(cur_pwd, *common_path) + +    dest_prefix = prefix +    if basedir is not None: +        # we're bulding a sdist +        dest_prefix = join(basedir, *dest_common_path) + +    ddocs_prefix = join(prefix, 'ddocs') +    ddocs = {} + +    # design docs are represented by subdirectories of `ddocs_prefix` +    for ddoc in [f for f in listdir(ddocs_prefix) +                 if isdir(join(ddocs_prefix, f))]: + +        ddocs[ddoc] = {'_id': '_design/%s' % ddoc} + +        for t in ['views', 'lists', 'updates']: +            tdir = join(ddocs_prefix, ddoc, t) +            if isdir(tdir): + +                ddocs[ddoc][t] = {} + +                if t == 'views':  # handle views (with map/reduce functions) +                    for view in [f for f in listdir(tdir) +                                 if isdir(join(tdir, f))]: +                        # look for map.js and reduce.js +                        mapfile = join(tdir, view, 'map.js') +                        reducefile = join(tdir, view, 'reduce.js') +                        mapfun = None +                        reducefun = None +                        try: +                            with open(mapfile) as f: +                                mapfun = f.read() +                        except IOError: +                            pass +                        try: +                            with open(reducefile) as f: +                                reducefun = f.read() +                        except IOError: +                            pass +                        ddocs[ddoc]['views'][view] = {} + +                        if mapfun is not None: +                            ddocs[ddoc]['views'][view]['map'] = mapfun +                        if reducefun is not None: +                            ddocs[ddoc]['views'][view]['reduce'] = reducefun + +                else:  # handle lists, updates, etc +                    for fun in [f for f in listdir(tdir) +                                if isfile(join(tdir, f))]: +                        funfile = join(tdir, fun) +                        funname = basename(funfile).replace('.js', '') +                        try: +                            with open(funfile) as f: +                                ddocs[ddoc][t][funname] = f.read() +                        except IOError: +                            pass +    # write file containing design docs strings +    ddoc_filename = "ddocs.py" +    with open(join(dest_prefix, ddoc_filename), 'w') as f: +        for ddoc in ddocs: +            f.write( +                "%s = '%s'\n" % +                (ddoc, binascii.b2a_base64(json.dumps(ddocs[ddoc]))[:-1])) +    print "Wrote design docs in %s" % (dest_prefix + '/' + ddoc_filename,) + + +from setuptools.command.develop import develop as _cmd_develop + + +class cmd_develop(_cmd_develop): +    def run(self): +        # versioneer: +        versions = versioneer.get_versions(verbose=True) +        self._versioneer_generated_versions = versions +        # unless we update this, the command will keep using the old version +        self.distribution.metadata.version = versions["version"] +        _cmd_develop.run(self) +        build_ddocs_py() + + +# versioneer powered +old_cmd_sdist = cmdclass["sdist"] + + +class cmd_sdist(old_cmd_sdist): +    """ +    Generate 
'src/leap/soledad/common/ddocs.py', which contains the couch design +    document scripts. +    """ +    def run(self): +        old_cmd_sdist.run(self) + +    def make_release_tree(self, base_dir, files): +        old_cmd_sdist.make_release_tree(self, base_dir, files) +        build_ddocs_py(basedir=base_dir) + + +# versioneer powered +old_cmd_build = cmdclass["build"] + + +class cmd_build(old_cmd_build): +    def run(self): +        old_cmd_build.run(self) +        build_ddocs_py(basedir=self.build_lib, with_src=False) + +  cmdclass["freeze_debianver"] = freeze_debianver +cmdclass["build"] = cmd_build +cmdclass["sdist"] = cmd_sdist +cmdclass["develop"] = cmd_develop +  # XXX add ref to docs diff --git a/common/src/leap/soledad/common/.gitignore b/common/src/leap/soledad/common/.gitignore new file mode 100644 index 00000000..3378c78a --- /dev/null +++ b/common/src/leap/soledad/common/.gitignore @@ -0,0 +1 @@ +ddocs.py diff --git a/common/src/leap/soledad/common/README.txt b/common/src/leap/soledad/common/README.txt new file mode 100644 index 00000000..106efb5e --- /dev/null +++ b/common/src/leap/soledad/common/README.txt @@ -0,0 +1,79 @@ +Soledad common package +====================== + +This package contains Soledad bits used by both server and client. + +Couch U1DB Backend +------------------ + +U1DB backends rely on some atomic operations that modify document contents +and metadata (conflicts, transaction ids and indexes). The only atomic +operation in Couch is a document put, so every u1db atomic operation has to be +mapped to a couch document put. + +The atomic operations in the U1DB SQLite reference backend implementation may +be identified by the use of a context manager to access the underlying +database. A listing of the methods involved in each atomic operation is +depicted below. The top-level elements correspond to the atomic operations that +have to be mapped, and items on deeper levels of the list have to be +implemented in a way that all changes will be pushed with just one operation. + +    * _set_replica_uid +    * put_doc: +        * _get_doc +        * _put_and_update_indexes +            * insert/update the document +            * insert into transaction log +    * delete_doc +        * _get_doc +        * _put_and_update_indexes +    * get_doc_conflicts +        * _get_conflicts +    * _set_replica_gen_and_trans_id +        * _do_set_replica_gen_and_trans_id +    * _put_doc_if_newer +        * _get_doc +        * _validate_source (**) +            * _get_replica_gen_and_trans_id +        * cases: +            * is newer: +                * _prune_conflicts (**) +                    * _has_conflicts +                    * _delete_conflicts +                * _put_and_update_indexes +            * same content as: +                * _put_and_update_indexes +            * conflicted: +                * _force_doc_sync_conflict +                    * _prune_conflicts +                    * _add_conflict +                    * _put_and_update_indexes +        * _do_set_replica_gen_and_trans_id +    * resolve_doc +        * _get_doc +        * cases: +            * doc is superseded +                * _put_and_update_indexes +            * else +                * _add_conflict +        * _delete_conflicts +    * delete_index +    * create_index + +Couch views and update functions are used in order to achieve atomicity on the +Couch backend. Transactions are stored in the `u1db_transactions` field of the +couch document. Document content and conflicted versions are stored as couch +document attachments named, respectively, `u1db_content` and +`u1db_conflicts`.
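+ +As a rough, illustrative sketch (placeholders, not the exact couch JSON), a +document managed by this backend thus looks like: + +    { +        "_id": "<doc_id>", +        "_rev": "<couch_rev>", +        "u1db_rev": "<u1db_rev>", +        "u1db_transactions": <transactions this document took part in>, +        "_attachments": { +            "u1db_content": <the document's JSON content>, +            "u1db_conflicts": <the conflicted versions, if any> +        } +    }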
A map of methods to their couch query URIs can be found in the +`./ddocs/README.txt` document. + +Notes: + +  * Currently, the couch backend does not implement indexing, so what is +    depicted as `_put_and_update_indexes` above will be found as `_put_doc` in +    the backend. + +  * Conflict updates are performed as part of the document put, using couch +    update functions, and as such are part of the same atomic operation as the +    document put.
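+ +For instance, the backend obtains the current database generation by querying +a couch list function with a request like the following (a sketch; the +complete map of methods to query URIs lives in `./ddocs/README.txt`): + +    GET /<dbname>/_design/transactions/_list/generation/log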
-        """ -        # get methods -        old_method = getattr(cls, old_method_name) -        load_method = getattr(cls, load_method_name) \ -            if load_method_name is not None \ -            else lambda self, data: setattr(self, key, data) -        dump_method = getattr(cls, dump_method_name) \ -            if dump_method_name is not None \ -            else lambda self: getattr(self, key) - -        def _new_method(self, *args, **kwargs): -            # get u1db data from couch db -            doc = self._get_doc('%s%s' % -                                (self.U1DB_DATA_DOC_ID_PREFIX, key)) -            load_method(self, doc.content['content']) -            # run old method -            retval = old_method(self, *args, **kwargs) -            # store u1db data on couch -            if store: -                doc.content = {'content': dump_method(self)} -                self._put_doc(doc) -            return retval - -        return _new_method - -    # ensure the class has a persistency map -    if not hasattr(cls, 'PERSISTENCY_MAP'): -        logger.error('Class %s has no PERSISTENCY_MAP attribute, skipping ' -                     'persistent methods substitution.' % cls) -        return cls -    # replace old methods with new persistent ones -    for key, ((load_method_name, dump_method_name), -              persistent_methods) in cls.PERSISTENCY_MAP.iteritems(): -        for (method_name, store) in persistent_methods: -            setattr(cls, method_name, -                    _create_persistent_method( -                        method_name, -                        key, -                        load_method_name, -                        dump_method_name, -                        store)) -    return cls - - -@persistent_class -class CouchDatabase(ObjectStoreDatabase): +    def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False, +                 syncable=True): +        """ +        Container for handling a document that is stored in couch backend. + +        :param doc_id: The unique document identifier. +        :type doc_id: str +        :param rev: The revision identifier of the document. +        :type rev: str +        :param json: The JSON string for this document. +        :type json: str +        :param has_conflicts: Boolean indicating if this document has conflicts +        :type has_conflicts: bool +        :param syncable: Should this document be synced with remote replicas? +        :type syncable: bool +        """ +        SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts) +        self._couch_rev = None +        self._conflicts = None +        self._modified_conflicts = False + +    def ensure_fetch_conflicts(self, get_conflicts_fun): +        """ +        Ensure conflict data has been fetched from the server. + +        :param get_conflicts_fun: A function which, given the document id and +                                  the couch revision, return the conflicted +                                  versions of the current document. +        :type get_conflicts_fun: function +        """ +        if self._conflicts is None: +            self._conflicts = get_conflicts_fun(self.doc_id, +                                                couch_rev=self.couch_rev) +        self.has_conflicts = len(self._conflicts) > 0 + +    def get_conflicts(self): +        """ +        Get the conflicted versions of the document. + +        :return: The conflicted versions of the document. 
+        :rtype: [CouchDocument] +        """ +        return self._conflicts + +    def add_conflict(self, doc): +        """ +        Add a conflict to this document. + +        :param doc: The conflicted version to be added. +        :type doc: CouchDocument +        """ +        if self._conflicts is None: +            raise Exception("Run self.ensure_fetch_conflicts first!") +        self._modified_conflicts = True +        self._conflicts.append(doc) +        self.has_conflicts = len(self._conflicts) > 0 + +    def delete_conflicts(self, conflict_revs): +        """ +        Delete conflicted versions of this document. + +        :param conflict_revs: The conflicted revisions to be deleted. +        :type conflict_revs: [str] +        """ +        if self._conflicts is None: +            raise Exception("Run self.ensure_fetch_conflicts first!") +        conflicts_len = len(self._conflicts) +        self._conflicts = filter( +            lambda doc: doc.rev not in conflict_revs, +            self._conflicts) +        if len(self._conflicts) < conflicts_len: +            self._modified_conflicts = True +        self.has_conflicts = len(self._conflicts) > 0 + +    def modified_conflicts(self): +        """ +        Return whether this document's conflicts have been modified. + +        :return: Whether this document's conflicts have been modified. +        :rtype: bool +        """ +        return self._conflicts is not None and \ +            self._modified_conflicts is True + +    def _get_couch_rev(self): +        return self._couch_rev + +    def _set_couch_rev(self, rev): +        self._couch_rev = rev + +    couch_rev = property(_get_couch_rev, _set_couch_rev) + + +# monkey-patch the u1db http app to use CouchDocument +http_app.Document = CouchDocument + + +class CouchDatabase(CommonBackend):      """ -    A U1DB backend that uses Couch as its persistence layer. +    A U1DB implementation that uses CouchDB as its persistence layer.      """ -    U1DB_TRANSACTION_LOG_KEY = '_transaction_log' -    U1DB_CONFLICTS_KEY = '_conflicts' -    U1DB_OTHER_GENERATIONS_KEY = '_other_generations' -    U1DB_INDEXES_KEY = '_indexes' -    U1DB_REPLICA_UID_KEY = '_replica_uid' - -    U1DB_DATA_KEYS = [ -        U1DB_TRANSACTION_LOG_KEY, -        U1DB_CONFLICTS_KEY, -        U1DB_OTHER_GENERATIONS_KEY, -        U1DB_INDEXES_KEY, -        U1DB_REPLICA_UID_KEY, -    ] - -    COUCH_ID_KEY = '_id' -    COUCH_REV_KEY = '_rev' -    COUCH_U1DB_ATTACHMENT_KEY = 'u1db_json' -    COUCH_U1DB_REV_KEY = 'u1db_rev' - -    # the following map describes information about methods usage of -    # properties that have to persist on the underlying database. The format -    # of the map is assumed to be: -    # -    #     { -    #         'property_name': [ -    #             ('property_load_method_name', 'property_dump_method_name'), -    #             [('method_1_name', bool), -    #              ... -    #              ('method_N_name', bool)]], -    #         ... -    #     } -    # -    # where the booleans indicate if the property should be stored after -    # each method execution (i.e. if the method alters the property). Property -    # load/dump methods will be run after/before properties are read/written -    # to the underlying db. 
-    PERSISTENCY_MAP = { -        U1DB_TRANSACTION_LOG_KEY: [ -            ('_load_transaction_log_from_json', None), -            [('_get_transaction_log', False), -             ('_get_generation', False), -             ('_get_generation_info', False), -             ('_get_trans_id_for_gen', False), -             ('whats_changed', False), -             ('_put_and_update_indexes', True)]], -        U1DB_CONFLICTS_KEY: [ -            (None, None), -            [('_has_conflicts', False), -             ('get_doc_conflicts', False), -             ('_prune_conflicts', False), -             ('resolve_doc', False), -             ('_replace_conflicts', True), -             ('_force_doc_sync_conflict', True)]], -        U1DB_OTHER_GENERATIONS_KEY: [ -            ('_load_other_generations_from_json', None), -            [('_get_replica_gen_and_trans_id', False), -             ('_do_set_replica_gen_and_trans_id', True)]], -        U1DB_INDEXES_KEY: [ -            ('_load_indexes_from_json', '_dump_indexes_as_json'), -            [('list_indexes', False), -             ('get_from_index', False), -             ('get_range_from_index', False), -             ('get_index_keys', False), -             ('_put_and_update_indexes', True), -             ('create_index', True), -             ('delete_index', True)]], -        U1DB_REPLICA_UID_KEY: [ -            (None, None), -            [('_allocate_doc_rev', False), -             ('_put_doc_if_newer', False), -             ('_ensure_maximal_rev', False), -             ('_prune_conflicts', False), -             ('_set_replica_uid', True)]]} -      @classmethod      def open_database(cls, url, create):          """          Open a U1DB database using CouchDB as backend. -        @param url: the url of the database replica -        @type url: str -        @param create: should the replica be created if it does not exist? -        @type create: bool +        :param url: the url of the database replica +        :type url: str +        :param create: should the replica be created if it does not exist? +        :type create: bool -        @return: the database instance -        @rtype: CouchDatabase +        :return: the database instance +        :rtype: CouchDatabase          """          # get database from url          m = re.match('(^https?://[^/]+)/(.+)$', url) @@ -213,301 +182,819 @@ class CouchDatabase(ObjectStoreDatabase):              server[dbname]          except ResourceNotFound:              if not create: -                raise DatabaseDoesNotExist() +                raise errors.DatabaseDoesNotExist()          return cls(url, dbname)      def __init__(self, url, dbname, replica_uid=None, full_commit=True, -                 session=None): +                 session=None, ensure_ddocs=True):          """          Create a new Couch data container. 
-        @param url: the url of the couch database -        @type url: str -        @param dbname: the database name -        @type dbname: str -        @param replica_uid: an optional unique replica identifier -        @type replica_uid: str -        @param full_commit: turn on the X-Couch-Full-Commit header -        @type full_commit: bool -        @param session: an http.Session instance or None for a default session -        @type session: http.Session +        :param url: the url of the couch database +        :type url: str +        :param dbname: the database name +        :type dbname: str +        :param replica_uid: an optional unique replica identifier +        :type replica_uid: str +        :param full_commit: turn on the X-Couch-Full-Commit header +        :type full_commit: bool +        :param session: an http.Session instance or None for a default session +        :type session: http.Session +        :param ensure_ddocs: Ensure that the design docs exist on the server. +        :type ensure_ddocs: bool          """          # save params          self._url = url          self._full_commit = full_commit          self._session = session +        self._factory = CouchDocument +        self._real_replica_uid = None          # configure couch          self._server = Server(url=self._url,                                full_commit=self._full_commit,                                session=self._session)          self._dbname = dbname -        # this will ensure that transaction and sync logs exist and are -        # up-to-date.          try:              self._database = self._server[self._dbname]          except ResourceNotFound:              self._server.create(self._dbname)              self._database = self._server[self._dbname] -        ObjectStoreDatabase.__init__(self, replica_uid=replica_uid) +        if replica_uid is not None: +            self._set_replica_uid(replica_uid) +        if ensure_ddocs: +            self.ensure_ddocs_on_db() + +    def ensure_ddocs_on_db(self): +        """ +        Ensure that the design documents used by the backend exist on the +        couch database. +        """ +        # we check for the existence of one of the design docs, and put all +        # of them if that one does not exist +        try: +            self._database['_design/docs'] +            return +        except ResourceNotFound: +            for ddoc_name in ['docs', 'syncs', 'transactions']: +                ddoc = json.loads( +                    binascii.a2b_base64( +                        getattr(ddocs, ddoc_name))) +                self._database.save(ddoc) + +    def get_sync_target(self): +        """ +        Return a SyncTarget object, for another u1db to synchronize with. + +        :return: The sync target. +        :rtype: CouchSyncTarget +        """ +        return CouchSyncTarget(self) + +    def delete_database(self): +        """ +        Delete a U1DB CouchDB database. +        """ +        del(self._server[self._dbname]) + +    def close(self): +        """ +        Release any resources associated with this database. + +        :return: True if db was successfully closed. +        :rtype: bool +        """ +        self._url = None +        self._full_commit = None +        self._session = None +        self._server = None +        self._database = None +        return True + +    def _set_replica_uid(self, replica_uid): +        """ +        Force the replica uid to be set. + +        :param replica_uid: The new replica uid.
+        :type replica_uid: str +        """ +        try: +            # set on existent config document +            doc = self._database['u1db_config'] +            doc['replica_uid'] = replica_uid +        except ResourceNotFound: +            # or create the config document +            doc = { +                '_id': 'u1db_config', +                'replica_uid': replica_uid, +            } +        self._database.save(doc) +        self._real_replica_uid = replica_uid + +    def _get_replica_uid(self): +        """ +        Get the replica uid. + +        :return: The replica uid. +        :rtype: str +        """ +        if self._real_replica_uid is not None: +            return self._real_replica_uid +        try: +            # grab replica_uid from server +            doc = self._database['u1db_config'] +            self._real_replica_uid = doc['replica_uid'] +            return self._real_replica_uid +        except ResourceNotFound: +            # create a unique replica_uid +            self._real_replica_uid = uuid.uuid4().hex +            self._set_replica_uid(self._real_replica_uid) +            return self._real_replica_uid + +    _replica_uid = property(_get_replica_uid, _set_replica_uid) + +    def _get_generation(self): +        """ +        Return the current generation. -    #------------------------------------------------------------------------- -    # methods from Database -    #------------------------------------------------------------------------- +        :return: The current generation. +        :rtype: int +        """ +        # query a couch list function +        res = self._database.resource( +            '_design', 'transactions', '_list', 'generation', 'log') +        response = res.get_json() +        return response[2]['generation'] + +    def _get_generation_info(self): +        """ +        Return the current generation and transaction id. + +        :return: A tuple containing the current generation and transaction id. +        :rtype: (int, str) +        """ +        # query a couch list function +        res = self._database.resource( +            '_design', 'transactions', '_list', 'generation', 'log') +        response = res.get_json() +        return (response[2]['generation'], response[2]['transaction_id']) + +    def _get_trans_id_for_gen(self, generation): +        """ +        Get the transaction id corresponding to a particular generation. + +        :param generation: The generation for which to get the transaction id. +        :type generation: int + +        :return: The transaction id for C{generation}. +        :rtype: str + +        :raise InvalidGeneration: Raised when the generation does not exist. +        """ +        if generation == 0: +            return '' +        # query a couch list function +        res = self._database.resource( +            '_design', 'transactions', '_list', 'trans_id_for_gen', 'log') +        response = res.get_json(gen=generation) +        if response[2] == {}: +            raise errors.InvalidGeneration +        return response[2]['transaction_id'] + +    def _get_transaction_log(self): +        """ +        This is only for the test suite; it is not part of the API. + +        :return: The complete transaction log.
+        :rtype: [(str, str)] +        """ +        # query a couch view +        res = self._database.resource( +            '_design', 'transactions', '_view', 'log') +        response = res.get_json() +        return map(lambda row: (row['id'], row['value']), response[2]['rows'])      def _get_doc(self, doc_id, check_for_conflicts=False):          """ -        Get just the document content, without fancy handling. +        Extract the document from storage. + +        This can return None if the document doesn't exist. + +        :param doc_id: The unique document identifier +        :type doc_id: str +        :param check_for_conflicts: If set to False, then the conflict check +                                    will be skipped. +        :type check_for_conflicts: bool + +        :return: The document. +        :rtype: CouchDocument +        """ +        # get document with all attachments (u1db content and eventual +        # conflicts) +        try: +            result = \ +                self._database.resource(doc_id).get_json( +                    attachments=True)[2] +        except ResourceNotFound: +            return None +        # restrict to u1db documents +        if 'u1db_rev' not in result: +            return None +        doc = self._factory(doc_id, result['u1db_rev']) +        # set contents or make tombstone +        if '_attachments' not in result \ +                or 'u1db_content' not in result['_attachments']: +            doc.make_tombstone() +        else: +            doc.content = json.loads( +                binascii.a2b_base64( +                    result['_attachments']['u1db_content']['data'])) +        # determine if there are conflicts +        if check_for_conflicts \ +                and '_attachments' in result \ +                and 'u1db_conflicts' in result['_attachments']: +            doc.has_conflicts = True +        # store couch revision +        doc.couch_rev = result['_rev'] +        return doc + +    def get_doc(self, doc_id, include_deleted=False): +        """ +        Get the JSON string for the given document. -        @param doc_id: The unique document identifier -        @type doc_id: str -        @param include_deleted: If set to True, deleted documents will be +        :param doc_id: The unique document identifier +        :type doc_id: str +        :param include_deleted: If set to True, deleted documents will be              returned with empty content. Otherwise asking for a deleted              document will return None. -        @type include_deleted: bool +        :type include_deleted: bool -        @return: a Document object. -        @type: u1db.Document +        :return: A document object. +        :rtype: CouchDocument.          
""" -        cdoc = self._database.get(doc_id) -        if cdoc is None: +        doc = self._get_doc(doc_id, check_for_conflicts=True) +        if doc is None: +            return None +        if doc.is_tombstone() and not include_deleted:              return None -        has_conflicts = False -        if check_for_conflicts: -            has_conflicts = self._has_conflicts(doc_id) -        doc = self._factory( -            doc_id=doc_id, -            rev=cdoc[self.COUCH_U1DB_REV_KEY], -            has_conflicts=has_conflicts) -        contents = self._database.get_attachment( -            cdoc, -            self.COUCH_U1DB_ATTACHMENT_KEY) -        if contents: -            doc.content = json.loads(contents.read()) -        else: -            doc.make_tombstone()          return doc      def get_all_docs(self, include_deleted=False):          """          Get the JSON content for all documents in the database. -        @param include_deleted: If set to True, deleted documents will be -            returned with empty content. Otherwise deleted documents will not -            be included in the results. -        @type include_deleted: bool +        :param include_deleted: If set to True, deleted documents will be +                                returned with empty content. Otherwise deleted +                                documents will not be included in the results. +        :type include_deleted: bool -        @return: (generation, [Document]) +        :return: (generation, [CouchDocument])              The current generation of the database, followed by a list of all              the documents in the database. -        @rtype: tuple +        :rtype: (int, [CouchDocument])          """ +          generation = self._get_generation()          results = [] -        for doc_id in self._database: -            if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): -                continue -            doc = self._get_doc(doc_id, check_for_conflicts=True) -            if doc.content is None and not include_deleted: -                continue -            results.append(doc) +        for row in self._database.view('_all_docs'): +            doc = self.get_doc(row.id, include_deleted=include_deleted) +            if doc is not None: +                results.append(doc)          return (generation, results) -    def _put_doc(self, doc): +    def _put_doc(self, old_doc, doc):          """ -        Update a document. +        Put the document in the Couch backend database. -        This is called everytime we just want to do a raw put on the db (i.e. -        without index updates, document constraint checks, and conflict -        checks). - -        @param doc: The document to update. -        @type doc: u1db.Document - -        @return: The new revision identifier for the document. 
-        @rtype: str -        """ -        # prepare couch's Document -        cdoc = CouchDocument() -        cdoc[self.COUCH_ID_KEY] = doc.doc_id -        # we have to guarantee that couch's _rev is consistent -        old_cdoc = self._database.get(doc.doc_id) -        if old_cdoc is not None: -            cdoc[self.COUCH_REV_KEY] = old_cdoc[self.COUCH_REV_KEY] -        # store u1db's rev -        cdoc[self.COUCH_U1DB_REV_KEY] = doc.rev -        # save doc in db -        self._database.save(cdoc) -        # store u1db's content as json string -        if not doc.is_tombstone(): -            self._database.put_attachment( -                cdoc, doc.get_json(), -                filename=self.COUCH_U1DB_ATTACHMENT_KEY) -        else: -            self._database.delete_attachment( -                cdoc, -                self.COUCH_U1DB_ATTACHMENT_KEY) +        :param old_doc: The old document version. +        :type old_doc: CouchDocument +        :param doc: The document to be put. +        :type doc: CouchDocument -    def get_sync_target(self): +        :raise RevisionConflict: Raised when trying to update a document but +                                 couch revisions mismatch.          """ -        Return a SyncTarget object, for another u1db to synchronize with. - -        @return: The sync target. -        @rtype: CouchSyncTarget +        trans_id = self._allocate_transaction_id() +        # encode content +        content = doc.get_json() +        if content is not None: +            content = binascii.b2a_base64(content)[:-1]  # exclude trailing \n +        # encode conflicts +        conflicts = None +        update_conflicts = doc.modified_conflicts() +        if update_conflicts is True: +            if doc.has_conflicts: +                conflicts = binascii.b2a_base64( +                    json.dumps( +                        map(lambda cdoc: (cdoc.rev, cdoc.content), +                            doc.get_conflicts())) +            )[:-1]  # exclude \n +        # perform the request +        resource = self._database.resource( +            '_design', 'docs', '_update', 'put', doc.doc_id) +        response = resource.put_json( +            body={ +                'couch_rev': old_doc.couch_rev +                    if old_doc is not None else None, +                'u1db_rev': doc.rev, +                'content': content, +                'trans_id': trans_id, +                'conflicts': conflicts, +                'update_conflicts': update_conflicts, +            }, +            headers={'content-type': 'application/json'}) +        # the document might have been updated in between, so we check for the +        # return message +        msg = response[2].read() +        if msg == 'ok': +            return +        elif msg == 'revision conflict': +            raise errors.RevisionConflict() + +    def put_doc(self, doc):          """ -        return CouchSyncTarget(self) +        Update a document. -    def create_index(self, index_name, *index_expressions): -        """ -        Create a named index, which can then be queried for future lookups. +        If the document currently has conflicts, put will fail. +        If the database specifies a maximum document size and the document +        exceeds it, put will fail and raise a DocumentTooBig exception. -        @param index_name: A unique name which can be used as a key prefix. -        @param index_expressions: Index expressions defining the index -            information. 
-        """ -        if index_name in self._indexes: -            if self._indexes[index_name]._definition == list( -                    index_expressions): -                return -            raise errors.IndexNameTakenError -        index = InMemoryIndex(index_name, list(index_expressions)) -        for doc_id in self._database: -            if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): -                continue  # skip special files -            doc = self._get_doc(doc_id) -            if doc.content is not None: -                index.add_json(doc_id, doc.get_json()) -        self._indexes[index_name] = index +        :param doc: A Document with new content. +        :return: new_doc_rev - The new revision identifier for the document. +            The Document object will also be updated. -    def close(self): +        :raise errors.InvalidDocId: Raised if the document's id is invalid. +        :raise errors.DocumentTooBig: Raised if the document size is too big. +        :raise errors.ConflictedDoc: Raised if the document has conflicts.          """ -        Release any resources associated with this database. - -        @return: True if db was succesfully closed. -        @rtype: bool +        if doc.doc_id is None: +            raise errors.InvalidDocId() +        self._check_doc_id(doc.doc_id) +        self._check_doc_size(doc) +        old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) +        if old_doc and old_doc.has_conflicts: +            raise errors.ConflictedDoc() +        if old_doc and doc.rev is None and old_doc.is_tombstone(): +            new_rev = self._allocate_doc_rev(old_doc.rev) +        else: +            if old_doc is not None: +                    if old_doc.rev != doc.rev: +                        raise errors.RevisionConflict() +            else: +                if doc.rev is not None: +                    raise errors.RevisionConflict() +            new_rev = self._allocate_doc_rev(doc.rev) +        doc.rev = new_rev +        self._put_doc(old_doc, doc) +        return new_rev + +    def whats_changed(self, old_generation=0):          """ -        # TODO: fix this method so the connection is properly closed and -        # test_close (+tearDown, which deletes the db) works without problems. -        self._url = None -        self._full_commit = None -        self._session = None -        #self._server = None -        self._database = None -        return True - -    def sync(self, url, creds=None, autocreate=True): +        Return a list of documents that have changed since old_generation. + +        :param old_generation: The generation of the database in the old +                               state. +        :type old_generation: int + +        :return: (generation, trans_id, [(doc_id, generation, trans_id),...]) +                 The current generation of the database, its associated +                 transaction id, and a list of of changed documents since +                 old_generation, represented by tuples with for each document +                 its doc_id and the generation and transaction id corresponding +                 to the last intervening change and sorted by generation (old +                 changes first) +        :rtype: (int, str, [(str, int, str)])          """ -        Synchronize documents with remote replica exposed at url. 
+        # query a couch list function +        res = self._database.resource( +            '_design', 'transactions', '_list', 'whats_changed', 'log') +        response = res.get_json(old_gen=old_generation) +        results = map( +            lambda row: +                (row['generation'], row['doc_id'], row['transaction_id']), +            response[2]['transactions']) +        results.reverse() +        cur_gen = old_generation +        seen = set() +        changes = [] +        newest_trans_id = '' +        for generation, doc_id, trans_id in results: +            if doc_id not in seen: +                changes.append((doc_id, generation, trans_id)) +                seen.add(doc_id) +        if changes: +            cur_gen = changes[0][1]  # max generation +            newest_trans_id = changes[0][2] +            changes.reverse() +        else: +            cur_gen, newest_trans_id = self._get_generation_info() -        @param url: The url of the target replica to sync with. -        @type url: str -        @param creds: optional dictionary giving credentials. -            to authorize the operation with the server. -        @type creds: dict -        @param autocreate: Ask the target to create the db if non-existent. -        @type autocreate: bool +        return cur_gen, newest_trans_id, changes -        @return: The local generation before the synchronisation was performed. -        @rtype: int +    def delete_doc(self, doc):          """ -        return Synchronizer(self, CouchSyncTarget(url, creds=creds)).sync( -            autocreate=autocreate) +        Mark a document as deleted. + +        Will abort if the current revision doesn't match doc.rev. +        This will also set doc.content to None. -    #------------------------------------------------------------------------- -    # methods from ObjectStoreDatabase -    #------------------------------------------------------------------------- +        :param doc: The document to mark as deleted. +        :type doc: CouchDocument. -    def _init_u1db_data(self): +        :raise errors.DocumentDoesNotExist: Raised if the document does not +                                            exist. +        :raise errors.RevisionConflict: Raised if the revisions do not match. +        :raise errors.DocumentAlreadyDeleted: Raised if the document is +                                              already deleted. +        :raise errors.ConflictedDoc: Raised if the doc has conflicts. +        """ +        old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) +        if old_doc is None: +            raise errors.DocumentDoesNotExist +        if old_doc.rev != doc.rev: +            raise errors.RevisionConflict() +        if old_doc.is_tombstone(): +            raise errors.DocumentAlreadyDeleted +        if old_doc.has_conflicts: +            raise errors.ConflictedDoc() +        new_rev = self._allocate_doc_rev(doc.rev) +        doc.rev = new_rev +        doc.make_tombstone() +        self._put_doc(old_doc, doc) +        return new_rev + +    def _get_conflicts(self, doc_id, couch_rev=None):          """ -        Initialize u1db configuration data on backend storage. +        Get the conflicted versions of a document. -        A U1DB database needs to keep track of all database transactions, -        document conflicts, the generation of other replicas it has seen, -        indexes created by users and so on. 
+        If the C{couch_rev} parameter is not None, conflicts for a specific +        document's couch revision are returned. -        In this implementation, all this information is stored in special -        documents stored in the underlying with doc_id prefix equal to -        U1DB_DATA_DOC_ID_PREFIX. Those documents ids are reserved: put_doc(), -        get_doc() and delete_doc() will not allow documents with a doc_id with -        that prefix to be accessed or modified. +        :param couch_rev: The couch document revision. +        :type couch_rev: str + +        :return: A list of conflicted versions of the document. +        :rtype: list          """ -        for key in self.U1DB_DATA_KEYS: -            doc_id = '%s%s' % (self.U1DB_DATA_DOC_ID_PREFIX, key) -            doc = self._get_doc(doc_id) -            if doc is None: -                doc = self._factory(doc_id) -                doc.content = {'content': getattr(self, key)} -                self._put_doc(doc) +        # request conflicts attachment from server +        params = {} +        if couch_rev is not None: +            params['rev'] = couch_rev  # restrict to the document's couch revision +        resource = self._database.resource(doc_id, 'u1db_conflicts') +        try: +            response = resource.get_json(**params) +            conflicts = [] +            # build the conflicted versions +            for doc_rev, content in json.loads(response[2].read()): +                doc = self._factory(doc_id, doc_rev) +                if content is None: +                    doc.make_tombstone() +                else: +                    doc.content = content +                conflicts.append(doc) +            return conflicts +        except ResourceNotFound: +            return [] -    #------------------------------------------------------------------------- -    # Couch specific methods -    #------------------------------------------------------------------------- +    def get_doc_conflicts(self, doc_id): +        """ +        Get the list of conflicts for the given document. -    INDEX_NAME_KEY = 'name' -    INDEX_DEFINITION_KEY = 'definition' -    INDEX_VALUES_KEY = 'values' +        The order of the conflicts is such that the first entry is the value +        that would be returned by "get_doc". -    def delete_database(self): -        """ -        Delete a U1DB CouchDB database. +        :return: A list of the document entries that are conflicted. +        :rtype: [CouchDocument]          """ -        del(self._server[self._dbname]) +        conflict_docs = self._get_conflicts(doc_id) +        if len(conflict_docs) == 0: +            return [] +        this_doc = self._get_doc(doc_id, check_for_conflicts=True) +        return [this_doc] + conflict_docs -    def _dump_indexes_as_json(self): +    def _get_replica_gen_and_trans_id(self, other_replica_uid): +        """ +        Return the last known generation and transaction id for the other db +        replica. + +        When you do a synchronization with another replica, the Database keeps +        track of what generation the other database replica was at, and what +        the associated transaction id was.  This is used to determine what data +        needs to be sent, and if two databases are claiming to be the same +        replica. + +        :param other_replica_uid: The identifier for the other replica.
+        :type other_replica_uid: str + +        :return: A tuple containing the generation and transaction id we +                 encountered during synchronization. If we've never +                 synchronized with the replica, this is (0, ''). +        :rtype: (int, str) +        """ +        # query a couch view +        result = self._database.view('syncs/log') +        if len(result[other_replica_uid].rows) == 0: +            return (0, '') +        return ( +            result[other_replica_uid].rows[0]['value']['known_generation'], +            result[other_replica_uid].rows[0]['value']['known_transaction_id'] +        ) + +    def _set_replica_gen_and_trans_id(self, other_replica_uid, +                                      other_generation, other_transaction_id):          """ -        Dump index definitions as JSON. +        Set the last-known generation and transaction id for the other +        database replica. + +        We have just performed some synchronization, and we want to track what +        generation the other replica was at. See also +        _get_replica_gen_and_trans_id. + +        :param other_replica_uid: The U1DB identifier for the other replica. +        :type other_replica_uid: str +        :param other_generation: The generation number for the other replica. +        :type other_generation: int +        :param other_transaction_id: The transaction id associated with the +            generation. +        :type other_transaction_id: str          """ -        indexes = {} -        for name, idx in self._indexes.iteritems(): -            indexes[name] = {} -            for attr in [self.INDEX_NAME_KEY, self.INDEX_DEFINITION_KEY, -                         self.INDEX_VALUES_KEY]: -                indexes[name][attr] = getattr(idx, '_' + attr) -        return indexes +        self._do_set_replica_gen_and_trans_id( +            other_replica_uid, other_generation, other_transaction_id) -    def _load_indexes_from_json(self, indexes): +    def _do_set_replica_gen_and_trans_id( +            self, other_replica_uid, other_generation, other_transaction_id): +        """ +        Set the last-known generation and transaction id for the other +        database replica. + +        We have just performed some synchronization, and we want to track what +        generation the other replica was at. See also +        _get_replica_gen_and_trans_id. + +        :param other_replica_uid: The U1DB identifier for the other replica. +        :type other_replica_uid: str +        :param other_generation: The generation number for the other replica. +        :type other_generation: int +        :param other_transaction_id: The transaction id associated with the +                                     generation. +        :type other_transaction_id: str +        """ +        # query a couch update function +        res = self._database.resource( +            '_design', 'syncs', '_update', 'put', 'u1db_sync_log') +        res.put_json( +            body={ +                'other_replica_uid': other_replica_uid, +                'other_generation': other_generation, +                'other_transaction_id': other_transaction_id, +            }, +            headers={'content-type': 'application/json'}) + +    def _add_conflict(self, doc, my_doc_rev, my_content):          """ -        Load index definitions from stored JSON. +        Add a conflict to the document. 
+ +        Note that this method does not actually update the backend; rather, it +        updates the CouchDocument object which will provide the conflict data +        when the atomic document update is made. + +        :param doc: The document to have conflicts added to. +        :type doc: CouchDocument +        :param my_doc_rev: The revision of the conflicted document. +        :type my_doc_rev: str +        :param my_content: The content of the conflicted document as a JSON +                           serialized string. +        :type my_content: str +        """ +        doc.ensure_fetch_conflicts(self._get_conflicts) +        doc.add_conflict( +            self._factory(doc_id=doc.doc_id, rev=my_doc_rev, +                          json=my_content)) -    def _delete_conflicts(self, doc, conflict_revs): +    def _delete_conflicts(self, doc, conflict_revs):          """ -        self._indexes = {} -        for name, idx_dict in indexes.iteritems(): -            idx = InMemoryIndex(name, idx_dict[self.INDEX_DEFINITION_KEY]) -            idx._values = idx_dict[self.INDEX_VALUES_KEY] -            self._indexes[name] = idx +        Delete the conflicted revisions from the list of conflicts of C{doc}. + +        Note that this method does not actually update the backend; rather, it +        updates the CouchDocument object which will provide the conflict data +        when the atomic document update is made. -    def _load_transaction_log_from_json(self, transaction_log): +        :param doc: The document to have conflicts deleted. +        :type doc: CouchDocument +        :param conflict_revs: A list of the revisions to be deleted. +        :type conflict_revs: [str]          """ -        Load transaction log from stored JSON. +        doc.ensure_fetch_conflicts(self._get_conflicts) +        doc.delete_conflicts(conflict_revs) -        @param transaction_log: A JSON representation of transaction_log as -            [('generation', 'transaction_id'), ...]. -        @type transaction_log: list +    def _prune_conflicts(self, doc, doc_vcr): +        """ +        Prune conflicts that are older than the current document's revision, or +        whose content matches the current document's content. + +        :param doc: The document to have conflicts pruned. +        :type doc: CouchDocument +        :param doc_vcr: A vector clock representing the current document's +                        revision. +        :type doc_vcr: u1db.vectorclock.VectorClock          """ -        self._transaction_log = [] -        for gen, trans_id in transaction_log: -            self._transaction_log.append((gen, trans_id)) +        if doc.has_conflicts is True: +            autoresolved = False +            c_revs_to_prune = [] +            for c_doc in doc.get_conflicts(): +                c_vcr = vectorclock.VectorClockRev(c_doc.rev) +                if doc_vcr.is_newer(c_vcr): +                    c_revs_to_prune.append(c_doc.rev) +                elif doc.same_content_as(c_doc): +                    c_revs_to_prune.append(c_doc.rev) +                    doc_vcr.maximize(c_vcr) +                    autoresolved = True +            if autoresolved: +                doc_vcr.increment(self._replica_uid) +                doc.rev = doc_vcr.as_str() +            self._delete_conflicts(doc, c_revs_to_prune) + +    def _force_doc_sync_conflict(self, doc): +        """ +        Add a conflict and force a document put.
-    def _load_other_generations_from_json(self, other_generations): +        :param doc: The document to be put. +        :type doc: CouchDocument          """ -        Load other generations from stored JSON. +        my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) +        self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev)) +        self._add_conflict(doc, my_doc.rev, my_doc.get_json()) +        doc.has_conflicts = True +        self._put_doc(my_doc, doc) -        @param other_generations: A JSON representation of other_generations -            as {'replica_uid': ('generation', 'transaction_id'), ...}. -        @type other_generations: dict +    def resolve_doc(self, doc, conflicted_doc_revs): +        """ +        Mark a document as no longer conflicted. + +        We take the list of revisions that the client knows about that it is +        superseding. This may be a different list from the actual current +        conflicts, in which case only those are removed as conflicted.  This +        may fail if the conflict list is significantly different from the +        supplied information. (sync could have happened in the background from +        the time you GET_DOC_CONFLICTS until the point where you RESOLVE) + +        :param doc: A Document with the new content to be inserted. +        :type doc: CouchDocument +        :param conflicted_doc_revs: A list of revisions that the new content +                                    supersedes. +        :type conflicted_doc_revs: [str]          """ -        self._other_generations = {} -        for replica_uid, [gen, trans_id] in other_generations.iteritems(): -            self._other_generations[replica_uid] = (gen, trans_id) +        cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) +        new_rev = self._ensure_maximal_rev(cur_doc.rev, +                                           conflicted_doc_revs) +        superseded_revs = set(conflicted_doc_revs) +        doc.rev = new_rev +        if cur_doc.rev in superseded_revs: +            self._delete_conflicts(doc, superseded_revs) +            self._put_doc(cur_doc, doc) +        else: +            self._add_conflict(doc, new_rev, doc.get_json()) +            self._delete_conflicts(doc, superseded_revs) +            # perform request to resolve document in server +            resource = self._database.resource( +                '_design', 'docs', '_update', 'resolve_doc', doc.doc_id) +            conflicts = None +            if doc.has_conflicts: +                conflicts = binascii.b2a_base64( +                    json.dumps( +                        map(lambda cdoc: (cdoc.rev, cdoc.content), +                            doc.get_conflicts())) +                )[:-1]  # exclude \n +            response = resource.put_json( +                body={ +                    'couch_rev': cur_doc.couch_rev, +                    'conflicts': conflicts, +                }, +                headers={'content-type': 'application/json'}) + +    def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, +                          replica_trans_id=''): +        """ +        Insert/update document into the database with a given revision. + +        This api is used during synchronization operations. + +        If a document would conflict and save_conflict is set to True, the +        content will be selected as the 'current' content for doc.doc_id, +        even though doc.rev doesn't supersede the currently stored revision. 
+        The currently stored document will be added to the list of conflict
+        alternatives for the given doc_id.
+
+        This forces the new content to be 'current' so that we get convergence
+        after synchronizing, even if people don't resolve conflicts. Users can
+        then notice that their content is out of date, update it, and
+        synchronize again. (The alternative is that users could synchronize and
+        think the data has propagated, but their local copy looks fine, and the
+        remote copy is never updated again.)
+
+        :param doc: A document object
+        :type doc: CouchDocument
+        :param save_conflict: If this document is a conflict, do you want to
+                              save it as a conflict, or just ignore it.
+        :type save_conflict: bool
+        :param replica_uid: A unique replica identifier.
+        :type replica_uid: str
+        :param replica_gen: The generation of the replica corresponding to
+                            this document. The replica arguments are optional,
+                            but are used during synchronization.
+        :type replica_gen: int
+        :param replica_trans_id: The transaction_id associated with the
+                                 generation.
+        :type replica_trans_id: str
+
+        :return: (state, at_gen) - If we don't have doc_id already, or if
+                 doc_rev supersedes the existing document revision, then the
+                 content will be inserted, and state is 'inserted'.  If
+                 doc_rev is less than or equal to the existing revision, then
+                 the put is ignored and state is respectively 'superseded' or
+                 'converged'.  If doc_rev is not strictly superseded or
+                 supersedes, then state is 'conflicted'. The document will not
+                 be inserted if save_conflict is False.  For 'inserted' or
+                 'converged', at_gen is the insertion/current generation.
+        :rtype: (str, int)
+        """
+        cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
+        # at this point, `doc` has arrived from the other syncing party, and
+        # we will decide what to do with it.
+        # First, we prepare the arriving doc to update couch database.
+        old_doc = doc
+        doc = self._factory(doc.doc_id, doc.rev, doc.get_json())
+        if cur_doc is not None:
+            doc.couch_rev = cur_doc.couch_rev
+        # fetch conflicts because we will eventually manipulate them
+        doc.ensure_fetch_conflicts(self._get_conflicts)
+        # from now on, it works just like u1db sqlite backend
+        doc_vcr = vectorclock.VectorClockRev(doc.rev)
+        if cur_doc is None:
+            cur_vcr = vectorclock.VectorClockRev(None)
+        else:
+            cur_vcr = vectorclock.VectorClockRev(cur_doc.rev)
+        self._validate_source(replica_uid, replica_gen, replica_trans_id)
+        if doc_vcr.is_newer(cur_vcr):
+            rev = doc.rev
+            self._prune_conflicts(doc, doc_vcr)
+            if doc.rev != rev:
+                # conflicts have been autoresolved
+                state = 'superseded'
+            else:
+                state = 'inserted'
+            self._put_doc(cur_doc, doc)
+        elif doc.rev == cur_doc.rev:
+            # magical convergence
+            state = 'converged'
+        elif cur_vcr.is_newer(doc_vcr):
+            # Don't add this to seen_ids, because we have something newer,
+            # so we should send it back, and we should not generate a
+            # conflict
+            state = 'superseded'
+        elif cur_doc.same_content_as(doc):
+            # the documents have been edited to the same thing at both ends
+            doc_vcr.maximize(cur_vcr)
+            doc_vcr.increment(self._replica_uid)
+            doc.rev = doc_vcr.as_str()
+            self._put_doc(cur_doc, doc)
+            state = 'superseded'
+        else:
+            state = 'conflicted'
+            if save_conflict:
+                self._force_doc_sync_conflict(doc)
+        if replica_uid is not None and replica_gen is not None:
+            self._do_set_replica_gen_and_trans_id(
+                replica_uid, replica_gen, replica_trans_id)
+        # update info
+        old_doc.rev = doc.rev
+        if doc.is_tombstone():
+            # mark the old doc as deleted (was a no-op call to is_tombstone())
+            old_doc.make_tombstone()
+        else:
+            old_doc.content = doc.content
+        old_doc.has_conflicts = doc.has_conflicts
+        return state, self._get_generation()

-class CouchSyncTarget(ObjectStoreSyncTarget):
+class CouchSyncTarget(CommonSyncTarget):
     """
     Functionality for using a CouchDatabase as a synchronization target.
     """
-    pass
+
+    def get_sync_info(self, source_replica_uid):
+        source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id(
+            source_replica_uid)
+        my_gen, my_trans_id = self._db._get_generation_info()
+        return (
+            self._db._replica_uid, my_gen, my_trans_id, source_gen,
+            source_trans_id)
+
+    def record_sync_info(self, source_replica_uid, source_replica_generation,
+                         source_replica_transaction_id):
+        if self._trace_hook:
+            self._trace_hook('record_sync_info')
+        self._db._set_replica_gen_and_trans_id(
+            source_replica_uid, source_replica_generation,
+            source_replica_transaction_id)

 class NotEnoughCouchPermissions(Exception):
@@ -527,12 +1014,12 @@ class CouchServerState(ServerState):
         """
         Initialize the couch server state.

-        @param couch_url: The URL for the couch database.
-        @type couch_url: str
-        @param shared_db_name: The name of the shared database.
-        @type shared_db_name: str -        @param tokens_db_name: The name of the tokens database. -        @type tokens_db_name: str +        :param couch_url: The URL for the couch database. +        :type couch_url: str +        :param shared_db_name: The name of the shared database. +        :type shared_db_name: str +        :param tokens_db_name: The name of the tokens database. +        :type tokens_db_name: str          """          self._couch_url = couch_url          self._shared_db_name = shared_db_name @@ -563,12 +1050,12 @@ class CouchServerState(ServerState):          couch library to ensure that Soledad Server can do everything it needs          on the underlying couch database. -        @param couch_url: The URL of the couch database. -        @type couch_url: str +        :param couch_url: The URL of the couch database. +        :type couch_url: str          @raise NotEnoughCouchPermissions: Raised in case there are not enough              permissions to read/write/create the needed couch databases. -        @rtype: bool +        :rtype: bool          """          def _open_couch_db(dbname): @@ -601,11 +1088,11 @@ class CouchServerState(ServerState):          """          Open a couch database. -        @param dbname: The name of the database to open. -        @type dbname: str +        :param dbname: The name of the database to open. +        :type dbname: str -        @return: The CouchDatabase object. -        @rtype: CouchDatabase +        :return: The CouchDatabase object. +        :rtype: CouchDatabase          """          # TODO: open couch          return CouchDatabase.open_database( @@ -616,11 +1103,11 @@ class CouchServerState(ServerState):          """          Ensure couch database exists. -        @param dbname: The name of the database to ensure. -        @type dbname: str +        :param dbname: The name of the database to ensure. +        :type dbname: str -        @return: The CouchDatabase object and the replica uid. -        @rtype: (CouchDatabase, str) +        :return: The CouchDatabase object and the replica uid. +        :rtype: (CouchDatabase, str)          """          db = CouchDatabase.open_database(              self._couch_url + '/' + dbname, @@ -631,8 +1118,8 @@ class CouchServerState(ServerState):          """          Delete couch database. -        @param dbname: The name of the database to delete. -        @type dbname: str +        :param dbname: The name of the database to delete. +        :type dbname: str          """          CouchDatabase.delete_database(self._couch_url + '/' + dbname) @@ -640,8 +1127,8 @@ class CouchServerState(ServerState):          """          Set the couchdb URL -        @param url: CouchDB URL -        @type url: str +        :param url: CouchDB URL +        :type url: str          """          self._couch_url = url @@ -649,7 +1136,7 @@ class CouchServerState(ServerState):          """          Return CouchDB URL -        @rtype: str +        :rtype: str          """          return self._couch_url diff --git a/common/src/leap/soledad/common/ddocs/README.txt b/common/src/leap/soledad/common/ddocs/README.txt new file mode 100644 index 00000000..5569d929 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/README.txt @@ -0,0 +1,34 @@ +This directory holds a folder structure containing javascript files that +represent the design documents needed by the CouchDB U1DB backend. These files +are compiled into the `../ddocs.py` file by setuptools when creating the +source distribution. 
+
+The following table maps each U1DB CouchDB backend method to the URI that is
+queried to obtain or update data on the server.
+
+   +----------------------------------+------------------------------------------------------------------+
+   | u1db backend method              | URI                                                              |
+   |----------------------------------+------------------------------------------------------------------|
+   | _get_generation                  | _design/transactions/_list/generation/log                       |
+   | _get_generation_info             | _design/transactions/_list/generation/log                       |
+   | _get_trans_id_for_gen            | _design/transactions/_list/trans_id_for_gen/log                 |
+   | _get_transaction_log             | _design/transactions/_view/log                                  |
+   | _get_doc (*)                     | _design/docs/_view/get?key=<doc_id>                             |
+   | _has_conflicts                   | _design/docs/_view/get?key=<doc_id>                             |
+   | get_all_docs                     | _design/docs/_view/get                                          |
+   | _put_doc                         | _design/docs/_update/put/<doc_id>                               |
+   | _whats_changed                   | _design/transactions/_list/whats_changed/log?old_gen=<gen>      |
+   | _get_conflicts (*)               | _design/docs/_view/conflicts?key=<doc_id>                       |
+   | _get_replica_gen_and_trans_id    | _design/syncs/_view/log?other_replica_uid=<uid>                 |
+   | _do_set_replica_gen_and_trans_id | _design/syncs/_update/put/u1db_sync_log                         |
+   | _add_conflict                    | _design/docs/_update/add_conflict/<doc_id>                      |
+   | _delete_conflicts                | _design/docs/_update/delete_conflicts/<doc_id>?doc_rev=<doc_rev>|
+   | list_indexes                     | not implemented                                                  |
+   | _get_index_definition            | not implemented                                                  |
+   | delete_index                     | not implemented                                                  |
+   | _get_indexed_fields              | not implemented                                                  |
+   | _put_and_update_indexes          | not implemented                                                  |
+   +----------------------------------+------------------------------------------------------------------+
+
+(*) These methods also request CouchDB document attachments that store U1DB
+    document contents.
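
As an illustration only, the compiled design documents could be deployed by
hand roughly as follows. This is a sketch, not part of the backend: it assumes
the generated `ddocs.py` exposes one base64-encoded JSON string per design
document (`docs`, `syncs` and `transactions`, matching the directories
described above), and the database URL and name are placeholders.

    import json
    import binascii

    from couchdb.client import Server

    from leap.soledad.common import ddocs

    db = Server('http://localhost:5984')['user-some-uuid']
    for name in ('docs', 'syncs', 'transactions'):
        # decode the compiled design document and store it under its _id
        ddoc = json.loads(binascii.a2b_base64(getattr(ddocs, name)))
        db[ddoc['_id']] = ddoc  # creates _design/<name> on first deployment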
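As an example of the mapping, the `_get_trans_id_for_gen` row corresponds to a
GET on the `trans_id_for_gen` list function of the `transactions` design
document. A sketch using the same python-couchdb `resource` interface the
backend uses (database name and generation number are hypothetical):

    from couchdb.client import Server

    db = Server('http://localhost:5984')['user-some-uuid']
    resource = db.resource('_design', 'transactions', '_list',
                           'trans_id_for_gen', 'log')
    # the list function reads `gen` from the query string and replies with
    # {"generation": ..., "doc_id": ..., "transaction_id": ...}
    status, headers, body = resource.get_json(gen=42)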
diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/put.js b/common/src/leap/soledad/common/ddocs/docs/updates/put.js
new file mode 100644
index 00000000..5a4647de
--- /dev/null
+++ b/common/src/leap/soledad/common/ddocs/docs/updates/put.js
@@ -0,0 +1,64 @@
+function(doc, req){
+    /* we expect to receive the following in `req.body`:
+     * {
+     *     'couch_rev': '<couch_rev>',
+     *     'u1db_rev': '<u1db_rev>',
+     *     'content': '<base64 encoded content>',
+     *     'trans_id': '<transaction_id>'
+     *     'conflicts': '<base64 encoded conflicts>',
+     *     'update_conflicts': <boolean>
+     * }
+     */
+    var body = JSON.parse(req.body);
+
+    // create a new document
+    if (!doc) {
+        doc = {}
+        doc['_id'] = req['id'];
+    }
+    // or fail if couch revisions do not match
+    else if (doc['_rev'] != body['couch_rev']) {
+        // the request is out of date w.r.t. the stored couch revision
+        return [null, 'revision conflict']
+    }
+
+    // store u1db rev
+    doc.u1db_rev = body['u1db_rev'];
+
+    // save content as attachment
+    if (body['content'] != null) {
+        // save u1db content as attachment
+        if (!doc._attachments)
+            doc._attachments = {};
+        doc._attachments.u1db_content = {
+            content_type: "application/octet-stream",
+            data: body['content']  // should be base64 encoded
+        };
+    }
+    // or delete the attachment if document is a tombstone
+    else if (doc._attachments &&
+             doc._attachments.u1db_content)
+        delete doc._attachments.u1db_content;
+
+    // store the transaction id
+    if (!doc.u1db_transactions)
+        doc.u1db_transactions = [];
+    var d = new Date();
+    doc.u1db_transactions.push([d.getTime(), body['trans_id']]);
+
+    // save conflicts as attachment if they were sent
+    if (body['update_conflicts'])
+        if (body['conflicts'] != null) {
+            if (!doc._attachments)
+                doc._attachments = {};
+            doc._attachments.u1db_conflicts = {
+                content_type: "application/octet-stream",
+                data: body['conflicts']  // should be base64 encoded
+            }
+        } else {
+            if(doc._attachments && doc._attachments.u1db_conflicts)
+                delete doc._attachments.u1db_conflicts
+        }
+
+    return [doc, 'ok'];
+}
diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js b/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js
new file mode 100644
index 00000000..7ba66cf8
--- /dev/null
+++ b/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js
@@ -0,0 +1,39 @@
+function(doc, req){
+    /* we expect to receive the following in `req.body`:
+     * {
+     *     'couch_rev': '<couch_rev>',
+     *     'conflicts': '<base64 encoded conflicts>',
+     * }
+     */
+    var body = JSON.parse(req.body);
+
+    // fail if no document was given
+    if (!doc) {
+        return [null, 'document does not exist']
+    }
+
+    // fail if couch revisions do not match
+    if (body['couch_rev'] != null
+        && doc['_rev'] != body['couch_rev']) {
+        return [null, 'revision conflict']
+    }
+
+    // fail if conflicts were not sent
+    if (body['conflicts'] == null)
+        return [null, 'missing conflicts']
+
+    // save conflicts as attachment if they were sent
+    if (body['conflicts'] != null) {
+        if (!doc._attachments)
+            doc._attachments = {};
+        doc._attachments.u1db_conflicts = {
+            content_type: "application/octet-stream",
+            data: body['conflicts']  // should be base64 encoded
+        }
+    }
+    // or delete attachment if there are no conflicts
+    else if (doc._attachments && doc._attachments.u1db_conflicts)
+        delete doc._attachments.u1db_conflicts;
+
+    return [doc, 'ok'];
+}
diff --git a/common/src/leap/soledad/common/ddocs/docs/views/get/map.js b/common/src/leap/soledad/common/ddocs/docs/views/get/map.js
new file mode 100644
index 00000000..ae08d9e9
--- /dev/null
+++ b/common/src/leap/soledad/common/ddocs/docs/views/get/map.js
@@ -0,0 +1,20 @@
+function(doc) {
+    if (doc.u1db_rev) {
+        var is_tombstone = true;
+        var has_conflicts = false;
+        if (doc._attachments) {
+            if (doc._attachments.u1db_content)
+                is_tombstone = false;
+            if (doc._attachments.u1db_conflicts)
+                has_conflicts = true;
+        }
+        emit(doc._id,
+            {
+                "couch_rev": doc._rev,
+                "u1db_rev": doc.u1db_rev,
+                "is_tombstone": is_tombstone,
+                "has_conflicts": has_conflicts,
+            }
+        );
+    }
+}
diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js
new file mode 100644
index 00000000..722f695a
--- /dev/null
+++ b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js
@@ -0,0 +1,22 @@
+function(doc, req){
+    if (!doc) {
+        doc = {}
+        doc['_id'] = 'u1db_sync_log';
+        doc['syncs'] = [];
+    }
+    var body = JSON.parse(req.body);
+    // remove outdated info
+    doc['syncs'] = doc['syncs'].filter(
+        function (entry) {
+            return entry[0] != body['other_replica_uid'];
+        }
+    );
+    // store the other replica's uid, generation and transaction id
+    doc['syncs'].push([
+        body['other_replica_uid'],
+        body['other_generation'],
+        body['other_transaction_id']
+    ]);
+    return [doc, 'ok'];
+}
+
diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js
new file mode 100644
index 00000000..a63c7cf4
--- /dev/null
+++ b/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js
@@ -0,0 +1,12 @@
+function(doc) {
+    if (doc._id == 'u1db_sync_log') {
+        if (doc.syncs)
+            doc.syncs.forEach(function (entry) {
+                emit(entry[0],
+                    {
+                        'known_generation': entry[1],
+                        'known_transaction_id': entry[2]
+                    });
+            });
+    }
+}
diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js b/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js
new file mode 100644
index 00000000..dbdfff0d
--- /dev/null
+++ b/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js
@@ -0,0 +1,20 @@
+function(head, req) {
+    var row;
+    var rows=[];
+    // fetch all rows
+    while(row = getRow()) {
+        rows.push(row);
+    }
+    if (rows.length > 0)
+        send(JSON.stringify({
+            "generation": rows.length,
+            "doc_id": rows[rows.length-1]['id'],
+            "transaction_id": rows[rows.length-1]['value']
+        }));
+    else
+        send(JSON.stringify({
+            "generation": 0,
+            "doc_id": "",
+            "transaction_id": ""
+        }));
+}
diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js
b/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js new file mode 100644 index 00000000..2ec91794 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js @@ -0,0 +1,19 @@ +function(head, req) { +    var row; +    var rows=[]; +    var i = 1; +    var gen = 1; +    if (req.query.gen) +        gen = parseInt(req.query['gen']); +    // fetch all rows +    while(row = getRow()) +        rows.push(row); +    if (gen <= rows.length) +        send(JSON.stringify({ +            "generation": gen, +            "doc_id": rows[gen-1]['id'], +            "transaction_id": rows[gen-1]['value'], +        })); +    else +        send('{}'); +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js b/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js new file mode 100644 index 00000000..b35cdf51 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js @@ -0,0 +1,22 @@ +function(head, req) { +    var row; +    var gen = 1; +    var old_gen = 0; +    if (req.query.old_gen) +        old_gen = parseInt(req.query['old_gen']); +    send('{"transactions":[\n'); +    // fetch all rows +    while(row = getRow()) { +        if (gen > old_gen) { +            if (gen > old_gen+1) +                send(',\n'); +            send(JSON.stringify({ +                "generation": gen, +                "doc_id": row["id"], +                "transaction_id": row["value"] +            })); +        } +        gen++; +    } +    send('\n]}'); +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js b/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js new file mode 100644 index 00000000..94ef63ca --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js @@ -0,0 +1,7 @@ +function(doc) { +    if (doc.u1db_transactions) +        doc.u1db_transactions.forEach(function(t) { +            emit(t[0],  // use timestamp as key so the results are ordered +                 t[1]); // value is the transaction_id +        }); +} diff --git a/common/src/leap/soledad/common/objectstore.py b/common/src/leap/soledad/common/objectstore.py deleted file mode 100644 index 7aff3e32..00000000 --- a/common/src/leap/soledad/common/objectstore.py +++ /dev/null @@ -1,282 +0,0 @@ -# -*- coding: utf-8 -*- -# objectstore.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -""" -Abstract U1DB backend to handle storage using object stores (like CouchDB, for -example). - -This backend uses special documents to store all U1DB data (replica uid, -indexes, transaction logs and info about other dbs). The id of these documents -are reserved and have prefix equal to ObjectStore.U1DB_DATA_DOC_ID_PREFIX. 
- -Right now, this is only used by CouchDatabase backend, but can also be -extended to implement OpenStack or Amazon S3 storage, for example. - -See U1DB documentation for more information on how to use databases. -""" - - -from base64 import b64encode, b64decode - - -import uuid -import simplejson as json - - -from u1db import errors -from u1db.backends.inmemory import ( -    InMemoryDatabase, -    InMemorySyncTarget, -) - - -class ObjectStoreDatabase(InMemoryDatabase): -    """ -    A backend for storing u1db data in an object store. -    """ - -    U1DB_DATA_DOC_ID_PREFIX = 'u1db/' - -    @classmethod -    def open_database(cls, url, create, document_factory=None): -        """ -        Open a U1DB database using an object store as backend. - -        @param url: the url of the database replica -        @type url: str -        @param create: should the replica be created if it does not exist? -        @type create: bool -        @param document_factory: A function that will be called with the same -            parameters as Document.__init__. -        @type document_factory: callable - -        @return: the database instance -        @rtype: CouchDatabase -        """ -        raise NotImplementedError(cls.open_database) - -    def __init__(self, replica_uid=None, document_factory=None): -        """ -        Initialize the object store database. - -        @param replica_uid: an optional unique replica identifier -        @type replica_uid: str -        @param document_factory: A function that will be called with the same -            parameters as Document.__init__. -        @type document_factory: callable -        """ -        InMemoryDatabase.__init__( -            self, -            replica_uid, -            document_factory=document_factory) -        if self._replica_uid is None: -            self._replica_uid = uuid.uuid4().hex -        self._init_u1db_data() - -    def _init_u1db_data(self): -        """ -        Initialize u1db configuration data on backend storage. - -        A U1DB database needs to keep track of all database transactions, -        document conflicts, the generation of other replicas it has seen, -        indexes created by users and so on. - -        In this implementation, all this information is stored in special -        documents stored in the couch db with id prefix equal to -        U1DB_DATA_DOC_ID_PREFIX.  Those documents ids are reserved: -        put_doc(), get_doc() and delete_doc() will not allow documents with -        a doc_id with that prefix to be accessed or modified. -        """ -        raise NotImplementedError(self._init_u1db_data) - -    #------------------------------------------------------------------------- -    # methods from Database -    #------------------------------------------------------------------------- - -    def put_doc(self, doc): -        """ -        Update a document. - -        If the document currently has conflicts, put will fail. -        If the database specifies a maximum document size and the document -        exceeds it, put will fail and raise a DocumentTooBig exception. - -        This method prevents from updating the document with doc_id equals to -        self.U1DB_DATA_DOC_ID, which contains U1DB data. - -        @param doc: A Document with new content. -        @type doc: Document - -        @return: new_doc_rev - The new revision identifier for the document. -            The Document object will also be updated. 
-        @rtype: str -        """ -        if doc.doc_id is not None and \ -                doc.doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): -            raise errors.InvalidDocId() -        return InMemoryDatabase.put_doc(self, doc) - -    def _put_doc(self, doc): -        """ -        Update a document. - -        This is called everytime we just want to do a raw put on the db (i.e. -        without index updates, document constraint checks, and conflict -        checks). - -        @param doc: The document to update. -        @type doc: u1db.Document - -        @return: The new revision identifier for the document. -        @rtype: str -        """ -        raise NotImplementedError(self._put_doc) - -    def get_doc(self, doc_id, include_deleted=False): -        """ -        Get the JSON string for the given document. - -        This method prevents from getting the document with doc_id equals to -        self.U1DB_DATA_DOC_ID, which contains U1DB data. - -        @param doc_id: The unique document identifier -        @type doc_id: str -        @param include_deleted: If set to True, deleted documents will be -            returned with empty content. Otherwise asking for a deleted -            document will return None. -        @type include_deleted: bool - -        @return: a Document object. -        @rtype: Document -        """ -        if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): -            raise errors.InvalidDocId() -        return InMemoryDatabase.get_doc(self, doc_id, include_deleted) - -    def _get_doc(self, doc_id): -        """ -        Get just the document content, without fancy handling. - -        @param doc_id: The unique document identifier -        @type doc_id: str -        @param include_deleted: If set to True, deleted documents will be -            returned with empty content. Otherwise asking for a deleted -            document will return None. -        @type include_deleted: bool - -        @return: a Document object. -        @type: u1db.Document -        """ -        raise NotImplementedError(self._get_doc) - -    def get_all_docs(self, include_deleted=False): -        """ -        Get the JSON content for all documents in the database. - -        @param include_deleted: If set to True, deleted documents will be -            returned with empty content. Otherwise deleted documents will not -            be included in the results. -        @type include_deleted: bool - -        @return: (generation, [Document]) -            The current generation of the database, followed by a list of all -            the documents in the database. -        @rtype: tuple -        """ -        generation = self._get_generation() -        results = [] -        for doc_id in self._database: -            if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): -                continue -            doc = self._get_doc(doc_id, check_for_conflicts=True) -            if doc.content is None and not include_deleted: -                continue -            results.append(doc) -        return (generation, results) - -    def delete_doc(self, doc): -        """ -        Mark a document as deleted. - -        This method prevents from deleting the document with doc_id equals to -        self.U1DB_DATA_DOC_ID, which contains U1DB data. - -        @param doc: The document to mark as deleted. -        @type doc: u1db.Document - -        @return: The new revision id of the document. 
-        @type: str -        """ -        if doc.doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): -            raise errors.InvalidDocId() -        old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) -        if old_doc is None: -            raise errors.DocumentDoesNotExist -        if old_doc.rev != doc.rev: -            raise errors.RevisionConflict() -        if old_doc.is_tombstone(): -            raise errors.DocumentAlreadyDeleted -        if old_doc.has_conflicts: -            raise errors.ConflictedDoc() -        new_rev = self._allocate_doc_rev(doc.rev) -        doc.rev = new_rev -        doc.make_tombstone() -        self._put_and_update_indexes(old_doc, doc) -        return new_rev - -    # index-related methods - -    def create_index(self, index_name, *index_expressions): -        """ -        Create a named index, which can then be queried for future lookups. - -        See U1DB documentation for more information. - -        @param index_name: A unique name which can be used as a key prefix. -        @param index_expressions: Index expressions defining the index -            information. -        """ -        raise NotImplementedError(self.create_index) - -    #------------------------------------------------------------------------- -    # implemented methods from CommonBackend -    #------------------------------------------------------------------------- - -    def _put_and_update_indexes(self, old_doc, doc): -        """ -        Update a document and all indexes related to it. - -        @param old_doc: The old version of the document. -        @type old_doc: u1db.Document -        @param doc: The new version of the document. -        @type doc: u1db.Document -        """ -        for index in self._indexes.itervalues(): -            if old_doc is not None and not old_doc.is_tombstone(): -                index.remove_json(old_doc.doc_id, old_doc.get_json()) -            if not doc.is_tombstone(): -                index.add_json(doc.doc_id, doc.get_json()) -        trans_id = self._allocate_transaction_id() -        self._put_doc(doc) -        self._transaction_log.append((doc.doc_id, trans_id)) - - -class ObjectStoreSyncTarget(InMemorySyncTarget): -    """ -    Functionality for using an ObjectStore as a synchronization target. -    """ diff --git a/common/src/leap/soledad/common/tests/couchdb.ini.template b/common/src/leap/soledad/common/tests/couchdb.ini.template index 7d0316f0..217ae201 100644 --- a/common/src/leap/soledad/common/tests/couchdb.ini.template +++ b/common/src/leap/soledad/common/tests/couchdb.ini.template @@ -74,6 +74,8 @@ use_users_db = false  [query_servers]  ; javascript = %(tempdir)s/server/main.js +javascript = /usr/bin/couchjs /usr/share/couchdb/server/main.js +coffeescript = /usr/bin/couchjs /usr/share/couchdb/server/main-coffee.js  ; Changing reduce_limit to false will disable reduce_limit. @@ -219,4 +221,4 @@ min_file_size = 131072  ;[admins]  ;testuser = -hashed-f50a252c12615697c5ed24ec5cd56b05d66fe91e,b05471ba260132953930cf9f97f327f5 -; pass for above user is 'testpass'
\ No newline at end of file +; pass for above user is 'testpass' diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index 42edf9fe..72346333 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -25,9 +25,11 @@ import copy  import shutil  from base64 import b64decode +from couchdb.client import Server +from u1db import errors +  from leap.common.files import mkdir_p -from leap.soledad.common.document import SoledadDocument  from leap.soledad.common.tests import u1db_tests as tests  from leap.soledad.common.tests.u1db_tests import test_backends  from leap.soledad.common.tests.u1db_tests import test_sync @@ -148,7 +150,7 @@ class TestCouchBackendImpl(CouchDBTestCase):      def test__allocate_doc_id(self):          db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), -                                 'u1db_tests') +                                 'u1db_tests', ensure_ddocs=True)          doc_id1 = db._allocate_doc_id()          self.assertTrue(doc_id1.startswith('D-'))          self.assertEqual(34, len(doc_id1)) @@ -163,32 +165,51 @@ class TestCouchBackendImpl(CouchDBTestCase):  def make_couch_database_for_test(test, replica_uid):      port = str(test.wrapper.port)      return couch.CouchDatabase('http://localhost:' + port, replica_uid, -                               replica_uid=replica_uid or 'test') +                               replica_uid=replica_uid or 'test', +                               ensure_ddocs=True)  def copy_couch_database_for_test(test, db):      port = str(test.wrapper.port) -    new_db = couch.CouchDatabase('http://localhost:' + port, -                                 db._replica_uid + '_copy', +    couch_url = 'http://localhost:' + port +    new_dbname = db._replica_uid + '_copy' +    new_db = couch.CouchDatabase(couch_url, +                                 new_dbname,                                   replica_uid=db._replica_uid or 'test') -    gen, docs = db.get_all_docs(include_deleted=True) -    for doc in docs: -        new_db._put_doc(doc) -    new_db._transaction_log = copy.deepcopy(db._transaction_log) -    new_db._conflicts = copy.deepcopy(db._conflicts) -    new_db._other_generations = copy.deepcopy(db._other_generations) -    new_db._indexes = copy.deepcopy(db._indexes) -    # save u1db data on couch -    for key in new_db.U1DB_DATA_KEYS: -        doc_id = '%s%s' % (new_db.U1DB_DATA_DOC_ID_PREFIX, key) -        doc = new_db._get_doc(doc_id) -        doc.content = {'content': getattr(new_db, key)} -        new_db._put_doc(doc) +    # copy all docs +    old_couch_db = Server(couch_url)[db._replica_uid] +    new_couch_db = Server(couch_url)[new_dbname] +    for doc_id in old_couch_db: +        doc = old_couch_db.get(doc_id) +        # copy design docs +        if ('u1db_rev' not in doc): +            new_couch_db.save(doc) +        # copy u1db docs +        else: +            new_doc = { +                '_id': doc['_id'], +                'u1db_transactions': doc['u1db_transactions'], +                'u1db_rev': doc['u1db_rev'] +            } +            attachments = [] +            if ('u1db_conflicts' in doc): +                new_doc['u1db_conflicts'] = doc['u1db_conflicts'] +                for c_rev in doc['u1db_conflicts']: +                    attachments.append('u1db_conflict_%s' % c_rev) +            new_couch_db.save(new_doc) +            # save conflict data +            
attachments.append('u1db_content')
+            for att_name in attachments:
+                att = old_couch_db.get_attachment(doc_id, att_name)
+                if (att is not None):
+                    new_couch_db.put_attachment(new_doc, att,
+                                                filename=att_name)
     return new_db


 def make_document_for_test(test, doc_id, rev, content, has_conflicts=False):
-    return SoledadDocument(doc_id, rev, content, has_conflicts=has_conflicts)
+    return couch.CouchDocument(
+        doc_id, rev, content, has_conflicts=has_conflicts)


 COUCH_SCENARIOS = [
@@ -202,8 +223,22 @@ class CouchTests(test_backends.AllDatabaseTests, CouchDBTestCase):

     scenarios = COUCH_SCENARIOS

+    def setUp(self):
+        test_backends.AllDatabaseTests.setUp(self)
+        # save db info because of test_close
+        self._server = self.db._server
+        self._dbname = self.db._dbname
+
     def tearDown(self):
-        self.db.delete_database()
+        # if current test is `test_close` we have to use saved objects to
+        # delete the database because the close() method will have removed the
+        # references needed to do it using the CouchDatabase.
+        if self.id() == \
+                'leap.soledad.common.tests.test_couch.CouchTests.' \
+                'test_close(couch)':
+            del(self._server[self._dbname])
+        else:
+            self.db.delete_database()
         test_backends.AllDatabaseTests.tearDown(self)

@@ -246,17 +281,16 @@ class CouchWithConflictsTests(
         test_backends.LocalDatabaseWithConflictsTests.tearDown(self)


-# Notice: the CouchDB backend is currently used for storing encrypted data in
-# the server, so indexing makes no sense. Thus, we ignore index testing for
-# now.
-
-class CouchIndexTests(test_backends.DatabaseIndexTests, CouchDBTestCase):
+# Notice: the CouchDB backend does not have indexing capabilities, so we do
+# not test indexing now.

-    scenarios = COUCH_SCENARIOS
-
-    def tearDown(self):
-        self.db.delete_database()
-        test_backends.DatabaseIndexTests.tearDown(self)
+#class CouchIndexTests(test_backends.DatabaseIndexTests, CouchDBTestCase):
+#
+#    scenarios = COUCH_SCENARIOS
+#
+#    def tearDown(self):
+#        self.db.delete_database()
+#        test_backends.DatabaseIndexTests.tearDown(self)


 #-----------------------------------------------------------------------------
@@ -311,6 +345,89 @@ class CouchDatabaseSyncTargetTests(test_sync.DatabaseSyncTargetTests,
                  [(doc.doc_id, doc.rev), (doc2.doc_id, doc2.rev)]})


+# The following tests need the database to have an index, so we fake one.
+old_class = couch.CouchDatabase
+
+from u1db.backends.inmemory import InMemoryIndex
+
+
+class IndexedCouchDatabase(couch.CouchDatabase):
+
+    def __init__(self, url, dbname, replica_uid=None, full_commit=True,
+                 session=None, ensure_ddocs=True):
+        old_class.__init__(self, url, dbname, replica_uid, full_commit,
+                           session, ensure_ddocs=ensure_ddocs)
+        self._indexes = {}
+
+    def _put_doc(self, old_doc, doc):
+        for index in self._indexes.itervalues():
+            if old_doc is not None and not old_doc.is_tombstone():
+                index.remove_json(old_doc.doc_id, old_doc.get_json())
+            if not doc.is_tombstone():
+                index.add_json(doc.doc_id, doc.get_json())
+        old_class._put_doc(self, old_doc, doc)
+
+    def create_index(self, index_name, *index_expressions):
+        if index_name in self._indexes:
+            if self._indexes[index_name]._definition == list(
+                    index_expressions):
+                return
+            raise errors.IndexNameTakenError
+        index = InMemoryIndex(index_name, list(index_expressions))
+        _, all_docs = self.get_all_docs()
+        for doc in all_docs:
+            index.add_json(doc.doc_id, doc.get_json())
+        self._indexes[index_name] = index
+
+    def delete_index(self, index_name):
+        del self._indexes[index_name]
+
+    def list_indexes(self):
+        definitions = []
+        for idx in self._indexes.itervalues():
+            definitions.append((idx._name, idx._definition))
+        return definitions
+
+    def get_from_index(self, index_name, *key_values):
+        try:
+            index = self._indexes[index_name]
+        except KeyError:
+            raise errors.IndexDoesNotExist
+        doc_ids = index.lookup(key_values)
+        result = []
+        for doc_id in doc_ids:
+            result.append(self._get_doc(doc_id, check_for_conflicts=True))
+        return result
+
+    def get_range_from_index(self, index_name, start_value=None,
+                             end_value=None):
+        """Return all documents with key values in the specified range."""
+        try:
+            index = self._indexes[index_name]
+        except KeyError:
+            raise errors.IndexDoesNotExist
+        if isinstance(start_value, basestring):
+            start_value = (start_value,)
+        if isinstance(end_value, basestring):
+            end_value = (end_value,)
+        doc_ids = index.lookup_range(start_value, end_value)
+        result = []
+        for doc_id in doc_ids:
+            result.append(self._get_doc(doc_id, check_for_conflicts=True))
+        return result
+
+    def get_index_keys(self, index_name):
+        try:
+            index = self._indexes[index_name]
+        except KeyError:
+            raise errors.IndexDoesNotExist
+        keys = index.keys()
+        # XXX inefficiency warning
+        return list(set([tuple(key.split('\x01')) for key in keys]))
+
+
+couch.CouchDatabase = IndexedCouchDatabase
+
 sync_scenarios = []
 for name, scenario in COUCH_SCENARIOS:
     scenario = dict(scenario)
@@ -344,98 +461,4 @@ class CouchDatabaseSyncTests(test_sync.DatabaseSyncTests, CouchDBTestCase):
         test_sync.DatabaseSyncTests.tearDown(self)


-#-----------------------------------------------------------------------------
-# The following tests test extra functionality introduced by our backends
-#-----------------------------------------------------------------------------
-
-class 
CouchDatabaseStorageTests(CouchDBTestCase): - -    def _listify(self, l): -        if type(l) is dict: -            return { -                self._listify(a): self._listify(b) for a, b in l.iteritems()} -        if hasattr(l, '__iter__'): -            return [self._listify(i) for i in l] -        return l - -    def _fetch_u1db_data(self, db, key): -        doc = db._get_doc("%s%s" % (db.U1DB_DATA_DOC_ID_PREFIX, key)) -        return doc.content['content'] - -    def test_transaction_log_storage_after_put(self): -        db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), -                                 'u1db_tests') -        db.create_doc({'simple': 'doc'}) -        content = self._fetch_u1db_data(db, db.U1DB_TRANSACTION_LOG_KEY) -        self.assertEqual( -            self._listify(db._transaction_log), -            self._listify(content)) - -    def test_conflict_log_storage_after_put_if_newer(self): -        db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), -                                 'u1db_tests') -        doc = db.create_doc({'simple': 'doc'}) -        doc.set_json(nested_doc) -        doc.rev = db._replica_uid + ':2' -        db._force_doc_sync_conflict(doc) -        content = self._fetch_u1db_data(db, db.U1DB_CONFLICTS_KEY) -        self.assertEqual( -            self._listify(db._conflicts), -            self._listify(content)) - -    def test_other_gens_storage_after_set(self): -        db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), -                                 'u1db_tests') -        doc = db.create_doc({'simple': 'doc'}) -        db._set_replica_gen_and_trans_id('a', 'b', 'c') -        content = self._fetch_u1db_data(db, db.U1DB_OTHER_GENERATIONS_KEY) -        self.assertEqual( -            self._listify(db._other_generations), -            self._listify(content)) - -    def test_index_storage_after_create(self): -        db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), -                                 'u1db_tests') -        doc = db.create_doc({'name': 'john'}) -        db.create_index('myindex', 'name') -        content = self._fetch_u1db_data(db, db.U1DB_INDEXES_KEY) -        myind = db._indexes['myindex'] -        index = { -            'myindex': { -                'definition': myind._definition, -                'name': myind._name, -                'values': myind._values, -            } -        } -        self.assertEqual( -            self._listify(index), -            self._listify(content)) - -    def test_index_storage_after_delete(self): -        db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), -                                 'u1db_tests') -        doc = db.create_doc({'name': 'john'}) -        db.create_index('myindex', 'name') -        db.create_index('myindex2', 'name') -        db.delete_index('myindex') -        content = self._fetch_u1db_data(db, db.U1DB_INDEXES_KEY) -        myind = db._indexes['myindex2'] -        index = { -            'myindex2': { -                'definition': myind._definition, -                'name': myind._name, -                'values': myind._values, -            } -        } -        self.assertEqual( -            self._listify(index), -            self._listify(content)) - -    def test_replica_uid_storage_after_db_creation(self): -        db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), -                                 'u1db_tests') -        content = self._fetch_u1db_data(db, 
db.U1DB_REPLICA_UID_KEY)
-        self.assertEqual(db._replica_uid, content)
-
-
 load_tests = tests.load_with_scenarios
diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
new file mode 100644
index 00000000..a0c473b1
--- /dev/null
+++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
@@ -0,0 +1,339 @@
+# -*- coding: utf-8 -*-
+# test_couch_operations_atomicity.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.


+"""
+Test the atomicity of the couch backend's document operations.
+"""

+import os
+import mock
+import tempfile
+import threading
+
+from leap.soledad.client import Soledad
+from leap.soledad.common.couch import CouchDatabase, CouchServerState
+from leap.soledad.common.tests.test_couch import CouchDBTestCase
+from leap.soledad.common.tests.u1db_tests import TestCaseWithServer
+from leap.soledad.common.tests.test_target import (
+    make_token_soledad_app,
+    make_leap_document_for_test,
+    token_leap_sync_target,
+)
+
+
+REPEAT_TIMES = 20
+
+
+class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
+
+    @staticmethod
+    def make_app_after_state(state):
+        return make_token_soledad_app(state)
+
+    make_document_for_test = make_leap_document_for_test
+
+    sync_target = token_leap_sync_target
+
+    def _soledad_instance(self, user='user-uuid', passphrase=u'123',
+                          prefix='',
+                          secrets_path=Soledad.STORAGE_SECRETS_FILE_NAME,
+                          local_db_path='soledad.u1db', server_url='',
+                          cert_file=None, auth_token=None, secret_id=None):
+        """
+        Instantiate Soledad.
+        """
+
+        # this callback ensures we save a document which is sent to the shared
+        # db.
+        def _put_doc_side_effect(doc):
+            self._doc_put = doc
+
+        # we need a mocked shared db or else Soledad will try to access the
+        # network to find if there are uploaded secrets.
+        class MockSharedDB(object): + +            get_doc = mock.Mock(return_value=None) +            put_doc = mock.Mock(side_effect=_put_doc_side_effect) +            lock = mock.Mock(return_value=('atoken', 300)) +            unlock = mock.Mock() + +            def __call__(self): +                return self + +        Soledad._shared_db = MockSharedDB() +        return Soledad( +            user, +            passphrase, +            secrets_path=os.path.join(self.tempdir, prefix, secrets_path), +            local_db_path=os.path.join( +                self.tempdir, prefix, local_db_path), +            server_url=server_url, +            cert_file=cert_file, +            auth_token=auth_token, +            secret_id=secret_id) + +    def make_app(self): +        self.request_state = CouchServerState(self._couch_url, 'shared', +                                              'tokens') +        return self.make_app_after_state(self.request_state) + +    def setUp(self): +        TestCaseWithServer.setUp(self) +        CouchDBTestCase.setUp(self) +        self._couch_url = 'http://localhost:' + str(self.wrapper.port) +        self.db = CouchDatabase( +            self._couch_url, 'user-user-uuid', replica_uid='replica') +        self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") + +    def tearDown(self): +        CouchDBTestCase.tearDown(self) +        TestCaseWithServer.tearDown(self) + +    # +    # Sequential tests +    # + +    def test_correct_transaction_log_after_sequential_puts(self): +        """ +        Assert that the transaction_log increases accordingly with sequential +        puts. +        """ +        doc = self.db.create_doc({'ops': 0}) +        ops = 1 +        docs = [doc.doc_id] +        for i in range(0, REPEAT_TIMES): +            self.assertEqual( +                i+1, len(self.db._get_transaction_log())) +            doc.content['ops'] += 1 +            self.db.put_doc(doc) +            docs.append(doc.doc_id) + +        # assert length of transaction_log +        transaction_log = self.db._get_transaction_log() +        self.assertEqual( +            REPEAT_TIMES+1, len(transaction_log)) + +        # assert that all entries in the log belong to the same doc +        self.assertEqual(REPEAT_TIMES+1, len(docs)) +        for doc_id in docs: +            self.assertEqual( +                REPEAT_TIMES+1, +                len(filter(lambda t: t[0] == doc_id, transaction_log))) + +    def test_correct_transaction_log_after_sequential_deletes(self): +        """ +        Assert that the transaction_log increases accordingly with sequential +        puts and deletes. 
+        """ +        docs = [] +        for i in range(0, REPEAT_TIMES): +            doc = self.db.create_doc({'ops': 0}) +            self.assertEqual( +                2*i+1, len(self.db._get_transaction_log())) +            docs.append(doc.doc_id) +            self.db.delete_doc(doc) +            self.assertEqual( +                2*i+2, len(self.db._get_transaction_log())) + +        # assert length of transaction_log +        transaction_log = self.db._get_transaction_log() +        self.assertEqual( +            2*REPEAT_TIMES, len(transaction_log)) + +        # assert that each doc appears twice in the transaction_log +        self.assertEqual(REPEAT_TIMES, len(docs)) +        for doc_id in docs: +            self.assertEqual( +                2, +                len(filter(lambda t: t[0] == doc_id, transaction_log))) + +    def test_correct_sync_log_after_sequential_syncs(self): +        """ +        Assert that the sync_log increases accordingly with sequential syncs. +        """ +        self.startServer() +        sol = self._soledad_instance( +            auth_token='auth-token', +            server_url=self.getURL()) + +        def _create_docs_and_sync(sol, syncs): +            # create a lot of documents +            for i in range(0, REPEAT_TIMES): +                sol.create_doc({}) +            # assert sizes of transaction and sync logs +            self.assertEqual( +                syncs*REPEAT_TIMES, +                len(self.db._get_transaction_log())) +            self.assertEqual( +                1 if syncs > 0 else 0, +                len(self.db._database.view('syncs/log').rows)) +            # sync to the remote db +            sol.sync() +            gen, docs = self.db.get_all_docs() +            self.assertEqual((syncs+1)*REPEAT_TIMES, gen) +            self.assertEqual((syncs+1)*REPEAT_TIMES, len(docs)) +            # assert sizes of transaction and sync logs +            self.assertEqual((syncs+1)*REPEAT_TIMES, +                             len(self.db._get_transaction_log())) +            sync_log_rows = self.db._database.view('syncs/log').rows +            sync_log = sync_log_rows[0].value +            replica_uid = sync_log_rows[0].key +            known_gen = sync_log['known_generation'] +            known_trans_id = sync_log['known_transaction_id'] +            # assert sync_log has exactly 1 row +            self.assertEqual(1, len(sync_log_rows)) +            # assert it has the correct replica_uid, gen and trans_id +            self.assertEqual(sol._db._replica_uid, replica_uid) +            sol_gen, sol_trans_id = sol._db._get_generation_info() +            self.assertEqual(sol_gen, known_gen) +            self.assertEqual(sol_trans_id, known_trans_id) + +        _create_docs_and_sync(sol, 0) +        _create_docs_and_sync(sol, 1) + +    # +    # Concurrency tests +    # +     +    class _WorkerThread(threading.Thread): +         +        def __init__(self, params, run_method): +            threading.Thread.__init__(self) +            self._params = params +            self._run_method = run_method + +        def run(self): +            self._run_method(self) + +    def test_correct_transaction_log_after_concurrent_puts(self): +        """ +        Assert that the transaction_log increases accordingly with concurrent +        puts. 
+        """ +        pool = threading.BoundedSemaphore(value=1) +        threads = [] +        docs = [] + +        def _run_method(self): +            doc = self._params['db'].create_doc({}) +            pool.acquire() +            self._params['docs'].append(doc.doc_id) +            pool.release() + + +        for i in range(0, REPEAT_TIMES): +            thread = self._WorkerThread( +                {'docs': docs, 'db': self.db}, +                _run_method) +            thread.start() +            threads.append(thread) + +        for thread in threads: +            thread.join() +         +        # assert length of transaction_log +        transaction_log = self.db._get_transaction_log() +        self.assertEqual( +            REPEAT_TIMES, len(transaction_log)) + +        # assert all documents are in the log +        self.assertEqual(REPEAT_TIMES, len(docs)) +        for doc_id in docs: +            self.assertEqual( +                1, +                len(filter(lambda t: t[0] == doc_id, transaction_log))) + +    def test_correct_transaction_log_after_concurrent_deletes(self): +        """ +        Assert that the transaction_log increases accordingly with concurrent +        puts and deletes. +        """ +        threads = [] +        docs = [] +        pool = threading.BoundedSemaphore(value=1) + +        # create/delete method that will be run concurrently +        def _run_method(self): +            doc = self._params['db'].create_doc({}) +            pool.acquire() +            docs.append(doc.doc_id) +            pool.release() +            self._params['db'].delete_doc(doc) + +        # launch concurrent threads +        for i in range(0, REPEAT_TIMES): +            thread = self._WorkerThread({'db': self.db}, _run_method) +            thread.start() +            threads.append(thread) + +        # wait for threads to finish +        for thread in threads: +            thread.join() + +        # assert transaction log +        transaction_log = self.db._get_transaction_log() +        self.assertEqual( +            2*REPEAT_TIMES, len(transaction_log)) +        # assert that each doc appears twice in the transaction_log +        self.assertEqual(REPEAT_TIMES, len(docs)) +        for doc_id in docs: +            self.assertEqual( +                2, +                len(filter(lambda t: t[0] == doc_id, transaction_log))) + +    def test_correct_sync_log_after_concurrent_puts_and_sync(self): +        """ +        Assert that the sync_log is correct after concurrent syncs. +        """ +        threads = [] +        docs = [] +        pool = threading.BoundedSemaphore(value=1) +        self.startServer() +        sol = self._soledad_instance( +            auth_token='auth-token', +            server_url=self.getURL()) + +        def _run_method(self): +            # create a lot of documents +            doc = self._params['sol'].create_doc({}) +            pool.acquire() +            docs.append(doc.doc_id) +            pool.release() + +        # launch threads to create documents in parallel +        for i in range(0, REPEAT_TIMES): +            thread = self._WorkerThread( +                {'sol': sol, 'syncs': i}, +                _run_method) +            thread.start() +            threads.append(thread) + +        # wait for threads to finish +        for thread in threads: +            thread.join() +         +        # do the sync! 
+        sol.sync()
+
+        transaction_log = self.db._get_transaction_log()
+        self.assertEqual(REPEAT_TIMES, len(transaction_log))
+        # assert all documents are in the remote log
+        self.assertEqual(REPEAT_TIMES, len(docs))
+        for doc_id in docs:
+            self.assertEqual(
+                1,
+                len(filter(lambda t: t[0] == doc_id, transaction_log)))
diff --git a/scripts/migrate_dbs.py b/scripts/migrate_dbs.py
new file mode 100644
index 00000000..f1c20d87
--- /dev/null
+++ b/scripts/migrate_dbs.py
@@ -0,0 +1,288 @@
+#!/usr/bin/python

+import json
+import logging
+import argparse
+import re
+import threading
+from ConfigParser import ConfigParser
+from couchdb.client import Server
+from couchdb.http import ResourceNotFound, Resource, Session
+from datetime import datetime

+from leap.soledad.common.couch import CouchDatabase


+# parse command line for the log file name
+logger_fname = "/tmp/u1db-couch-db-migration_%s.log" % \
+               str(datetime.now()).replace(' ', '_')
+parser = argparse.ArgumentParser()
+parser.add_argument('--log', action='store', default=logger_fname, type=str,
+                    required=False, help='the name of the log file')
+args = parser.parse_args()


+# configure the logger
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+print "Logging to %s." % args.log
+logging.basicConfig(
+    filename=args.log,
+    format="%(asctime)-15s %(message)s")


+# limit the number of concurrent worker threads
+max_threads = 20
+semaphore_pool = threading.BoundedSemaphore(value=max_threads)

+# get couch url
+cp = ConfigParser()
+cp.read('/etc/leap/soledad-server.conf')
+url = cp.get('soledad-server', 'couch_url')

+resource = Resource(url, Session(retry_delays=[1, 2, 4, 8], timeout=10))
+server = Server(url=resource)

+# hide the credentials when displaying the couch url
+hidden_url = re.sub(
+    'http://(.*):.*@',
+    'http://\\1:xxxxx@',
+    url)

+print """
+==========
+ATTENTION!
+==========

+This script will modify Soledad's shared and user databases in:

+  %s

+This script does not make a backup of the couch db data, so make sure you
+have a copy or you may lose data.
+""" % hidden_url
+confirm = raw_input("Proceed (type uppercase YES)? ")

+if confirm != "YES":
+    exit(1)


+#
+# Thread
+#

+class DocWorkerThread(threading.Thread):

+    def __init__(self, dbname, doc_id, db_idx, db_len, doc_idx, doc_len,
+                 transaction_log, conflict_log, release_fun):
+        threading.Thread.__init__(self)
+        resource = Resource(url, Session(retry_delays=[1, 2, 4, 8], timeout=10))
+        server = Server(url=resource)
+        self._dbname = dbname
+        self._cdb = server[self._dbname]
+        self._doc_id = doc_id
+        self._db_idx = db_idx
+        self._db_len = db_len
+        self._doc_idx = doc_idx
+        self._doc_len = doc_len
+        self._transaction_log = transaction_log
+        self._conflict_log = conflict_log
+        self._release_fun = release_fun

+    def run(self):

+        old_doc = self._cdb[self._doc_id]

+        # skip non u1db docs
+        if 'u1db_rev' not in old_doc:
+            logger.debug('(%d/%d) (%d/%d) Skipping %s/%s.' %
+                         (self._db_idx, self._db_len, self._doc_idx,
+                          self._doc_len, self._dbname, self._doc_id))
+            self._release_fun()
+            return
+        else:
+            logger.debug('(%d/%d) (%d/%d) Processing %s/%s ...' %
+                         (self._db_idx, self._db_len, self._doc_idx,
+                          self._doc_len, self._dbname, self._doc_id))
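
+        # Build the new-style document: u1db metadata now lives inside the
+        # document itself, while content and conflicts become attachments
+        # (see the migration table in the main loop below).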

+        doc = {
+            '_id': self._doc_id,
+            '_rev': old_doc['_rev'],
+            'u1db_rev': old_doc['u1db_rev']
+        }
+        attachments = []

+        # add transactions: keep only the (generation, transaction_id)
+        # pairs that belong to this document
+        doc['u1db_transactions'] = map(
+            lambda (gen, doc_id, trans_id): (gen, trans_id),
+            filter(
+                lambda (gen, doc_id, trans_id): doc_id == doc['_id'],
+                self._transaction_log))
+        if len(doc['u1db_transactions']) == 0:
+            del doc['u1db_transactions']

+        # add conflicts
+        if doc['_id'] in self._conflict_log:
+            attachments.append([
+                self._conflict_log[doc['_id']],
+                'u1db_conflicts',
+                "application/octet-stream"])

+        # move document's content to 'u1db_content' attachment
+        content = self._cdb.get_attachment(doc, 'u1db_json')
+        if content is not None:
+            attachments.append([
+                content,
+                'u1db_content',
+                "application/octet-stream"])
+        #self._cdb.delete_attachment(doc, 'u1db_json')

+        # save modified doc
+        self._cdb.save(doc)

+        # save all doc attachments
+        for content, att_name, content_type in attachments:
+            self._cdb.put_attachment(
+                doc,
+                content,
+                filename=att_name,
+                content_type=content_type)

+        # release the semaphore
+        self._release_fun()


+db_idx = 0
+db_len = len(server)
+for dbname in server:

+    db_idx += 1

+    if not (dbname.startswith('user-') or dbname == 'shared') \
+            or dbname == 'user-test-db':
+        logger.info("(%d/%d) Skipping db %s." % (db_idx, db_len, dbname))
+        continue

+    logger.info("(%d/%d) Migrating db %s." % (db_idx, db_len, dbname))

+    # get access to couch db
+    cdb = Server(url)[dbname]

+    # get access to soledad db
+    sdb = CouchDatabase(url, dbname)

+    # Migration table
+    # ---------------
+    #
+    # * Metadata that was previously stored in special documents moves into
+    #   the documents themselves, to allow for atomic doc-and-metadata
+    #   updates.
+    # * The doc content attachment name changes.
+    # * Indexes are removed; they may be reimplemented in the future as
+    #   design doc view functions.
+    #
+    # +-----------------+-------------------------+-------------------------+
+    # | Data            | old storage             | new storage             |
+    # |-----------------+-------------------------+-------------------------+
+    # | doc content     | <doc_id>/u1db_json      | <doc_id>/u1db_content   |
+    # | doc conflicts   | u1db/_conflicts         | <doc_id>/u1db_conflicts |
+    # | transaction log | u1db/_transaction_log   | doc.u1db_transactions   |
+    # | sync log        | u1db/_other_generations | u1db_sync_log           |
+    # | indexes         | u1db/_indexes           | not implemented         |
+    # | replica uid     | u1db/_replica_uid       | u1db_config             |
+    # +-----------------+-------------------------+-------------------------+
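
+    # As an illustration (ids, revs and values below are hypothetical), a
+    # migrated document that carries one transaction would look roughly
+    # like:
+    #
+    #     {
+    #         '_id': 'doc-1',
+    #         '_rev': '2-aaaa',
+    #         'u1db_rev': 'replica_uid:1',
+    #         'u1db_transactions': [(1, 'trans-1')],
+    #     }
+    #
+    # with its json content stored in a 'u1db_content' attachment and any
+    # conflicts in a 'u1db_conflicts' attachment.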

+    def get_att_content(db, doc_id, att_name):
+        try:
+            return json.loads(
+                db.get_attachment(
+                    doc_id, att_name).read())['content']
+        except Exception:
+            logger.error(
+                "Failed to get attachment %s of doc %s." %
+                (att_name, doc_id))
+            raise

+    # only migrate databases that have the 'u1db/_replica_uid' document
+    if cdb.get('u1db/_replica_uid') is None:
+        continue
+    replica_uid = get_att_content(cdb, 'u1db/_replica_uid', 'u1db_json')

+    #---------------------------------------------------------------------
+    # Step 1: Set replica uid.
+    #---------------------------------------------------------------------
+    sdb._set_replica_uid(replica_uid)

+    #---------------------------------------------------------------------
+    # Step 2: Obtain metadata.
+    #---------------------------------------------------------------------

+    # obtain the transaction log: [['<doc_id>', '<trans_id>'], ...]
+    transaction_log = get_att_content(
+        cdb, 'u1db/_transaction_log', 'u1db_json')
+    # number the transactions with their generations:
+    # [(gen, doc_id, trans_id), ...]
+    new_transaction_log = []
+    gen = 1
+    for (doc_id, trans_id) in transaction_log:
+        new_transaction_log.append((gen, doc_id, trans_id))
+        gen += 1
+    transaction_log = new_transaction_log

+    # obtain the conflict log: {'<doc_id>': ['<rev>', '<content>'], ...}
+    conflict_log = get_att_content(cdb, 'u1db/_conflicts', 'u1db_json')

+    # obtain the sync log:
+    # {'<replica_uid>': ['<gen>', '<transaction_id>'], ...}
+    other_generations = get_att_content(
+        cdb, 'u1db/_other_generations', 'u1db_json')

+    #---------------------------------------------------------------------
+    # Step 3: Iterate over all documents in the database.
+    #---------------------------------------------------------------------
+    doc_len = len(cdb)
+    logger.info("(%d/%d) Found %d documents." % (db_idx, db_len, doc_len))
+    doc_idx = 0
+    threads = []
+    for doc_id in cdb:
+        doc_idx += 1

+        # the semaphore bounds the number of concurrently running workers
+        semaphore_pool.acquire()
+        thread = DocWorkerThread(dbname, doc_id, db_idx, db_len,
+                                 doc_idx, doc_len, transaction_log,
+                                 conflict_log, semaphore_pool.release)
+        thread.daemon = True
+        thread.start()
+        threads.append(thread)

+    # wait for all workers to finish
+    map(lambda thread: thread.join(), threads)
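
+    # Per-document migration is done once all workers have joined; what
+    # remains is database-level metadata. The sync log gathered in Step 2,
+    # e.g. {'other-replica': [5, 'trans-5']} (values illustrative), is
+    # flattened below into rows of a single 'u1db_sync_log' document.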

+    #---------------------------------------------------------------------
+    # Step 4: Move sync log.
+    #---------------------------------------------------------------------

+    sync_doc = {
+        '_id': 'u1db_sync_log',
+        'syncs': []
+    }
+    for replica_uid in other_generations:
+        gen, transaction_id = other_generations[replica_uid]
+        sync_doc['syncs'].append([replica_uid, gen, transaction_id])
+    cdb.save(sync_doc)

+    #---------------------------------------------------------------------
+    # Step 5: Delete old meta documents.
+    #---------------------------------------------------------------------
+    for doc_id in ['_transaction_log', '_conflicts', '_other_generations',
+                   '_indexes', '_replica_uid']:
+        for prefix in ['u1db/', 'u1db%2F']:
+            try:
+                doc = cdb['%s%s' % (prefix, doc_id)]
+                logger.info(
+                    "(%d/%d) Deleting %s/%s/%s." %
+                    (db_idx, db_len, dbname, 'u1db', doc_id))
+                cdb.delete(doc)
+            except ResourceNotFound:
+                pass
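

+# Usage sketch (run on the host where the couch databases live; the script
+# reads the couch url from /etc/leap/soledad-server.conf, and the log path
+# below is just an example):
+#
+#     python scripts/migrate_dbs.py --log /tmp/migration.log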
