diff options
Diffstat (limited to 'src/leap')
-rw-r--r-- | src/leap/soledad/backends/couch.py | 34 | ||||
-rw-r--r-- | src/leap/soledad/backends/objectstore.py | 66 | ||||
-rw-r--r-- | src/leap/soledad/tests/test_couch.py | 62 | ||||
-rw-r--r-- | src/leap/soledad/tests/test_logs.py | 3 | ||||
-rw-r--r-- | src/leap/soledad/util.py | 28 |
5 files changed, 124 insertions, 69 deletions
diff --git a/src/leap/soledad/backends/couch.py b/src/leap/soledad/backends/couch.py index 14021737..f071cfad 100644 --- a/src/leap/soledad/backends/couch.py +++ b/src/leap/soledad/backends/couch.py @@ -1,6 +1,8 @@ +import uuid +from base64 import b64encode, b64decode from u1db import errors from u1db.remote.http_target import HTTPSyncTarget -from couchdb.client import Server, Document +from couchdb.client import Server, Document as CouchDocument from couchdb.http import ResourceNotFound from leap.soledad.backends.objectstore import ObjectStore from leap.soledad.backends.leap_backend import LeapDocument @@ -46,7 +48,13 @@ class CouchDatabase(ObjectStore): cdoc = self._database.get(doc_id) if cdoc is None: return None - doc = self._factory(doc_id=doc_id, rev=cdoc['u1db_rev']) + has_conflicts = False + if check_for_conflicts: + has_conflicts = self._has_conflicts(doc_id) + doc = self._factory( + doc_id=doc_id, + rev=cdoc['u1db_rev'], + has_conflicts=has_conflicts) if cdoc['u1db_json'] is not None: doc.content = json.loads(cdoc['u1db_json']) else: @@ -60,7 +68,7 @@ class CouchDatabase(ObjectStore): for doc_id in self._database: if doc_id == self.U1DB_DATA_DOC_ID: continue - doc = self._get_doc(doc_id) + doc = self._get_doc(doc_id, check_for_conflicts=True) if doc.content is None and not include_deleted: continue results.append(doc) @@ -68,7 +76,7 @@ class CouchDatabase(ObjectStore): def _put_doc(self, doc): # prepare couch's Document - cdoc = Document() + cdoc = CouchDocument() cdoc['_id'] = doc.doc_id # we have to guarantee that couch's _rev is cosistent old_cdoc = self._database.get(doc.doc_id) @@ -81,6 +89,7 @@ class CouchDatabase(ObjectStore): cdoc['u1db_json'] = doc.get_json() else: cdoc['u1db_json'] = None + # save doc in db self._database.save(cdoc) def get_sync_target(self): @@ -103,12 +112,22 @@ class CouchDatabase(ObjectStore): return Synchronizer(self, CouchSyncTarget(url, creds=creds)).sync( autocreate=autocreate) + def _initialize(self): + if self._replica_uid is None: + self._replica_uid = uuid.uuid4().hex + doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID) + doc.content = { 'sync_log' : [], + 'transaction_log' : [], + 'conflict_log' : b64encode(json.dumps([])), + 'replica_uid' : self._replica_uid } + self._put_doc(doc) + def _get_u1db_data(self): cdoc = self._database.get(self.U1DB_DATA_DOC_ID) content = json.loads(cdoc['u1db_json']) self._sync_log.log = content['sync_log'] self._transaction_log.log = content['transaction_log'] - self._conflict_log.log = content['conflict_log'] + self._conflict_log.log = json.loads(b64decode(content['conflict_log'])) self._replica_uid = content['replica_uid'] self._couch_rev = cdoc['_rev'] @@ -116,7 +135,10 @@ class CouchDatabase(ObjectStore): doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID) doc.content = { 'sync_log' : self._sync_log.log, 'transaction_log' : self._transaction_log.log, - 'conflict_log' : self._conflict_log.log, + # Here, the b64 encode ensures that document content + # does not cause strange behaviour in couchdb because + # of encoding. + 'conflict_log' : b64encode(json.dumps(self._conflict_log.log)), 'replica_uid' : self._replica_uid, '_rev' : self._couch_rev} self._put_doc(doc) diff --git a/src/leap/soledad/backends/objectstore.py b/src/leap/soledad/backends/objectstore.py index cd051588..2ab07675 100644 --- a/src/leap/soledad/backends/objectstore.py +++ b/src/leap/soledad/backends/objectstore.py @@ -1,9 +1,7 @@ -import uuid from u1db.backends import CommonBackend -from u1db import errors, Document +from u1db import errors, Document, vectorclock from leap.soledad import util as soledadutil - class ObjectStore(CommonBackend): """ A backend for storing u1db data in an object store. @@ -11,12 +9,12 @@ class ObjectStore(CommonBackend): def __init__(self, replica_uid=None): # This initialization method should be called after the connection - # with the database is established, so it can ensure that u1db data is - # configured and up-to-date. + # with the database is established in each implementation, so it can + # ensure that u1db data is configured and up-to-date. self.set_document_factory(Document) self._sync_log = soledadutil.SyncLog() self._transaction_log = soledadutil.TransactionLog() - self._conflict_log = soledadutil.ConflictLog() + self._conflict_log = soledadutil.ConflictLog(self._factory) self._replica_uid = replica_uid self._ensure_u1db_data() @@ -72,8 +70,7 @@ class ObjectStore(CommonBackend): raise errors.RevisionConflict() new_rev = self._allocate_doc_rev(doc.rev) doc.rev = new_rev - self._put_doc(doc) - self._update_gen_and_transaction_log(doc.doc_id) + self._put_and_update_indexes(old_doc, doc) return doc.rev def delete_doc(self, doc): @@ -89,8 +86,7 @@ class ObjectStore(CommonBackend): new_rev = self._allocate_doc_rev(doc.rev) doc.rev = new_rev doc.make_tombstone() - self._put_doc(doc) - self._update_gen_and_transaction_log(doc.doc_id) + self._put_and_update_indexes(old_doc, doc) return new_rev # start of index-related methods: these are not supported by this backend. @@ -117,10 +113,25 @@ class ObjectStore(CommonBackend): # end of index-related methods: these are not supported by this backend. def get_doc_conflicts(self, doc_id): - return [] + self._get_u1db_data() + conflict_docs = self._conflict_log.get_conflicts(doc_id) + if not conflict_docs: + return [] + this_doc = self._get_doc(doc_id) + this_doc.has_conflicts = True + return [this_doc] + list(conflict_docs) def resolve_doc(self, doc, conflicted_doc_revs): - raise NotImplementedError(self.resolve_doc) + cur_doc = self._get_doc(doc.doc_id) + new_rev = self._ensure_maximal_rev(cur_doc.rev, + conflicted_doc_revs) + superseded_revs = set(conflicted_doc_revs) + doc.rev = new_rev + if cur_doc.rev in superseded_revs: + self._put_and_update_indexes(cur_doc, doc) + else: + self._add_conflict(doc.doc_id, new_rev, doc.get_json()) + self._delete_conflicts(doc, superseded_revs) def _get_replica_gen_and_trans_id(self, other_replica_uid): self._get_u1db_data() @@ -142,6 +153,7 @@ class ObjectStore(CommonBackend): other_transaction_id) def _get_transaction_log(self): + self._get_u1db_data() return self._transaction_log.get_transaction_log() #------------------------------------------------------------------------- @@ -157,11 +169,12 @@ class ObjectStore(CommonBackend): return self._transaction_log.get_generation_info() def _has_conflicts(self, doc_id): - # Documents never have conflicts on server. - return False + self._get_u1db_data() + return self._conflict_log.has_conflicts(doc_id) def _put_and_update_indexes(self, old_doc, doc): - # TODO: implement index update + # for now we ignore indexes as this backend is used to store encrypted + # blobs of data in the server. self._put_doc(doc) self._update_gen_and_transaction_log(doc.doc_id) @@ -199,14 +212,7 @@ class ObjectStore(CommonBackend): """ Create u1db data object in store. """ - if self._replica_uid is None: - self._replica_uid = uuid.uuid4().hex - doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID) - doc.content = { 'sync_log' : [], - 'transaction_log' : [], - 'conflict_log' : [], - 'replica_uid' : self._replica_uid } - self._put_doc(doc) + NotImplementedError(self._initialize) def _get_u1db_data(self, u1db_data_doc_id): """ @@ -239,17 +245,19 @@ class ObjectStore(CommonBackend): def _add_conflict(self, doc_id, my_doc_rev, my_content): self._conflict_log.append((doc_id, my_doc_rev, my_content)) + self._set_u1db_data() def _delete_conflicts(self, doc, conflict_revs): deleting = [(doc.doc_id, c_rev) for c_rev in conflict_revs] self._conflict_log.delete_conflicts(deleting) + self._set_u1db_data() doc.has_conflicts = self._has_conflicts(doc.doc_id) def _prune_conflicts(self, doc, doc_vcr): if self._has_conflicts(doc.doc_id): autoresolved = False c_revs_to_prune = [] - for c_doc in self._get_conflicts(doc.doc_id): + for c_doc in self._conflict_log.get_conflicts(doc.doc_id): c_vcr = vectorclock.VectorClockRev(c_doc.rev) if doc_vcr.is_newer(c_vcr): c_revs_to_prune.append(c_doc.rev) @@ -260,5 +268,11 @@ class ObjectStore(CommonBackend): if autoresolved: doc_vcr.increment(self._replica_uid) doc.rev = doc_vcr.as_str() - c = self._db_handle.cursor() - self._delete_conflicts(c, doc, c_revs_to_prune) + self._delete_conflicts(doc, c_revs_to_prune) + + def _force_doc_sync_conflict(self, doc): + my_doc = self._get_doc(doc.doc_id) + self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev)) + self._add_conflict(doc.doc_id, my_doc.rev, my_doc.get_json()) + doc.has_conflicts = True + self._put_and_update_indexes(my_doc, doc) diff --git a/src/leap/soledad/tests/test_couch.py b/src/leap/soledad/tests/test_couch.py index 3f6c45f6..2337be9b 100644 --- a/src/leap/soledad/tests/test_couch.py +++ b/src/leap/soledad/tests/test_couch.py @@ -40,7 +40,7 @@ class TestCouchBackendImpl(tests.TestCase): # The following tests come from `u1db.tests.test_backends`. #----------------------------------------------------------------------------- -def make_couch_database_for_test(test, replica_uid, path='test'): +def make_couch_database_for_test(test, replica_uid): return couch.CouchDatabase('http://localhost:5984', 'u1db_tests', replica_uid=replica_uid) @@ -81,40 +81,44 @@ class CouchDatabaseTests(LocalDatabaseTests): super(CouchDatabaseTests, self).tearDown() -#class CouchValidateGenNTransIdTests(LocalDatabaseValidateGenNTransIdTests): -# -# scenarios = COUCH_SCENARIOS -# -# def tearDown(self): -# self.db.delete_database() -# super(CouchTests, self).tearDown() -# -# -#class CouchValidateSourceGenTests(LocalDatabaseValidateSourceGenTests): -# -# scenarios = COUCH_SCENARIOS -# -# def tearDown(self): -# self.db.delete_database() -# super(CouchTests, self).tearDown() -# -# -#class CouchWithConflictsTests(LocalDatabaseWithConflictsTests): -# -# scenarios = COUCH_SCENARIOS -# -# def tearDown(self): -# self.db.delete_database() -# super(CouchTests, self).tearDown() -# -# +class CouchValidateGenNTransIdTests(LocalDatabaseValidateGenNTransIdTests): + + scenarios = COUCH_SCENARIOS + + def tearDown(self): + self.db.delete_database() + super(CouchValidateGenNTransIdTests, self).tearDown() + + +class CouchValidateSourceGenTests(LocalDatabaseValidateSourceGenTests): + + scenarios = COUCH_SCENARIOS + + def tearDown(self): + self.db.delete_database() + super(CouchValidateSourceGenTests, self).tearDown() + + +class CouchWithConflictsTests(LocalDatabaseWithConflictsTests): + + scenarios = COUCH_SCENARIOS + + def tearDown(self): + self.db.delete_database() + super(CouchWithConflictsTests, self).tearDown() + + +# Notice: the CouchDB backend is currently used for storing encrypted data in +# the server, so indexing makes no sense. Thus, we ignore index testing for +# now. + #class CouchIndexTests(DatabaseIndexTests): # # scenarios = COUCH_SCENARIOS # # def tearDown(self): # self.db.delete_database() -# super(CouchTests, self).tearDown() +# super(CouchIndexTests, self).tearDown() # load_tests = tests.load_with_scenarios diff --git a/src/leap/soledad/tests/test_logs.py b/src/leap/soledad/tests/test_logs.py index 7fbb1cb7..2102b671 100644 --- a/src/leap/soledad/tests/test_logs.py +++ b/src/leap/soledad/tests/test_logs.py @@ -69,10 +69,11 @@ class LogTestCase(unittest.TestCase): 'error getting whats changed.') def test_conflict_log(self): + # TODO: include tests for `get_conflicts` and `has_conflicts`. data = [('1', 'my:1', 'irrelevant'), ('2', 'my:1', 'irrelevant'), ('3', 'my:1', 'irrelevant')] - log = ConflictLog() + log = ConflictLog(None) log.log = data log.delete_conflicts([('1','my:1'),('2','my:1')]) self.assertEqual( diff --git a/src/leap/soledad/util.py b/src/leap/soledad/util.py index 8683fbb9..8a8bedfb 100644 --- a/src/leap/soledad/util.py +++ b/src/leap/soledad/util.py @@ -74,13 +74,13 @@ class SimpleLog(object): self._log.append(msg) def reduce(self, func, initializer=None): - return reduce(func, self.log, initializer) + return reduce(func, self._log, initializer) def map(self, func): - return map(func, self.log) + return map(func, self._get_log()) def filter(self, func): - return filter(func, self.log) + return filter(func, self._get_log()) class TransactionLog(SimpleLog): @@ -141,7 +141,7 @@ class TransactionLog(SimpleLog): newest_trans_id = changes[0][2] changes.reverse() else: - results = self.log + results = self._get_log() if not results: cur_gen = 0 newest_trans_id = '' @@ -164,7 +164,7 @@ class SyncLog(SimpleLog): """ def find_by_replica_uid(self, replica_uid): - if not self.log: + if not self._get_log(): return () return self.reduce(lambda x, y: y if y[0] == replica_uid else x) @@ -184,7 +184,7 @@ class SyncLog(SimpleLog): Set the last-known generation and transaction id for the other database replica. """ - self.log = self.filter(lambda x: x[0] != other_replica_uid) + self._log = self.filter(lambda x: x[0] != other_replica_uid) self.append((other_replica_uid, other_generation, other_transaction_id)) @@ -192,8 +192,22 @@ class ConflictLog(SimpleLog): """ A list of (doc_id, my_doc_rev, my_content) tuples. """ + + def __init__(self, factory): + super(ConflictLog, self).__init__() + self._factory = factory def delete_conflicts(self, conflicts): for conflict in conflicts: - self.log = self.filter(lambda x: + self._log = self.filter(lambda x: x[0] != conflict[0] or x[1] != conflict[1]) + + def get_conflicts(self, doc_id): + conflicts = self.filter(lambda x: x[0] == doc_id) + if not conflicts: + return [] + return reversed(map(lambda x: self._factory(doc_id, x[1], x[2]), + conflicts)) + + def has_conflicts(self, doc_id): + return bool(self.filter(lambda x: x[0] == doc_id)) |