-rw-r--r--  src/leap/soledad/backends/couch.py       | 34
-rw-r--r--  src/leap/soledad/backends/objectstore.py | 66
-rw-r--r--  src/leap/soledad/tests/test_couch.py     | 62
-rw-r--r--  src/leap/soledad/tests/test_logs.py      |  3
-rw-r--r--  src/leap/soledad/util.py                 | 28
5 files changed, 124 insertions(+), 69 deletions(-)
diff --git a/src/leap/soledad/backends/couch.py b/src/leap/soledad/backends/couch.py
index 14021737..f071cfad 100644
--- a/src/leap/soledad/backends/couch.py
+++ b/src/leap/soledad/backends/couch.py
@@ -1,6 +1,8 @@
+import uuid
+from base64 import b64encode, b64decode
from u1db import errors
from u1db.remote.http_target import HTTPSyncTarget
-from couchdb.client import Server, Document
+from couchdb.client import Server, Document as CouchDocument
from couchdb.http import ResourceNotFound
from leap.soledad.backends.objectstore import ObjectStore
from leap.soledad.backends.leap_backend import LeapDocument
@@ -46,7 +48,13 @@ class CouchDatabase(ObjectStore):
cdoc = self._database.get(doc_id)
if cdoc is None:
return None
- doc = self._factory(doc_id=doc_id, rev=cdoc['u1db_rev'])
+ has_conflicts = False
+ if check_for_conflicts:
+ has_conflicts = self._has_conflicts(doc_id)
+ doc = self._factory(
+ doc_id=doc_id,
+ rev=cdoc['u1db_rev'],
+ has_conflicts=has_conflicts)
if cdoc['u1db_json'] is not None:
doc.content = json.loads(cdoc['u1db_json'])
else:
@@ -60,7 +68,7 @@ class CouchDatabase(ObjectStore):
for doc_id in self._database:
if doc_id == self.U1DB_DATA_DOC_ID:
continue
- doc = self._get_doc(doc_id)
+ doc = self._get_doc(doc_id, check_for_conflicts=True)
if doc.content is None and not include_deleted:
continue
results.append(doc)
@@ -68,7 +76,7 @@ class CouchDatabase(ObjectStore):
def _put_doc(self, doc):
# prepare couch's Document
- cdoc = Document()
+ cdoc = CouchDocument()
cdoc['_id'] = doc.doc_id
# we have to guarantee that couch's _rev is consistent
old_cdoc = self._database.get(doc.doc_id)
@@ -81,6 +89,7 @@ class CouchDatabase(ObjectStore):
cdoc['u1db_json'] = doc.get_json()
else:
cdoc['u1db_json'] = None
+ # save doc in db
self._database.save(cdoc)
def get_sync_target(self):
@@ -103,12 +112,22 @@ class CouchDatabase(ObjectStore):
return Synchronizer(self, CouchSyncTarget(url, creds=creds)).sync(
autocreate=autocreate)
+ def _initialize(self):
+ if self._replica_uid is None:
+ self._replica_uid = uuid.uuid4().hex
+ doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID)
+ doc.content = { 'sync_log' : [],
+ 'transaction_log' : [],
+ 'conflict_log' : b64encode(json.dumps([])),
+ 'replica_uid' : self._replica_uid }
+ self._put_doc(doc)
+
def _get_u1db_data(self):
cdoc = self._database.get(self.U1DB_DATA_DOC_ID)
content = json.loads(cdoc['u1db_json'])
self._sync_log.log = content['sync_log']
self._transaction_log.log = content['transaction_log']
- self._conflict_log.log = content['conflict_log']
+ self._conflict_log.log = json.loads(b64decode(content['conflict_log']))
self._replica_uid = content['replica_uid']
self._couch_rev = cdoc['_rev']
@@ -116,7 +135,10 @@ class CouchDatabase(ObjectStore):
doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID)
doc.content = { 'sync_log' : self._sync_log.log,
'transaction_log' : self._transaction_log.log,
- 'conflict_log' : self._conflict_log.log,
+ # The conflict log is JSON-serialized and base64-encoded here so
+ # that arbitrary document content cannot interfere with couchdb's
+ # handling of this document's encoding.
+ 'conflict_log' : b64encode(json.dumps(self._conflict_log.log)),
'replica_uid' : self._replica_uid,
'_rev' : self._couch_rev}
self._put_doc(doc)
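The save/load cycle for the u1db data document above pairs the two changes: on write the conflict log is JSON-serialized and base64-encoded, and on read the same two steps are reversed. A minimal standalone sketch of that round trip (the sample log entry is invented for illustration):

import json
from base64 import b64encode, b64decode

# A conflict log is a list of (doc_id, my_doc_rev, my_content) entries;
# lists are used here because JSON round-trips tuples back as lists.
conflict_log = [['some-doc', 'replica:1', '{"key": "value"}']]

# On save (_set_u1db_data): serialize to JSON, then base64-encode.
stored = b64encode(json.dumps(conflict_log))

# On load (_get_u1db_data): reverse both steps.
assert json.loads(b64decode(stored)) == conflict_log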
diff --git a/src/leap/soledad/backends/objectstore.py b/src/leap/soledad/backends/objectstore.py
index cd051588..2ab07675 100644
--- a/src/leap/soledad/backends/objectstore.py
+++ b/src/leap/soledad/backends/objectstore.py
@@ -1,9 +1,7 @@
-import uuid
from u1db.backends import CommonBackend
-from u1db import errors, Document
+from u1db import errors, Document, vectorclock
from leap.soledad import util as soledadutil
-
class ObjectStore(CommonBackend):
"""
A backend for storing u1db data in an object store.
@@ -11,12 +9,12 @@ class ObjectStore(CommonBackend):
def __init__(self, replica_uid=None):
# This initialization method should be called after the connection
- # with the database is established, so it can ensure that u1db data is
- # configured and up-to-date.
+ # with the database is established in each implementation, so it can
+ # ensure that u1db data is configured and up-to-date.
self.set_document_factory(Document)
self._sync_log = soledadutil.SyncLog()
self._transaction_log = soledadutil.TransactionLog()
- self._conflict_log = soledadutil.ConflictLog()
+ self._conflict_log = soledadutil.ConflictLog(self._factory)
self._replica_uid = replica_uid
self._ensure_u1db_data()
@@ -72,8 +70,7 @@ class ObjectStore(CommonBackend):
raise errors.RevisionConflict()
new_rev = self._allocate_doc_rev(doc.rev)
doc.rev = new_rev
- self._put_doc(doc)
- self._update_gen_and_transaction_log(doc.doc_id)
+ self._put_and_update_indexes(old_doc, doc)
return doc.rev
def delete_doc(self, doc):
@@ -89,8 +86,7 @@ class ObjectStore(CommonBackend):
new_rev = self._allocate_doc_rev(doc.rev)
doc.rev = new_rev
doc.make_tombstone()
- self._put_doc(doc)
- self._update_gen_and_transaction_log(doc.doc_id)
+ self._put_and_update_indexes(old_doc, doc)
return new_rev
# start of index-related methods: these are not supported by this backend.
@@ -117,10 +113,25 @@ class ObjectStore(CommonBackend):
# end of index-related methods: these are not supported by this backend.
def get_doc_conflicts(self, doc_id):
- return []
+ self._get_u1db_data()
+ conflict_docs = self._conflict_log.get_conflicts(doc_id)
+ if not conflict_docs:
+ return []
+ this_doc = self._get_doc(doc_id)
+ this_doc.has_conflicts = True
+ return [this_doc] + list(conflict_docs)
def resolve_doc(self, doc, conflicted_doc_revs):
- raise NotImplementedError(self.resolve_doc)
+ cur_doc = self._get_doc(doc.doc_id)
+ new_rev = self._ensure_maximal_rev(cur_doc.rev,
+ conflicted_doc_revs)
+ superseded_revs = set(conflicted_doc_revs)
+ doc.rev = new_rev
+ if cur_doc.rev in superseded_revs:
+ self._put_and_update_indexes(cur_doc, doc)
+ else:
+ self._add_conflict(doc.doc_id, new_rev, doc.get_json())
+ self._delete_conflicts(doc, superseded_revs)
def _get_replica_gen_and_trans_id(self, other_replica_uid):
self._get_u1db_data()
@@ -142,6 +153,7 @@ class ObjectStore(CommonBackend):
other_transaction_id)
def _get_transaction_log(self):
+ self._get_u1db_data()
return self._transaction_log.get_transaction_log()
#-------------------------------------------------------------------------
@@ -157,11 +169,12 @@ class ObjectStore(CommonBackend):
return self._transaction_log.get_generation_info()
def _has_conflicts(self, doc_id):
- # Documents never have conflicts on server.
- return False
+ self._get_u1db_data()
+ return self._conflict_log.has_conflicts(doc_id)
def _put_and_update_indexes(self, old_doc, doc):
- # TODO: implement index update
+ # for now we ignore indexes, as this backend is used to store
+ # encrypted blobs of data on the server.
self._put_doc(doc)
self._update_gen_and_transaction_log(doc.doc_id)
@@ -199,14 +212,7 @@ class ObjectStore(CommonBackend):
"""
Create u1db data object in store.
"""
- if self._replica_uid is None:
- self._replica_uid = uuid.uuid4().hex
- doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID)
- doc.content = { 'sync_log' : [],
- 'transaction_log' : [],
- 'conflict_log' : [],
- 'replica_uid' : self._replica_uid }
- self._put_doc(doc)
+ raise NotImplementedError(self._initialize)
def _get_u1db_data(self, u1db_data_doc_id):
"""
@@ -239,17 +245,19 @@ class ObjectStore(CommonBackend):
def _add_conflict(self, doc_id, my_doc_rev, my_content):
self._conflict_log.append((doc_id, my_doc_rev, my_content))
+ self._set_u1db_data()
def _delete_conflicts(self, doc, conflict_revs):
deleting = [(doc.doc_id, c_rev) for c_rev in conflict_revs]
self._conflict_log.delete_conflicts(deleting)
+ self._set_u1db_data()
doc.has_conflicts = self._has_conflicts(doc.doc_id)
def _prune_conflicts(self, doc, doc_vcr):
if self._has_conflicts(doc.doc_id):
autoresolved = False
c_revs_to_prune = []
- for c_doc in self._get_conflicts(doc.doc_id):
+ for c_doc in self._conflict_log.get_conflicts(doc.doc_id):
c_vcr = vectorclock.VectorClockRev(c_doc.rev)
if doc_vcr.is_newer(c_vcr):
c_revs_to_prune.append(c_doc.rev)
@@ -260,5 +268,11 @@ class ObjectStore(CommonBackend):
if autoresolved:
doc_vcr.increment(self._replica_uid)
doc.rev = doc_vcr.as_str()
- c = self._db_handle.cursor()
- self._delete_conflicts(c, doc, c_revs_to_prune)
+ self._delete_conflicts(doc, c_revs_to_prune)
+
+ def _force_doc_sync_conflict(self, doc):
+ my_doc = self._get_doc(doc.doc_id)
+ self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev))
+ self._add_conflict(doc.doc_id, my_doc.rev, my_doc.get_json())
+ doc.has_conflicts = True
+ self._put_and_update_indexes(my_doc, doc)
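Taken together, get_doc_conflicts and resolve_doc give ObjectStore subclasses the usual u1db conflict-resolution flow. A hedged usage sketch, assuming db is a concrete subclass such as CouchDatabase and the document id and content are invented:

# All versions of a conflicted document: the current one first, then the
# conflicting revisions kept in the conflict log.
conflicted = db.get_doc_conflicts('some-doc')

if conflicted:
    # Keep (or merge) the content we want and resolve against every
    # conflicting revision, superseding them all.
    winner = conflicted[0]
    winner.content = {'resolved': True}  # invented merged content
    db.resolve_doc(winner, [d.rev for d in conflicted])

    # After resolution the document no longer reports conflicts.
    assert not db.get_doc_conflicts('some-doc')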
diff --git a/src/leap/soledad/tests/test_couch.py b/src/leap/soledad/tests/test_couch.py
index 3f6c45f6..2337be9b 100644
--- a/src/leap/soledad/tests/test_couch.py
+++ b/src/leap/soledad/tests/test_couch.py
@@ -40,7 +40,7 @@ class TestCouchBackendImpl(tests.TestCase):
# The following tests come from `u1db.tests.test_backends`.
#-----------------------------------------------------------------------------
-def make_couch_database_for_test(test, replica_uid, path='test'):
+def make_couch_database_for_test(test, replica_uid):
return couch.CouchDatabase('http://localhost:5984', 'u1db_tests',
replica_uid=replica_uid)
@@ -81,40 +81,44 @@ class CouchDatabaseTests(LocalDatabaseTests):
super(CouchDatabaseTests, self).tearDown()
-#class CouchValidateGenNTransIdTests(LocalDatabaseValidateGenNTransIdTests):
-#
-# scenarios = COUCH_SCENARIOS
-#
-# def tearDown(self):
-# self.db.delete_database()
-# super(CouchTests, self).tearDown()
-#
-#
-#class CouchValidateSourceGenTests(LocalDatabaseValidateSourceGenTests):
-#
-# scenarios = COUCH_SCENARIOS
-#
-# def tearDown(self):
-# self.db.delete_database()
-# super(CouchTests, self).tearDown()
-#
-#
-#class CouchWithConflictsTests(LocalDatabaseWithConflictsTests):
-#
-# scenarios = COUCH_SCENARIOS
-#
-# def tearDown(self):
-# self.db.delete_database()
-# super(CouchTests, self).tearDown()
-#
-#
+class CouchValidateGenNTransIdTests(LocalDatabaseValidateGenNTransIdTests):
+
+ scenarios = COUCH_SCENARIOS
+
+ def tearDown(self):
+ self.db.delete_database()
+ super(CouchValidateGenNTransIdTests, self).tearDown()
+
+
+class CouchValidateSourceGenTests(LocalDatabaseValidateSourceGenTests):
+
+ scenarios = COUCH_SCENARIOS
+
+ def tearDown(self):
+ self.db.delete_database()
+ super(CouchValidateSourceGenTests, self).tearDown()
+
+
+class CouchWithConflictsTests(LocalDatabaseWithConflictsTests):
+
+ scenarios = COUCH_SCENARIOS
+
+ def tearDown(self):
+ self.db.delete_database()
+ super(CouchWithConflictsTests, self).tearDown()
+
+
+# Note: the CouchDB backend is currently used for storing encrypted data on
+# the server, so indexing makes no sense. Thus, we ignore index testing for
+# now.
+
#class CouchIndexTests(DatabaseIndexTests):
#
# scenarios = COUCH_SCENARIOS
#
# def tearDown(self):
# self.db.delete_database()
-# super(CouchTests, self).tearDown()
+# super(CouchIndexTests, self).tearDown()
#
load_tests = tests.load_with_scenarios
diff --git a/src/leap/soledad/tests/test_logs.py b/src/leap/soledad/tests/test_logs.py
index 7fbb1cb7..2102b671 100644
--- a/src/leap/soledad/tests/test_logs.py
+++ b/src/leap/soledad/tests/test_logs.py
@@ -69,10 +69,11 @@ class LogTestCase(unittest.TestCase):
'error getting whats changed.')
def test_conflict_log(self):
+ # TODO: include tests for `get_conflicts` and `has_conflicts`.
data = [('1', 'my:1', 'irrelevant'),
('2', 'my:1', 'irrelevant'),
('3', 'my:1', 'irrelevant')]
- log = ConflictLog()
+ log = ConflictLog(None)
log.log = data
log.delete_conflicts([('1','my:1'),('2','my:1')])
self.assertEqual(
diff --git a/src/leap/soledad/util.py b/src/leap/soledad/util.py
index 8683fbb9..8a8bedfb 100644
--- a/src/leap/soledad/util.py
+++ b/src/leap/soledad/util.py
@@ -74,13 +74,13 @@ class SimpleLog(object):
self._log.append(msg)
def reduce(self, func, initializer=None):
- return reduce(func, self.log, initializer)
+ return reduce(func, self._log, initializer)
def map(self, func):
- return map(func, self.log)
+ return map(func, self._get_log())
def filter(self, func):
- return filter(func, self.log)
+ return filter(func, self._get_log())
class TransactionLog(SimpleLog):
@@ -141,7 +141,7 @@ class TransactionLog(SimpleLog):
newest_trans_id = changes[0][2]
changes.reverse()
else:
- results = self.log
+ results = self._get_log()
if not results:
cur_gen = 0
newest_trans_id = ''
@@ -164,7 +164,7 @@ class SyncLog(SimpleLog):
"""
def find_by_replica_uid(self, replica_uid):
- if not self.log:
+ if not self._get_log():
return ()
return self.reduce(lambda x, y: y if y[0] == replica_uid else x)
@@ -184,7 +184,7 @@ class SyncLog(SimpleLog):
Set the last-known generation and transaction id for the other
database replica.
"""
- self.log = self.filter(lambda x: x[0] != other_replica_uid)
+ self._log = self.filter(lambda x: x[0] != other_replica_uid)
self.append((other_replica_uid, other_generation,
other_transaction_id))
@@ -192,8 +192,22 @@ class ConflictLog(SimpleLog):
"""
A list of (doc_id, my_doc_rev, my_content) tuples.
"""
+
+ def __init__(self, factory):
+ super(ConflictLog, self).__init__()
+ self._factory = factory
def delete_conflicts(self, conflicts):
for conflict in conflicts:
- self.log = self.filter(lambda x:
+ self._log = self.filter(lambda x:
x[0] != conflict[0] or x[1] != conflict[1])
+
+ def get_conflicts(self, doc_id):
+ conflicts = self.filter(lambda x: x[0] == doc_id)
+ if not conflicts:
+ return []
+ return reversed(map(lambda x: self._factory(doc_id, x[1], x[2]),
+ conflicts))
+
+ def has_conflicts(self, doc_id):
+ return bool(self.filter(lambda x: x[0] == doc_id))
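
The new ConflictLog methods can also be exercised on their own. A small illustrative sketch, using u1db's Document as the factory and made-up revisions:

from u1db import Document
from leap.soledad.util import ConflictLog

log = ConflictLog(Document)
log.append(('doc-1', 'replica:1', '{"a": 1}'))
log.append(('doc-1', 'other:1', '{"a": 2}'))

assert log.has_conflicts('doc-1')
assert not log.has_conflicts('doc-2')

# get_conflicts builds Document instances with the factory, most recently
# appended entry first (the filtered entries are returned reversed).
docs = list(log.get_conflicts('doc-1'))

# delete_conflicts drops specific (doc_id, rev) pairs from the log.
log.delete_conflicts([('doc-1', 'other:1')])
assert log.has_conflicts('doc-1')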