diff options
Diffstat (limited to 'common/src/leap/soledad/common/couch.py')
-rw-r--r-- | common/src/leap/soledad/common/couch.py | 354 |
1 files changed, 143 insertions, 211 deletions
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 6c28e0be..1c762036 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -87,8 +87,7 @@ class CouchDocument(SoledadDocument): atomic and consistent update of the database. """ - def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False, - syncable=True): + def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False): """ Container for handling a document that is stored in couch backend. @@ -100,27 +99,10 @@ class CouchDocument(SoledadDocument): :type json: str :param has_conflicts: Boolean indicating if this document has conflicts :type has_conflicts: bool - :param syncable: Should this document be synced with remote replicas? - :type syncable: bool """ SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts) - self._couch_rev = None - self._conflicts = None - self._transactions = None - - def _ensure_fetch_conflicts(self, get_conflicts_fun): - """ - Ensure conflict data has been fetched from the server. - - :param get_conflicts_fun: A function which, given the document id and - the couch revision, return the conflicted - versions of the current document. - :type get_conflicts_fun: function - """ - if self._conflicts is None: - self._conflicts = get_conflicts_fun(self.doc_id, - couch_rev=self.couch_rev) - self.has_conflicts = len(self._conflicts) > 0 + self.couch_rev = None + self.transactions = None def get_conflicts(self): """ @@ -149,7 +131,7 @@ class CouchDocument(SoledadDocument): :type doc: CouchDocument """ if self._conflicts is None: - raise Exception("Run self._ensure_fetch_conflicts first!") + raise Exception("Fetch conflicts first!") self._conflicts.append(doc) self.has_conflicts = len(self._conflicts) > 0 @@ -161,27 +143,48 @@ class CouchDocument(SoledadDocument): :type conflict_revs: [str] """ if self._conflicts is None: - raise Exception("Run self._ensure_fetch_conflicts first!") + raise Exception("Fetch conflicts first!") self._conflicts = filter( lambda doc: doc.rev not in conflict_revs, self._conflicts) self.has_conflicts = len(self._conflicts) > 0 - def _get_couch_rev(self): - return self._couch_rev - - def _set_couch_rev(self, rev): - self._couch_rev = rev - - couch_rev = property(_get_couch_rev, _set_couch_rev) - - def _get_transactions(self): - return self._transactions + def update(self, new_doc): + # update info + self.rev = new_doc.rev + if new_doc.is_tombstone(): + self.is_tombstone() + else: + self.content = new_doc.content + self.has_conflicts = new_doc.has_conflicts - def _set_transactions(self, rev): - self._transactions = rev + def prune_conflicts(self, doc_vcr, autoresolved_increment): + """ + Prune conflicts that are older then the current document's revision, or + whose content match to the current document's content. + Originally in u1db.CommonBackend - transactions = property(_get_transactions, _set_transactions) + :param doc: The document to have conflicts pruned. + :type doc: CouchDocument + :param doc_vcr: A vector clock representing the current document's + revision. + :type doc_vcr: u1db.vectorclock.VectorClock + """ + if self.has_conflicts: + autoresolved = False + c_revs_to_prune = [] + for c_doc in self._conflicts: + c_vcr = vectorclock.VectorClockRev(c_doc.rev) + if doc_vcr.is_newer(c_vcr): + c_revs_to_prune.append(c_doc.rev) + elif self.same_content_as(c_doc): + c_revs_to_prune.append(c_doc.rev) + doc_vcr.maximize(c_vcr) + autoresolved = True + if autoresolved: + doc_vcr.increment(autoresolved_increment) + self.rev = doc_vcr.as_str() + self.delete_conflicts(c_revs_to_prune) # monkey-patch the u1db http app to use CouchDocument @@ -482,13 +485,10 @@ class CouchDatabase(CommonBackend): Ensure that the design documents used by the backend exist on the couch database. """ - # we check for existence of one of the files, and put all of them if - # that one does not exist - try: - self._database['_design/docs'] - return - except ResourceNotFound: - for ddoc_name in ['docs', 'syncs', 'transactions']: + for ddoc_name in ['docs', 'syncs', 'transactions']: + try: + self._database.info(ddoc_name) + except ResourceNotFound: ddoc = json.loads( binascii.a2b_base64( getattr(ddocs, ddoc_name))) @@ -750,7 +750,6 @@ class CouchDatabase(CommonBackend): if check_for_conflicts \ and '_attachments' in result \ and 'u1db_conflicts' in result['_attachments']: - doc.has_conflicts = True doc.set_conflicts( self._build_conflicts( doc.doc_id, @@ -1044,7 +1043,7 @@ class CouchDatabase(CommonBackend): conflicts.append(doc) return conflicts - def _get_conflicts(self, doc_id, couch_rev=None): + def get_doc_conflicts(self, doc_id, couch_rev=None): """ Get the conflicted versions of a document. @@ -1059,32 +1058,21 @@ class CouchDatabase(CommonBackend): """ # request conflicts attachment from server params = {} + conflicts = [] if couch_rev is not None: params['rev'] = couch_rev # restric document's couch revision + else: + # TODO: move into resource logic! + first_entry = self._get_doc(doc_id, check_for_conflicts=True) + conflicts.append(first_entry) resource = self._database.resource(doc_id, 'u1db_conflicts') try: response = resource.get_json(**params) - return self._build_conflicts( + return conflicts + self._build_conflicts( doc_id, json.loads(response[2].read())) except ResourceNotFound: return [] - def get_doc_conflicts(self, doc_id): - """ - Get the list of conflicts for the given document. - - The order of the conflicts is such that the first entry is the value - that would be returned by "get_doc". - - :return: A list of the document entries that are conflicted. - :rtype: [CouchDocument] - """ - conflict_docs = self._get_conflicts(doc_id) - if len(conflict_docs) == 0: - return [] - this_doc = self._get_doc(doc_id, check_for_conflicts=True) - return [this_doc] + conflict_docs - def _get_replica_gen_and_trans_id(self, other_replica_uid): """ Return the last known generation and transaction id for the other db @@ -1140,9 +1128,11 @@ class CouchDatabase(CommonBackend): :param sync_id: The id of the current sync session. :type sync_id: str """ - self._do_set_replica_gen_and_trans_id( - other_replica_uid, other_generation, other_transaction_id, - number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id) + if other_replica_uid is not None and other_generation is not None: + self._do_set_replica_gen_and_trans_id( + other_replica_uid, other_generation, other_transaction_id, + number_of_docs=number_of_docs, doc_idx=doc_idx, + sync_id=sync_id) def _do_set_replica_gen_and_trans_id( self, other_replica_uid, other_generation, other_transaction_id, @@ -1206,70 +1196,6 @@ class CouchDatabase(CommonBackend): except ResourceNotFound as e: raise_missing_design_doc_error(e, ddoc_path) - def _add_conflict(self, doc, my_doc_rev, my_content): - """ - Add a conflict to the document. - - Note that this method does not actually update the backend; rather, it - updates the CouchDocument object which will provide the conflict data - when the atomic document update is made. - - :param doc: The document to have conflicts added to. - :type doc: CouchDocument - :param my_doc_rev: The revision of the conflicted document. - :type my_doc_rev: str - :param my_content: The content of the conflicted document as a JSON - serialized string. - :type my_content: str - """ - doc._ensure_fetch_conflicts(self._get_conflicts) - doc.add_conflict( - self._factory(doc_id=doc.doc_id, rev=my_doc_rev, - json=my_content)) - - def _delete_conflicts(self, doc, conflict_revs): - """ - Delete the conflicted revisions from the list of conflicts of C{doc}. - - Note that this method does not actually update the backend; rather, it - updates the CouchDocument object which will provide the conflict data - when the atomic document update is made. - - :param doc: The document to have conflicts deleted. - :type doc: CouchDocument - :param conflict_revs: A list of the revisions to be deleted. - :param conflict_revs: [str] - """ - doc._ensure_fetch_conflicts(self._get_conflicts) - doc.delete_conflicts(conflict_revs) - - def _prune_conflicts(self, doc, doc_vcr): - """ - Prune conflicts that are older then the current document's revision, or - whose content match to the current document's content. - - :param doc: The document to have conflicts pruned. - :type doc: CouchDocument - :param doc_vcr: A vector clock representing the current document's - revision. - :type doc_vcr: u1db.vectorclock.VectorClock - """ - if doc.has_conflicts is True: - autoresolved = False - c_revs_to_prune = [] - for c_doc in doc.get_conflicts(): - c_vcr = vectorclock.VectorClockRev(c_doc.rev) - if doc_vcr.is_newer(c_vcr): - c_revs_to_prune.append(c_doc.rev) - elif doc.same_content_as(c_doc): - c_revs_to_prune.append(c_doc.rev) - doc_vcr.maximize(c_vcr) - autoresolved = True - if autoresolved: - doc_vcr.increment(self._replica_uid) - doc.rev = doc_vcr.as_str() - self._delete_conflicts(doc, c_revs_to_prune) - def _force_doc_sync_conflict(self, doc): """ Add a conflict and force a document put. @@ -1278,9 +1204,9 @@ class CouchDatabase(CommonBackend): :type doc: CouchDocument """ my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) - self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev)) - self._add_conflict(doc, my_doc.rev, my_doc.get_json()) - doc.has_conflicts = True + doc.prune_conflicts( + vectorclock.VectorClockRev(doc.rev), self._replica_uid) + doc.add_conflict(my_doc) self._put_doc(my_doc, doc) def resolve_doc(self, doc, conflicted_doc_revs): @@ -1325,14 +1251,14 @@ class CouchDatabase(CommonBackend): # the newer doc version will supersede the one in the database, so # we copy conflicts before updating the backend. doc.set_conflicts(cur_doc.get_conflicts()) # copy conflicts over. - self._delete_conflicts(doc, superseded_revs) + doc.delete_conflicts(superseded_revs) self._put_doc(cur_doc, doc) else: # the newer doc version does not supersede the one in the # database, so we will add a conflict to the database and copy # those over to the document the user has in her hands. - self._add_conflict(cur_doc, new_rev, doc.get_json()) - self._delete_conflicts(cur_doc, superseded_revs) + cur_doc.add_conflict(doc) + cur_doc.delete_conflicts(superseded_revs) self._put_doc(cur_doc, cur_doc) # just update conflicts # backend has been updated with current conflicts, now copy them # to the current document. @@ -1392,65 +1318,33 @@ class CouchDatabase(CommonBackend): 'converged', at_gen is the insertion/current generation. :rtype: (str, int) """ - cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) - # at this point, `doc` has arrived from the other syncing party, and - # we will decide what to do with it. - # First, we prepare the arriving doc to update couch database. - old_doc = doc - doc = self._factory(doc.doc_id, doc.rev, doc.get_json()) - if cur_doc is not None: - doc.couch_rev = cur_doc.couch_rev - # fetch conflicts because we will eventually manipulate them - doc._ensure_fetch_conflicts(self._get_conflicts) - # from now on, it works just like u1db sqlite backend - doc_vcr = vectorclock.VectorClockRev(doc.rev) - if cur_doc is None: - cur_vcr = vectorclock.VectorClockRev(None) - else: - cur_vcr = vectorclock.VectorClockRev(cur_doc.rev) - self._validate_source(replica_uid, replica_gen, replica_trans_id) - if doc_vcr.is_newer(cur_vcr): - rev = doc.rev - self._prune_conflicts(doc, doc_vcr) - if doc.rev != rev: - # conflicts have been autoresolved - state = 'superseded' - else: - state = 'inserted' - self._put_doc(cur_doc, doc) - elif doc.rev == cur_doc.rev: - # magical convergence - state = 'converged' - elif cur_vcr.is_newer(doc_vcr): - # Don't add this to seen_ids, because we have something newer, - # so we should send it back, and we should not generate a - # conflict - state = 'superseded' - elif cur_doc.same_content_as(doc): - # the documents have been edited to the same thing at both ends - doc_vcr.maximize(cur_vcr) - doc_vcr.increment(self._replica_uid) - doc.rev = doc_vcr.as_str() - self._put_doc(cur_doc, doc) - state = 'superseded' - else: - state = 'conflicted' - if save_conflict: - self._force_doc_sync_conflict(doc) - if replica_uid is not None and replica_gen is not None: - self._set_replica_gen_and_trans_id( - replica_uid, replica_gen, replica_trans_id, - number_of_docs=number_of_docs, doc_idx=doc_idx, - sync_id=sync_id) - # update info - old_doc.rev = doc.rev - if doc.is_tombstone(): - old_doc.is_tombstone() - else: - old_doc.content = doc.content - old_doc.has_conflicts = doc.has_conflicts + if not isinstance(doc, CouchDocument): + doc = self._factory(doc.doc_id, doc.rev, doc.get_json()) + self._save_source_info(replica_uid, replica_gen, + replica_trans_id, number_of_docs, + doc_idx, sync_id) + my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if my_doc is not None: + my_doc.set_conflicts( + self.get_doc_conflicts(my_doc.doc_id, my_doc.couch_rev)) + state, save_doc = _process_incoming_doc( + my_doc, doc, save_conflict, self.replica_uid) + if save_doc: + self._put_doc(my_doc, save_doc) + doc.update(save_doc) return state, self._get_generation() + def _save_source_info(self, replica_uid, replica_gen, replica_trans_id, + number_of_docs, doc_idx, sync_id): + """ + Validate and save source information. + """ + self._validate_source(replica_uid, replica_gen, replica_trans_id) + self._set_replica_gen_and_trans_id( + replica_uid, replica_gen, replica_trans_id, + number_of_docs=number_of_docs, doc_idx=doc_idx, + sync_id=sync_id) + def get_docs(self, doc_ids, check_for_conflicts=True, include_deleted=False): """ @@ -1495,6 +1389,13 @@ class CouchDatabase(CommonBackend): continue yield t._doc + def _prune_conflicts(self, doc, doc_vcr): + """ + Overrides original method, but it is implemented elsewhere for + simplicity. + """ + doc.prune_conflicts(doc_vcr, self._replica_uid) + def _new_resource(self, *path): """ Return a new resource for accessing a couch database. @@ -1546,7 +1447,7 @@ class CouchServerState(ServerState): :param couch_url: The URL for the couch database. :type couch_url: str """ - self._couch_url = couch_url + self.couch_url = couch_url def open_database(self, dbname): """ @@ -1559,7 +1460,7 @@ class CouchServerState(ServerState): :rtype: CouchDatabase """ return CouchDatabase( - self._couch_url, + self.couch_url, dbname, ensure_ddocs=False) @@ -1594,21 +1495,52 @@ class CouchServerState(ServerState): """ raise Unauthorized() - def _set_couch_url(self, url): - """ - Set the couchdb URL - - :param url: CouchDB URL - :type url: str - """ - self._couch_url = url - - def _get_couch_url(self): - """ - Return CouchDB URL - :rtype: str - """ - return self._couch_url - - couch_url = property(_get_couch_url, _set_couch_url, doc='CouchDB URL') +def _process_incoming_doc(my_doc, other_doc, save_conflict, replica_uid): + """ + Check document, save and return state. + """ + # at this point, `doc` has arrived from the other syncing party, and + # we will decide what to do with it. + # First, we prepare the arriving doc to update couch database. + new_doc = CouchDocument( + other_doc.doc_id, other_doc.rev, other_doc.get_json()) + if my_doc is None: + return 'inserted', new_doc + new_doc.couch_rev = my_doc.couch_rev + new_doc.set_conflicts(my_doc.get_conflicts()) + # fetch conflicts because we will eventually manipulate them + # from now on, it works just like u1db sqlite backend + doc_vcr = vectorclock.VectorClockRev(new_doc.rev) + cur_vcr = vectorclock.VectorClockRev(my_doc.rev) + if doc_vcr.is_newer(cur_vcr): + rev = new_doc.rev + new_doc.prune_conflicts(doc_vcr, replica_uid) + if new_doc.rev != rev: + # conflicts have been autoresolved + return 'superseded', new_doc + else: + return'inserted', new_doc + elif new_doc.rev == my_doc.rev: + # magical convergence + return 'converged', None + elif cur_vcr.is_newer(doc_vcr): + # Don't add this to seen_ids, because we have something newer, + # so we should send it back, and we should not generate a + # conflict + other_doc.update(new_doc) + return 'superseded', None + elif my_doc.same_content_as(new_doc): + # the documents have been edited to the same thing at both ends + doc_vcr.maximize(cur_vcr) + doc_vcr.increment(replica_uid) + new_doc.rev = doc_vcr.as_str() + return 'superseded', new_doc + else: + if save_conflict: + new_doc.prune_conflicts( + vectorclock.VectorClockRev(new_doc.rev), replica_uid) + new_doc.add_conflict(my_doc) + return 'conflicted', new_doc + other_doc.update(new_doc) + return 'conflicted', None |