| author | drebs <drebs@leap.se> | 2013-01-24 20:38:42 -0200 |
|---|---|---|
| committer | drebs <drebs@leap.se> | 2013-01-24 20:38:42 -0200 |
| commit | e306d7a884703eba1e44bb587842bc546e8515d0 | |
| tree | dcdc0513c85a529c3ed73a53fd27e0f5d0268c72 /src | |
| parent | 74fb141d33b195a425f3eece63fdd7bcc0b970ab | |
| parent | d6a235e90e93b51d46402ed85645842af222bc84 | |
Merge branch 'feature/soledad-api' into develop
Diffstat (limited to 'src')
| -rw-r--r-- | src/leap/soledad/backends/couch.py | 69 |
| -rw-r--r-- | src/leap/soledad/backends/objectstore.py | 418 |
| -rw-r--r-- | src/leap/soledad/tests/test_couch.py | 39 |
3 files changed, 107 insertions, 419 deletions
diff --git a/src/leap/soledad/backends/couch.py b/src/leap/soledad/backends/couch.py
index 8ba42d78..c8dadfa8 100644
--- a/src/leap/soledad/backends/couch.py
+++ b/src/leap/soledad/backends/couch.py
@@ -1,6 +1,8 @@
 import uuid
 from base64 import b64encode, b64decode
+from u1db import errors
 from u1db.sync import LocalSyncTarget
+from u1db.backends.inmemory import InMemoryIndex
 from couchdb.client import Server, Document as CouchDocument
 from couchdb.http import ResourceNotFound
 from leap.soledad.backends.objectstore import ObjectStore
@@ -36,7 +38,7 @@ class CouchDatabase(ObjectStore):
         super(CouchDatabase, self).__init__(replica_uid=replica_uid)

     #-------------------------------------------------------------------------
-    # implemented methods from Database
+    # methods from Database
     #-------------------------------------------------------------------------

     def _get_doc(self, doc_id, check_for_conflicts=False):
@@ -95,6 +97,23 @@ class CouchDatabase(ObjectStore):
     def get_sync_target(self):
         return CouchSyncTarget(self)

+    def create_index(self, index_name, *index_expressions):
+        if index_name in self._indexes:
+            if self._indexes[index_name]._definition == list(
+                    index_expressions):
+                return
+            raise errors.IndexNameTakenError
+        index = InMemoryIndex(index_name, list(index_expressions))
+        for doc_id in self._database:
+            if doc_id == self.U1DB_DATA_DOC_ID:
+                continue
+            doc = self._get_doc(doc_id)
+            if doc.content is not None:
+                index.add_json(doc_id, doc.get_json())
+        self._indexes[index_name] = index
+        # save data in object store
+        self._set_u1db_data()
+
     def close(self):
         # TODO: fix this method so the connection is properly closed and
         # test_close (+tearDown, which deletes the db) works without problems.
@@ -110,35 +129,47 @@ class CouchDatabase(ObjectStore):
         return Synchronizer(self, CouchSyncTarget(url, creds=creds)).sync(
             autocreate=autocreate)

-    def _initialize(self):
+    #-------------------------------------------------------------------------
+    # methods from ObjectStore
+    #-------------------------------------------------------------------------
+
+    def _init_u1db_data(self):
         if self._replica_uid is None:
             self._replica_uid = uuid.uuid4().hex
         doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID)
-        doc.content = {'sync_log': [],
-                       'transaction_log': [],
-                       'conflict_log': b64encode(json.dumps([])),
+        doc.content = {'transaction_log': [],
+                       'conflicts': b64encode(json.dumps({})),
+                       'other_generations': {},
+                       'indexes': b64encode(json.dumps({})),
                        'replica_uid': self._replica_uid}
         self._put_doc(doc)

     def _get_u1db_data(self):
+        # retrieve u1db data from couch db
         cdoc = self._database.get(self.U1DB_DATA_DOC_ID)
         jsonstr = self._database.get_attachment(cdoc, 'u1db_json').getvalue()
         content = json.loads(jsonstr)
-        self._sync_log.log = content['sync_log']
-        self._transaction_log.log = content['transaction_log']
-        self._conflict_log.log = json.loads(b64decode(content['conflict_log']))
+        # set u1db database info
+        #self._sync_log = content['sync_log']
+        self._transaction_log = content['transaction_log']
+        self._conflicts = json.loads(b64decode(content['conflicts']))
+        self._other_generations = content['other_generations']
+        self._indexes = self._load_indexes_from_json(
+            b64decode(content['indexes']))
         self._replica_uid = content['replica_uid']
+        # save couch _rev
         self._couch_rev = cdoc['_rev']

     def _set_u1db_data(self):
         doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID)
         doc.content = {
-            'sync_log': self._sync_log.log,
-            'transaction_log': self._transaction_log.log,
+            'transaction_log': self._transaction_log,
             # Here, the b64 encode ensures that document content
             # does not cause strange behaviour in couchdb because
             # of encoding.
-            'conflict_log': b64encode(json.dumps(self._conflict_log.log)),
+            'conflicts': b64encode(json.dumps(self._conflicts)),
+            'other_generations': self._other_generations,
+            'indexes': b64encode(self._dump_indexes_as_json()),
             'replica_uid': self._replica_uid,
             '_rev': self._couch_rev}
         self._put_doc(doc)
@@ -150,6 +181,22 @@ class CouchDatabase(ObjectStore):
     def delete_database(self):
         del(self._server[self._dbname])

+    def _dump_indexes_as_json(self):
+        indexes = {}
+        for name, idx in self._indexes.iteritems():
+            indexes[name] = {}
+            for attr in ['name', 'definition', 'values']:
+                indexes[name][attr] = getattr(idx, '_' + attr)
+        return json.dumps(indexes)
+
+    def _load_indexes_from_json(self, indexes):
+        dict = {}
+        for name, idx_dict in json.loads(indexes).iteritems():
+            idx = InMemoryIndex(name, idx_dict['definition'])
+            idx._values = idx_dict['values']
+            dict[name] = idx
+        return dict
+

 class CouchSyncTarget(LocalSyncTarget):
diff --git a/src/leap/soledad/backends/objectstore.py b/src/leap/soledad/backends/objectstore.py
index d72a2ecc..588fc7a1 100644
--- a/src/leap/soledad/backends/objectstore.py
+++ b/src/leap/soledad/backends/objectstore.py
@@ -1,77 +1,35 @@
-from u1db.backends import CommonBackend
-from u1db import errors, Document, vectorclock
+from u1db.backends.inmemory import InMemoryDatabase
+from u1db import errors


-class ObjectStore(CommonBackend):
+class ObjectStore(InMemoryDatabase):
     """
     A backend for storing u1db data in an object store.
     """

     def __init__(self, replica_uid=None):
-        # This initialization method should be called after the connection
-        # with the database is established in each implementation, so it can
-        # ensure that u1db data is configured and up-to-date.
-        self.set_document_factory(Document)
-        self._sync_log = SyncLog()
-        self._transaction_log = TransactionLog()
-        self._conflict_log = ConflictLog(self._factory)
-        self._replica_uid = replica_uid
-        self._ensure_u1db_data()
+        super(ObjectStore, self).__init__(replica_uid)
+        # sync data in memory with data in object store
+        if not self._get_doc(self.U1DB_DATA_DOC_ID):
+            self._init_u1db_data()
+        self._get_u1db_data()

     #-------------------------------------------------------------------------
-    # implemented methods from Database
+    # methods from Database
     #-------------------------------------------------------------------------

-    def set_document_factory(self, factory):
-        self._factory = factory
-
-    def set_document_size_limit(self, limit):
-        raise NotImplementedError(self.set_document_size_limit)
-
-    def whats_changed(self, old_generation=0):
-        self._get_u1db_data()
-        return self._transaction_log.whats_changed(old_generation)
-
-    def get_doc(self, doc_id, include_deleted=False):
-        doc = self._get_doc(doc_id, check_for_conflicts=True)
-        if doc is None:
-            return None
-        if doc.is_tombstone() and not include_deleted:
-            return None
-        return doc
+    def _set_replica_uid(self, replica_uid):
+        super(ObjectStore, self)._set_replica_uid(replica_uid)
+        self._set_u1db_data()

     def _put_doc(self, doc):
         raise NotImplementedError(self._put_doc)

-    def _update_gen_and_transaction_log(self, doc_id):
-        new_gen = self._get_generation() + 1
-        trans_id = self._allocate_transaction_id()
-        self._transaction_log.append((new_gen, doc_id, trans_id))
-        self._set_u1db_data()
+    def _get_doc(self, doc):
+        raise NotImplementedError(self._get_doc)

-    def put_doc(self, doc):
-        # consistency check
-        if doc.doc_id is None:
-            raise errors.InvalidDocId()
-        self._check_doc_id(doc.doc_id)
-        self._check_doc_size(doc)
-        # check if document exists
-        old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
-        if old_doc and old_doc.has_conflicts:
-            raise errors.ConflictedDoc()
-        if old_doc and doc.rev is None and old_doc.is_tombstone():
-            new_rev = self._allocate_doc_rev(old_doc.rev)
-        else:
-            if old_doc is not None:
-                if old_doc.rev != doc.rev:
-                    raise errors.RevisionConflict()
-            else:
-                if doc.rev is not None:
-                    raise errors.RevisionConflict()
-            new_rev = self._allocate_doc_rev(doc.rev)
-        doc.rev = new_rev
-        self._put_and_update_indexes(old_doc, doc)
-        return doc.rev
+    def get_all_docs(self, include_deleted=False):
+        raise NotImplementedError(self.get_all_docs)

     def delete_doc(self, doc):
         old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
@@ -89,130 +47,49 @@ class ObjectStore(CommonBackend):
         self._put_and_update_indexes(old_doc, doc)
         return new_rev

-    # start of index-related methods: these are not supported by this backend.
+    # index-related methods

     def create_index(self, index_name, *index_expressions):
-        return False
+        raise NotImplementedError(self.create_index)

     def delete_index(self, index_name):
-        return False
-
-    def list_indexes(self):
-        return []
-
-    def get_from_index(self, index_name, *key_values):
-        return []
-
-    def get_range_from_index(self, index_name, start_value=None,
-                             end_value=None):
-        return []
-
-    def get_index_keys(self, index_name):
-        return []
-
-    # end of index-related methods: these are not supported by this backend.
-
-    def get_doc_conflicts(self, doc_id):
-        self._get_u1db_data()
-        conflict_docs = self._conflict_log.get_conflicts(doc_id)
-        if not conflict_docs:
-            return []
-        this_doc = self._get_doc(doc_id)
-        this_doc.has_conflicts = True
-        return [this_doc] + list(conflict_docs)
-
-    def resolve_doc(self, doc, conflicted_doc_revs):
-        cur_doc = self._get_doc(doc.doc_id)
-        new_rev = self._ensure_maximal_rev(cur_doc.rev,
-                                           conflicted_doc_revs)
-        superseded_revs = set(conflicted_doc_revs)
-        doc.rev = new_rev
-        if cur_doc.rev in superseded_revs:
-            self._put_and_update_indexes(cur_doc, doc)
-        else:
-            self._add_conflict(doc.doc_id, new_rev, doc.get_json())
-        self._delete_conflicts(doc, superseded_revs)
-
-    def _get_replica_gen_and_trans_id(self, other_replica_uid):
-        self._get_u1db_data()
-        return self._sync_log.get_replica_gen_and_trans_id(other_replica_uid)
+        super(ObjectStore, self).delete_index(index_name)
+        self._set_u1db_data()

-    def _set_replica_gen_and_trans_id(self, other_replica_uid,
-                                      other_generation, other_transaction_id):
-        return self._do_set_replica_gen_and_trans_id(
-            other_replica_uid,
-            other_generation,
-            other_transaction_id)
+    def _replace_conflicts(self, doc, conflicts):
+        super(ObjectStore, self)._replace_conflicts(doc, conflicts)
+        self._set_u1db_data()

     def _do_set_replica_gen_and_trans_id(self, other_replica_uid,
                                          other_generation,
                                          other_transaction_id):
-        self._sync_log.set_replica_gen_and_trans_id(other_replica_uid,
-                                                    other_generation,
-                                                    other_transaction_id)
+        super(ObjectStore, self)._do_set_replica_gen_and_trans_id(
+            other_replica_uid,
+            other_generation,
+            other_transaction_id)
         self._set_u1db_data()

-    def _get_transaction_log(self):
-        self._get_u1db_data()
-        return self._transaction_log.get_transaction_log()
-
     #-------------------------------------------------------------------------
     # implemented methods from CommonBackend
     #-------------------------------------------------------------------------

-    def _get_generation(self):
-        self._get_u1db_data()
-        return self._transaction_log.get_generation()
-
-    def _get_generation_info(self):
-        self._get_u1db_data()
-        return self._transaction_log.get_generation_info()
-
-    def _has_conflicts(self, doc_id):
-        self._get_u1db_data()
-        return self._conflict_log.has_conflicts(doc_id)
-
     def _put_and_update_indexes(self, old_doc, doc):
-        # for now we ignore indexes as this backend is used to store encrypted
-        # blobs of data in the server.
+        for index in self._indexes.itervalues():
+            if old_doc is not None and not old_doc.is_tombstone():
+                index.remove_json(old_doc.doc_id, old_doc.get_json())
+            if not doc.is_tombstone():
+                index.add_json(doc.doc_id, doc.get_json())
+        trans_id = self._allocate_transaction_id()
         self._put_doc(doc)
-        self._update_gen_and_transaction_log(doc.doc_id)
-
-    def _get_trans_id_for_gen(self, generation):
-        self._get_u1db_data()
-        trans_id = self._transaction_log.get_trans_id_for_gen(generation)
-        if trans_id is None:
-            raise errors.InvalidGeneration
-        return trans_id
+        self._transaction_log.append((doc.doc_id, trans_id))
+        self._set_u1db_data()

     #-------------------------------------------------------------------------
     # methods specific for object stores
     #-------------------------------------------------------------------------

-    def _ensure_u1db_data(self):
-        """
-        Guarantee that u1db data (logs and replica info) exists in store.
-        """
-        if not self._is_initialized():
-            self._initialize()
-        self._get_u1db_data()
-
     U1DB_DATA_DOC_ID = 'u1db_data'

-    def _is_initialized(self):
-        """
-        Verify if u1db data exists in store.
-        """
-        if not self._get_doc(self.U1DB_DATA_DOC_ID):
-            return False
-        return True
-
-    def _initialize(self):
-        """
-        Create u1db data object in store.
-        """
-        NotImplementedError(self._initialize)
-
     def _get_u1db_data(self):
         """
         Fetch u1db configuration data from backend storage.
@@ -225,227 +102,8 @@ class ObjectStore(CommonBackend):
         """
         NotImplementedError(self._set_u1db_data)

-    def _set_replica_uid(self, replica_uid):
-        self._replica_uid = replica_uid
-        self._set_u1db_data()
-
-    def _get_replica_uid(self):
-        return self._replica_uid
-
-    replica_uid = property(
-        _get_replica_uid, _set_replica_uid, doc="Replica UID of the database")
-
-    #-------------------------------------------------------------------------
-    # The methods below were cloned from u1db sqlite backend. They should at
-    # least exist and raise a NotImplementedError exception in CommonBackend
-    # (should we maybe fill a bug in u1db bts?).
-    #-------------------------------------------------------------------------
-
-    def _add_conflict(self, doc_id, my_doc_rev, my_content):
-        self._conflict_log.append((doc_id, my_doc_rev, my_content))
-        self._set_u1db_data()
-
-    def _delete_conflicts(self, doc, conflict_revs):
-        deleting = [(doc.doc_id, c_rev) for c_rev in conflict_revs]
-        self._conflict_log.delete_conflicts(deleting)
-        self._set_u1db_data()
-        doc.has_conflicts = self._has_conflicts(doc.doc_id)
-
-    def _prune_conflicts(self, doc, doc_vcr):
-        if self._has_conflicts(doc.doc_id):
-            autoresolved = False
-            c_revs_to_prune = []
-            for c_doc in self._conflict_log.get_conflicts(doc.doc_id):
-                c_vcr = vectorclock.VectorClockRev(c_doc.rev)
-                if doc_vcr.is_newer(c_vcr):
-                    c_revs_to_prune.append(c_doc.rev)
-                elif doc.same_content_as(c_doc):
-                    c_revs_to_prune.append(c_doc.rev)
-                    doc_vcr.maximize(c_vcr)
-                    autoresolved = True
-            if autoresolved:
-                doc_vcr.increment(self._replica_uid)
-                doc.rev = doc_vcr.as_str()
-            self._delete_conflicts(doc, c_revs_to_prune)
-
-    def _force_doc_sync_conflict(self, doc):
-        my_doc = self._get_doc(doc.doc_id)
-        self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev))
-        self._add_conflict(doc.doc_id, my_doc.rev, my_doc.get_json())
-        doc.has_conflicts = True
-        self._put_and_update_indexes(my_doc, doc)
-
-
-#----------------------------------------------------------------------------
-# U1DB's TransactionLog, SyncLog, ConflictLog, and Index
-#----------------------------------------------------------------------------
-
-class SimpleList(object):
-    def __init__(self):
-        self._data = []
-
-    def _set_data(self, data):
-        self._data = data
-
-    def _get_data(self):
-        return self._data
-
-    data = property(
-        _get_data, _set_data, doc="List contents.")
-
-    def append(self, msg):
-        self._data.append(msg)
-
-    def reduce(self, func, initializer=None):
-        return reduce(func, self._data, initializer)
-
-    def map(self, func):
-        return map(func, self._get_data())
-
-    def filter(self, func):
-        return filter(func, self._get_data())
-
-
-class SimpleLog(SimpleList):
-
-    def _set_log(self, log):
-        self._data = log
-
-    def _get_log(self):
-        return self._data
-
-    log = property(
-        _get_log, _set_log, doc="Log contents.")
-
-
-class TransactionLog(SimpleLog):
-    """
-    An ordered list of (generation, doc_id, transaction_id) tuples.
-    """
-
-    def _set_log(self, log):
-        self._data = log
-
-    def _get_data(self, reverse=True):
-        return sorted(self._data, reverse=reverse)
-
-    _get_log = _get_data
-
-    log = property(
-        _get_log, _set_log, doc="Log contents.")
-
-    def get_generation(self):
-        """
-        Return the current generation.
+    def _init_u1db_data(self):
         """
-        gens = self.map(lambda x: x[0])
-        if not gens:
-            return 0
-        return max(gens)
-
-    def get_generation_info(self):
-        """
-        Return the current generation and transaction id.
-        """
-        if not self._get_log():
-            return(0, '')
-        info = self.map(lambda x: (x[0], x[2]))
-        return reduce(lambda x, y: x if (x[0] > y[0]) else y, info)
-
-    def get_trans_id_for_gen(self, gen):
-        """
-        Get the transaction id corresponding to a particular generation.
-        """
-        log = self.reduce(lambda x, y: y if y[0] == gen else x)
-        if log is None:
-            return None
-        return log[2]
-
-    def whats_changed(self, old_generation):
-        """
-        Return a list of documents that have changed since old_generation.
+        Initialize u1db configuration data on backend storage.
         """
-        results = self.filter(lambda x: x[0] > old_generation)
-        seen = set()
-        changes = []
-        newest_trans_id = ''
-        for generation, doc_id, trans_id in results:
-            if doc_id not in seen:
-                changes.append((doc_id, generation, trans_id))
-                seen.add(doc_id)
-        if changes:
-            cur_gen = changes[0][1]  # max generation
-            newest_trans_id = changes[0][2]
-            changes.reverse()
-        else:
-            results = self._get_log()
-            if not results:
-                cur_gen = 0
-                newest_trans_id = ''
-            else:
-                cur_gen, _, newest_trans_id = results[0]
-
-        return cur_gen, newest_trans_id, changes
-
-    def get_transaction_log(self):
-        """
-        Return only a list of (doc_id, transaction_id)
-        """
-        return map(lambda x: (x[1], x[2]),
-                   sorted(self._get_log(reverse=False)))
-
-
-class SyncLog(SimpleLog):
-    """
-    A list of (replica_id, generation, transaction_id) tuples.
-    """
-
-    def find_by_replica_uid(self, replica_uid):
-        if not self._get_log():
-            return ()
-        return self.reduce(lambda x, y: y if y[0] == replica_uid else x)
-
-    def get_replica_gen_and_trans_id(self, other_replica_uid):
-        """
-        Return the last known generation and transaction id for the other db
-        replica.
-        """
-        info = self.find_by_replica_uid(other_replica_uid)
-        if not info:
-            return (0, '')
-        return (info[1], info[2])
-
-    def set_replica_gen_and_trans_id(self, other_replica_uid,
-                                     other_generation, other_transaction_id):
-        """
-        Set the last-known generation and transaction id for the other
-        database replica.
-        """
-        self._set_log(self.filter(lambda x: x[0] != other_replica_uid))
-        self.append((other_replica_uid, other_generation,
-                     other_transaction_id))
-
-
-class ConflictLog(SimpleLog):
-    """
-    A list of (doc_id, my_doc_rev, my_content) tuples.
-    """
-
-    def __init__(self, factory):
-        super(ConflictLog, self).__init__()
-        self._factory = factory
-
-    def delete_conflicts(self, conflicts):
-        for conflict in conflicts:
-            self._set_log(self.filter(lambda x:
-                          x[0] != conflict[0] or x[1] != conflict[1]))
-
-    def get_conflicts(self, doc_id):
-        conflicts = self.filter(lambda x: x[0] == doc_id)
-        if not conflicts:
-            return []
-        return reversed(map(lambda x: self._factory(doc_id, x[1], x[2]),
-                            conflicts))
-
-    def has_conflicts(self, doc_id):
-        return bool(self.filter(lambda x: x[0] == doc_id))
+        NotImplementedError(self._init_u1db_data)
diff --git a/src/leap/soledad/tests/test_couch.py b/src/leap/soledad/tests/test_couch.py
index 5e8d6126..6c3d7daf 100644
--- a/src/leap/soledad/tests/test_couch.py
+++ b/src/leap/soledad/tests/test_couch.py
@@ -46,9 +46,10 @@ def copy_couch_database_for_test(test, db):
     gen, docs = db.get_all_docs(include_deleted=True)
     for doc in docs:
         new_db._put_doc(doc)
-    new_db._transaction_log._data = copy.deepcopy(db._transaction_log._data)
-    new_db._sync_log._data = copy.deepcopy(db._sync_log._data)
-    new_db._conflict_log._data = copy.deepcopy(db._conflict_log._data)
+    new_db._transaction_log = copy.deepcopy(db._transaction_log)
+    new_db._conflicts = copy.deepcopy(db._conflicts)
+    new_db._other_generations = copy.deepcopy(db._other_generations)
+    new_db._indexes = copy.deepcopy(db._indexes)
     new_db._set_u1db_data()
     return new_db
@@ -112,13 +113,13 @@ class CouchWithConflictsTests(
 # the server, so indexing makes no sense. Thus, we ignore index testing for
 # now.

-# class CouchIndexTests(DatabaseIndexTests):
-#
-#    scenarios = COUCH_SCENARIOS
-#
-#    def tearDown(self):
-#        self.db.delete_database()
-#        super(CouchIndexTests, self).tearDown()
+class CouchIndexTests(test_backends.DatabaseIndexTests):
+
+    scenarios = COUCH_SCENARIOS
+
+    def tearDown(self):
+        self.db.delete_database()
+        super(CouchIndexTests, self).tearDown()


 #-----------------------------------------------------------------------------
@@ -196,23 +197,5 @@ class CouchDatabaseSyncTests(test_sync.DatabaseSyncTests):
         db.delete_database()
         super(CouchDatabaseSyncTests, self).tearDown()

-    # The following tests use indexing, so we eliminate them for now because
-    # indexing is still not implemented in couch backend.
-
-    def test_sync_pulls_changes(self):
-        pass
-
-    def test_sync_sees_remote_conflicted(self):
-        pass
-
-    def test_sync_sees_remote_delete_conflicted(self):
-        pass
-
-    def test_sync_local_race_conflicted(self):
-        pass
-
-    def test_sync_propagates_deletes(self):
-        pass
-
 load_tests = tests.load_with_scenarios
