diff options
Diffstat (limited to 'src/leap/soledad/backends/objectstore.py')
-rw-r--r-- | src/leap/soledad/backends/objectstore.py | 316 |
1 files changed, 278 insertions, 38 deletions
diff --git a/src/leap/soledad/backends/objectstore.py b/src/leap/soledad/backends/objectstore.py index 3cefdf5d..b6523336 100644 --- a/src/leap/soledad/backends/objectstore.py +++ b/src/leap/soledad/backends/objectstore.py @@ -1,22 +1,20 @@ -import uuid from u1db.backends import CommonBackend -from u1db import errors, Document - -from leap.soledad import util as soledadutil - +from u1db import errors, Document, vectorclock class ObjectStore(CommonBackend): """ A backend for storing u1db data in an object store. """ - def __init__(self): + def __init__(self, replica_uid=None): # This initialization method should be called after the connection - # with the database is established, so it can ensure that u1db data is - # configured and up-to-date. + # with the database is established in each implementation, so it can + # ensure that u1db data is configured and up-to-date. self.set_document_factory(Document) - self._sync_log = soledadutil.SyncLog() - self._transaction_log = soledadutil.TransactionLog() + self._sync_log = SyncLog() + self._transaction_log = TransactionLog() + self._conflict_log = ConflictLog(self._factory) + self._replica_uid = replica_uid self._ensure_u1db_data() #------------------------------------------------------------------------- @@ -44,6 +42,12 @@ class ObjectStore(CommonBackend): def _put_doc(self, doc): raise NotImplementedError(self._put_doc) + def _update_gen_and_transaction_log(self, doc_id): + new_gen = self._get_generation() + 1 + trans_id = self._allocate_transaction_id() + self._transaction_log.append((new_gen, doc_id, trans_id)) + self._set_u1db_data() + def put_doc(self, doc): # consistency check if doc.doc_id is None: @@ -65,12 +69,7 @@ class ObjectStore(CommonBackend): raise errors.RevisionConflict() new_rev = self._allocate_doc_rev(doc.rev) doc.rev = new_rev - self._put_doc(doc) - # update u1db generation and logs - new_gen = self._get_generation() + 1 - trans_id = self._allocate_transaction_id() - self._transaction_log.append((new_gen, doc.doc_id, trans_id)) - self._set_u1db_data() + self._put_and_update_indexes(old_doc, doc) return doc.rev def delete_doc(self, doc): @@ -86,7 +85,7 @@ class ObjectStore(CommonBackend): new_rev = self._allocate_doc_rev(doc.rev) doc.rev = new_rev doc.make_tombstone() - self._put_doc(doc) + self._put_and_update_indexes(old_doc, doc) return new_rev # start of index-related methods: these are not supported by this backend. @@ -113,10 +112,25 @@ class ObjectStore(CommonBackend): # end of index-related methods: these are not supported by this backend. def get_doc_conflicts(self, doc_id): - return [] + self._get_u1db_data() + conflict_docs = self._conflict_log.get_conflicts(doc_id) + if not conflict_docs: + return [] + this_doc = self._get_doc(doc_id) + this_doc.has_conflicts = True + return [this_doc] + list(conflict_docs) def resolve_doc(self, doc, conflicted_doc_revs): - raise NotImplementedError(self.resolve_doc) + cur_doc = self._get_doc(doc.doc_id) + new_rev = self._ensure_maximal_rev(cur_doc.rev, + conflicted_doc_revs) + superseded_revs = set(conflicted_doc_revs) + doc.rev = new_rev + if cur_doc.rev in superseded_revs: + self._put_and_update_indexes(cur_doc, doc) + else: + self._add_conflict(doc.doc_id, new_rev, doc.get_json()) + self._delete_conflicts(doc, superseded_revs) def _get_replica_gen_and_trans_id(self, other_replica_uid): self._get_u1db_data() @@ -124,12 +138,22 @@ class ObjectStore(CommonBackend): def _set_replica_gen_and_trans_id(self, other_replica_uid, other_generation, other_transaction_id): - self._get_u1db_data() + return self._do_set_replica_gen_and_trans_id( + other_replica_uid, + other_generation, + other_transaction_id) + + def _do_set_replica_gen_and_trans_id(self, other_replica_uid, + other_generation, other_transaction_id): self._sync_log.set_replica_gen_and_trans_id(other_replica_uid, other_generation, other_transaction_id) self._set_u1db_data() + def _get_transaction_log(self): + self._get_u1db_data() + return self._transaction_log.get_transaction_log() + #------------------------------------------------------------------------- # implemented methods from CommonBackend #------------------------------------------------------------------------- @@ -143,12 +167,14 @@ class ObjectStore(CommonBackend): return self._transaction_log.get_generation_info() def _has_conflicts(self, doc_id): - # Documents never have conflicts on server. - return False - - def _put_and_update_indexes(self, doc_id, old_doc, new_rev, content): - raise NotImplementedError(self._put_and_update_indexes) + self._get_u1db_data() + return self._conflict_log.has_conflicts(doc_id) + def _put_and_update_indexes(self, old_doc, doc): + # for now we ignore indexes as this backend is used to store encrypted + # blobs of data in the server. + self._put_doc(doc) + self._update_gen_and_transaction_log(doc.doc_id) def _get_trans_id_for_gen(self, generation): self._get_u1db_data() @@ -184,14 +210,9 @@ class ObjectStore(CommonBackend): """ Create u1db data object in store. """ - self._replica_uid = uuid.uuid4().hex - doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID) - doc.content = { 'transaction_log' : [], - 'sync_log' : [], - 'replica_uid' : self._replica_uid } - self._put_doc(doc) + NotImplementedError(self._initialize) - def _get_u1db_data(self, u1db_data_doc_id): + def _get_u1db_data(self): """ Fetch u1db configuration data from backend storage. """ @@ -201,11 +222,230 @@ class ObjectStore(CommonBackend): """ Save u1db configuration data on backend storage. """ - doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID) - doc.content = { 'transaction_log' : self._transaction_log.log, - 'sync_log' : self._sync_log.log, - 'replica_uid' : self._replica_uid, - '_rev' : self._couch_rev} - self._put_doc(doc) + NotImplementedError(self._set_u1db_data) + + def _set_replica_uid(self, replica_uid): + self._replica_uid = replica_uid + self._set_u1db_data() + + def _get_replica_uid(self): + return self._replica_uid + + replica_uid = property( + _get_replica_uid, _set_replica_uid, doc="Replica UID of the database") + + + #------------------------------------------------------------------------- + # The methods below were cloned from u1db sqlite backend. They should at + # least exist and raise a NotImplementedError exception in CommonBackend + # (should we maybe fill a bug in u1db bts?). + #------------------------------------------------------------------------- + + def _add_conflict(self, doc_id, my_doc_rev, my_content): + self._conflict_log.append((doc_id, my_doc_rev, my_content)) + self._set_u1db_data() + + def _delete_conflicts(self, doc, conflict_revs): + deleting = [(doc.doc_id, c_rev) for c_rev in conflict_revs] + self._conflict_log.delete_conflicts(deleting) + self._set_u1db_data() + doc.has_conflicts = self._has_conflicts(doc.doc_id) + + def _prune_conflicts(self, doc, doc_vcr): + if self._has_conflicts(doc.doc_id): + autoresolved = False + c_revs_to_prune = [] + for c_doc in self._conflict_log.get_conflicts(doc.doc_id): + c_vcr = vectorclock.VectorClockRev(c_doc.rev) + if doc_vcr.is_newer(c_vcr): + c_revs_to_prune.append(c_doc.rev) + elif doc.same_content_as(c_doc): + c_revs_to_prune.append(c_doc.rev) + doc_vcr.maximize(c_vcr) + autoresolved = True + if autoresolved: + doc_vcr.increment(self._replica_uid) + doc.rev = doc_vcr.as_str() + self._delete_conflicts(doc, c_revs_to_prune) + + def _force_doc_sync_conflict(self, doc): + my_doc = self._get_doc(doc.doc_id) + self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev)) + self._add_conflict(doc.doc_id, my_doc.rev, my_doc.get_json()) + doc.has_conflicts = True + self._put_and_update_indexes(my_doc, doc) + + +#---------------------------------------------------------------------------- +# U1DB's TransactionLog, SyncLog, ConflictLog, and Index +#---------------------------------------------------------------------------- + +class SimpleList(object): + def __init__(self): + self._data = [] + + def _set_data(self, data): + self._data = data + + def _get_data(self): + return self._data + + data = property( + _get_data, _set_data, doc="List contents.") + + def append(self, msg): + self._data.append(msg) + + def reduce(self, func, initializer=None): + return reduce(func, self._data, initializer) + + def map(self, func): + return map(func, self._get_data()) + + def filter(self, func): + return filter(func, self._get_data()) + + +class SimpleLog(SimpleList): + def _set_log(self, log): + self._data = log + + def _get_log(self): + return self._data + + log = property( + _get_log, _set_log, doc="Log contents.") + + +class TransactionLog(SimpleLog): + """ + An ordered list of (generation, doc_id, transaction_id) tuples. + """ + + def _set_log(self, log): + self._data = log + + def _get_data(self, reverse=True): + return sorted(self._data, reverse=reverse) + + _get_log = _get_data + + log = property( + _get_log, _set_log, doc="Log contents.") + + def get_generation(self): + """ + Return the current generation. + """ + gens = self.map(lambda x: x[0]) + if not gens: + return 0 + return max(gens) + + def get_generation_info(self): + """ + Return the current generation and transaction id. + """ + if not self._get_log(): + return(0, '') + info = self.map(lambda x: (x[0], x[2])) + return reduce(lambda x, y: x if (x[0] > y[0]) else y, info) + + def get_trans_id_for_gen(self, gen): + """ + Get the transaction id corresponding to a particular generation. + """ + log = self.reduce(lambda x, y: y if y[0] == gen else x) + if log is None: + return None + return log[2] + + def whats_changed(self, old_generation): + """ + Return a list of documents that have changed since old_generation. + """ + results = self.filter(lambda x: x[0] > old_generation) + seen = set() + changes = [] + newest_trans_id = '' + for generation, doc_id, trans_id in results: + if doc_id not in seen: + changes.append((doc_id, generation, trans_id)) + seen.add(doc_id) + if changes: + cur_gen = changes[0][1] # max generation + newest_trans_id = changes[0][2] + changes.reverse() + else: + results = self._get_log() + if not results: + cur_gen = 0 + newest_trans_id = '' + else: + cur_gen, _, newest_trans_id = results[0] + + return cur_gen, newest_trans_id, changes + + + def get_transaction_log(self): + """ + Return only a list of (doc_id, transaction_id) + """ + return map(lambda x: (x[1], x[2]), sorted(self._get_log(reverse=False))) + + +class SyncLog(SimpleLog): + """ + A list of (replica_id, generation, transaction_id) tuples. + """ + + def find_by_replica_uid(self, replica_uid): + if not self._get_log(): + return () + return self.reduce(lambda x, y: y if y[0] == replica_uid else x) + + def get_replica_gen_and_trans_id(self, other_replica_uid): + """ + Return the last known generation and transaction id for the other db + replica. + """ + info = self.find_by_replica_uid(other_replica_uid) + if not info: + return (0, '') + return (info[1], info[2]) + + def set_replica_gen_and_trans_id(self, other_replica_uid, + other_generation, other_transaction_id): + """ + Set the last-known generation and transaction id for the other + database replica. + """ + self._set_log(self.filter(lambda x: x[0] != other_replica_uid)) + self.append((other_replica_uid, other_generation, + other_transaction_id)) + +class ConflictLog(SimpleLog): + """ + A list of (doc_id, my_doc_rev, my_content) tuples. + """ + + def __init__(self, factory): + super(ConflictLog, self).__init__() + self._factory = factory + + def delete_conflicts(self, conflicts): + for conflict in conflicts: + self._set_log(self.filter(lambda x: + x[0] != conflict[0] or x[1] != conflict[1])) + + def get_conflicts(self, doc_id): + conflicts = self.filter(lambda x: x[0] == doc_id) + if not conflicts: + return [] + return reversed(map(lambda x: self._factory(doc_id, x[1], x[2]), + conflicts)) + + def has_conflicts(self, doc_id): + return bool(self.filter(lambda x: x[0] == doc_id)) |