summaryrefslogtreecommitdiff
path: root/common/src/leap/soledad/common/couch.py
diff options
context:
space:
mode:
Diffstat (limited to 'common/src/leap/soledad/common/couch.py')
-rw-r--r--common/src/leap/soledad/common/couch.py354
1 files changed, 143 insertions, 211 deletions
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index 6c28e0be..1c762036 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -87,8 +87,7 @@ class CouchDocument(SoledadDocument):
atomic and consistent update of the database.
"""
- def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False,
- syncable=True):
+ def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False):
"""
Container for handling a document that is stored in couch backend.
@@ -100,27 +99,10 @@ class CouchDocument(SoledadDocument):
:type json: str
:param has_conflicts: Boolean indicating if this document has conflicts
:type has_conflicts: bool
- :param syncable: Should this document be synced with remote replicas?
- :type syncable: bool
"""
SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts)
- self._couch_rev = None
- self._conflicts = None
- self._transactions = None
-
- def _ensure_fetch_conflicts(self, get_conflicts_fun):
- """
- Ensure conflict data has been fetched from the server.
-
- :param get_conflicts_fun: A function which, given the document id and
- the couch revision, return the conflicted
- versions of the current document.
- :type get_conflicts_fun: function
- """
- if self._conflicts is None:
- self._conflicts = get_conflicts_fun(self.doc_id,
- couch_rev=self.couch_rev)
- self.has_conflicts = len(self._conflicts) > 0
+ self.couch_rev = None
+ self.transactions = None
def get_conflicts(self):
"""
@@ -149,7 +131,7 @@ class CouchDocument(SoledadDocument):
:type doc: CouchDocument
"""
if self._conflicts is None:
- raise Exception("Run self._ensure_fetch_conflicts first!")
+ raise Exception("Fetch conflicts first!")
self._conflicts.append(doc)
self.has_conflicts = len(self._conflicts) > 0
@@ -161,27 +143,48 @@ class CouchDocument(SoledadDocument):
:type conflict_revs: [str]
"""
if self._conflicts is None:
- raise Exception("Run self._ensure_fetch_conflicts first!")
+ raise Exception("Fetch conflicts first!")
self._conflicts = filter(
lambda doc: doc.rev not in conflict_revs,
self._conflicts)
self.has_conflicts = len(self._conflicts) > 0
- def _get_couch_rev(self):
- return self._couch_rev
-
- def _set_couch_rev(self, rev):
- self._couch_rev = rev
-
- couch_rev = property(_get_couch_rev, _set_couch_rev)
-
- def _get_transactions(self):
- return self._transactions
+ def update(self, new_doc):
+ # update info
+ self.rev = new_doc.rev
+ if new_doc.is_tombstone():
+ self.is_tombstone()
+ else:
+ self.content = new_doc.content
+ self.has_conflicts = new_doc.has_conflicts
- def _set_transactions(self, rev):
- self._transactions = rev
+ def prune_conflicts(self, doc_vcr, autoresolved_increment):
+ """
+ Prune conflicts that are older than the current document's revision, or
+ whose content matches the current document's content.
+ Originally in u1db.CommonBackend
- transactions = property(_get_transactions, _set_transactions)
+ :param autoresolved_increment: The replica uid used to increment the
+ vector clock when conflicts are autoresolved.
+ :param doc_vcr: A vector clock representing the current document's
+ revision.
+ :type doc_vcr: u1db.vectorclock.VectorClock
+ """
+ if self.has_conflicts:
+ autoresolved = False
+ c_revs_to_prune = []
+ for c_doc in self._conflicts:
+ c_vcr = vectorclock.VectorClockRev(c_doc.rev)
+ if doc_vcr.is_newer(c_vcr):
+ c_revs_to_prune.append(c_doc.rev)
+ elif self.same_content_as(c_doc):
+ c_revs_to_prune.append(c_doc.rev)
+ doc_vcr.maximize(c_vcr)
+ autoresolved = True
+ if autoresolved:
+ doc_vcr.increment(autoresolved_increment)
+ self.rev = doc_vcr.as_str()
+ self.delete_conflicts(c_revs_to_prune)
# monkey-patch the u1db http app to use CouchDocument
@@ -482,13 +485,10 @@ class CouchDatabase(CommonBackend):
Ensure that the design documents used by the backend exist on the
couch database.
"""
- # we check for existence of one of the files, and put all of them if
- # that one does not exist
- try:
- self._database['_design/docs']
- return
- except ResourceNotFound:
- for ddoc_name in ['docs', 'syncs', 'transactions']:
+ for ddoc_name in ['docs', 'syncs', 'transactions']:
+ try:
+ self._database.info(ddoc_name)
+ except ResourceNotFound:
ddoc = json.loads(
binascii.a2b_base64(
getattr(ddocs, ddoc_name)))
@@ -750,7 +750,6 @@ class CouchDatabase(CommonBackend):
if check_for_conflicts \
and '_attachments' in result \
and 'u1db_conflicts' in result['_attachments']:
- doc.has_conflicts = True
doc.set_conflicts(
self._build_conflicts(
doc.doc_id,
@@ -1044,7 +1043,7 @@ class CouchDatabase(CommonBackend):
conflicts.append(doc)
return conflicts
- def _get_conflicts(self, doc_id, couch_rev=None):
+ def get_doc_conflicts(self, doc_id, couch_rev=None):
"""
Get the conflicted versions of a document.
@@ -1059,32 +1058,21 @@ class CouchDatabase(CommonBackend):
"""
# request conflicts attachment from server
params = {}
+ conflicts = []
if couch_rev is not None:
params['rev'] = couch_rev # restric document's couch revision
+ else:
+ # TODO: move into resource logic!
+ first_entry = self._get_doc(doc_id, check_for_conflicts=True)
+ conflicts.append(first_entry)
resource = self._database.resource(doc_id, 'u1db_conflicts')
try:
response = resource.get_json(**params)
- return self._build_conflicts(
+ return conflicts + self._build_conflicts(
doc_id, json.loads(response[2].read()))
except ResourceNotFound:
return []
- def get_doc_conflicts(self, doc_id):
- """
- Get the list of conflicts for the given document.
-
- The order of the conflicts is such that the first entry is the value
- that would be returned by "get_doc".
-
- :return: A list of the document entries that are conflicted.
- :rtype: [CouchDocument]
- """
- conflict_docs = self._get_conflicts(doc_id)
- if len(conflict_docs) == 0:
- return []
- this_doc = self._get_doc(doc_id, check_for_conflicts=True)
- return [this_doc] + conflict_docs
-
def _get_replica_gen_and_trans_id(self, other_replica_uid):
"""
Return the last known generation and transaction id for the other db
@@ -1140,9 +1128,11 @@ class CouchDatabase(CommonBackend):
:param sync_id: The id of the current sync session.
:type sync_id: str
"""
- self._do_set_replica_gen_and_trans_id(
- other_replica_uid, other_generation, other_transaction_id,
- number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id)
+ if other_replica_uid is not None and other_generation is not None:
+ self._do_set_replica_gen_and_trans_id(
+ other_replica_uid, other_generation, other_transaction_id,
+ number_of_docs=number_of_docs, doc_idx=doc_idx,
+ sync_id=sync_id)
def _do_set_replica_gen_and_trans_id(
self, other_replica_uid, other_generation, other_transaction_id,
@@ -1206,70 +1196,6 @@ class CouchDatabase(CommonBackend):
except ResourceNotFound as e:
raise_missing_design_doc_error(e, ddoc_path)
- def _add_conflict(self, doc, my_doc_rev, my_content):
- """
- Add a conflict to the document.
-
- Note that this method does not actually update the backend; rather, it
- updates the CouchDocument object which will provide the conflict data
- when the atomic document update is made.
-
- :param doc: The document to have conflicts added to.
- :type doc: CouchDocument
- :param my_doc_rev: The revision of the conflicted document.
- :type my_doc_rev: str
- :param my_content: The content of the conflicted document as a JSON
- serialized string.
- :type my_content: str
- """
- doc._ensure_fetch_conflicts(self._get_conflicts)
- doc.add_conflict(
- self._factory(doc_id=doc.doc_id, rev=my_doc_rev,
- json=my_content))
-
- def _delete_conflicts(self, doc, conflict_revs):
- """
- Delete the conflicted revisions from the list of conflicts of C{doc}.
-
- Note that this method does not actually update the backend; rather, it
- updates the CouchDocument object which will provide the conflict data
- when the atomic document update is made.
-
- :param doc: The document to have conflicts deleted.
- :type doc: CouchDocument
- :param conflict_revs: A list of the revisions to be deleted.
- :param conflict_revs: [str]
- """
- doc._ensure_fetch_conflicts(self._get_conflicts)
- doc.delete_conflicts(conflict_revs)
-
- def _prune_conflicts(self, doc, doc_vcr):
- """
- Prune conflicts that are older then the current document's revision, or
- whose content match to the current document's content.
-
- :param doc: The document to have conflicts pruned.
- :type doc: CouchDocument
- :param doc_vcr: A vector clock representing the current document's
- revision.
- :type doc_vcr: u1db.vectorclock.VectorClock
- """
- if doc.has_conflicts is True:
- autoresolved = False
- c_revs_to_prune = []
- for c_doc in doc.get_conflicts():
- c_vcr = vectorclock.VectorClockRev(c_doc.rev)
- if doc_vcr.is_newer(c_vcr):
- c_revs_to_prune.append(c_doc.rev)
- elif doc.same_content_as(c_doc):
- c_revs_to_prune.append(c_doc.rev)
- doc_vcr.maximize(c_vcr)
- autoresolved = True
- if autoresolved:
- doc_vcr.increment(self._replica_uid)
- doc.rev = doc_vcr.as_str()
- self._delete_conflicts(doc, c_revs_to_prune)
-
def _force_doc_sync_conflict(self, doc):
"""
Add a conflict and force a document put.
@@ -1278,9 +1204,9 @@ class CouchDatabase(CommonBackend):
:type doc: CouchDocument
"""
my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
- self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev))
- self._add_conflict(doc, my_doc.rev, my_doc.get_json())
- doc.has_conflicts = True
+ doc.prune_conflicts(
+ vectorclock.VectorClockRev(doc.rev), self._replica_uid)
+ doc.add_conflict(my_doc)
self._put_doc(my_doc, doc)
def resolve_doc(self, doc, conflicted_doc_revs):
@@ -1325,14 +1251,14 @@ class CouchDatabase(CommonBackend):
# the newer doc version will supersede the one in the database, so
# we copy conflicts before updating the backend.
doc.set_conflicts(cur_doc.get_conflicts()) # copy conflicts over.
- self._delete_conflicts(doc, superseded_revs)
+ doc.delete_conflicts(superseded_revs)
self._put_doc(cur_doc, doc)
else:
# the newer doc version does not supersede the one in the
# database, so we will add a conflict to the database and copy
# those over to the document the user has in her hands.
- self._add_conflict(cur_doc, new_rev, doc.get_json())
- self._delete_conflicts(cur_doc, superseded_revs)
+ cur_doc.add_conflict(doc)
+ cur_doc.delete_conflicts(superseded_revs)
self._put_doc(cur_doc, cur_doc) # just update conflicts
# backend has been updated with current conflicts, now copy them
# to the current document.
@@ -1392,65 +1318,33 @@ class CouchDatabase(CommonBackend):
'converged', at_gen is the insertion/current generation.
:rtype: (str, int)
"""
- cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
- # at this point, `doc` has arrived from the other syncing party, and
- # we will decide what to do with it.
- # First, we prepare the arriving doc to update couch database.
- old_doc = doc
- doc = self._factory(doc.doc_id, doc.rev, doc.get_json())
- if cur_doc is not None:
- doc.couch_rev = cur_doc.couch_rev
- # fetch conflicts because we will eventually manipulate them
- doc._ensure_fetch_conflicts(self._get_conflicts)
- # from now on, it works just like u1db sqlite backend
- doc_vcr = vectorclock.VectorClockRev(doc.rev)
- if cur_doc is None:
- cur_vcr = vectorclock.VectorClockRev(None)
- else:
- cur_vcr = vectorclock.VectorClockRev(cur_doc.rev)
- self._validate_source(replica_uid, replica_gen, replica_trans_id)
- if doc_vcr.is_newer(cur_vcr):
- rev = doc.rev
- self._prune_conflicts(doc, doc_vcr)
- if doc.rev != rev:
- # conflicts have been autoresolved
- state = 'superseded'
- else:
- state = 'inserted'
- self._put_doc(cur_doc, doc)
- elif doc.rev == cur_doc.rev:
- # magical convergence
- state = 'converged'
- elif cur_vcr.is_newer(doc_vcr):
- # Don't add this to seen_ids, because we have something newer,
- # so we should send it back, and we should not generate a
- # conflict
- state = 'superseded'
- elif cur_doc.same_content_as(doc):
- # the documents have been edited to the same thing at both ends
- doc_vcr.maximize(cur_vcr)
- doc_vcr.increment(self._replica_uid)
- doc.rev = doc_vcr.as_str()
- self._put_doc(cur_doc, doc)
- state = 'superseded'
- else:
- state = 'conflicted'
- if save_conflict:
- self._force_doc_sync_conflict(doc)
- if replica_uid is not None and replica_gen is not None:
- self._set_replica_gen_and_trans_id(
- replica_uid, replica_gen, replica_trans_id,
- number_of_docs=number_of_docs, doc_idx=doc_idx,
- sync_id=sync_id)
- # update info
- old_doc.rev = doc.rev
- if doc.is_tombstone():
- old_doc.is_tombstone()
- else:
- old_doc.content = doc.content
- old_doc.has_conflicts = doc.has_conflicts
+ if not isinstance(doc, CouchDocument):
+ doc = self._factory(doc.doc_id, doc.rev, doc.get_json())
+ self._save_source_info(replica_uid, replica_gen,
+ replica_trans_id, number_of_docs,
+ doc_idx, sync_id)
+ my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
+ if my_doc is not None:
+ my_doc.set_conflicts(
+ self.get_doc_conflicts(my_doc.doc_id, my_doc.couch_rev))
+ state, save_doc = _process_incoming_doc(
+ my_doc, doc, save_conflict, self.replica_uid)
+ if save_doc:
+ self._put_doc(my_doc, save_doc)
+ doc.update(save_doc)
return state, self._get_generation()
+ def _save_source_info(self, replica_uid, replica_gen, replica_trans_id,
+ number_of_docs, doc_idx, sync_id):
+ """
+ Validate and save source information.
+ """
+ self._validate_source(replica_uid, replica_gen, replica_trans_id)
+ self._set_replica_gen_and_trans_id(
+ replica_uid, replica_gen, replica_trans_id,
+ number_of_docs=number_of_docs, doc_idx=doc_idx,
+ sync_id=sync_id)
+
def get_docs(self, doc_ids, check_for_conflicts=True,
include_deleted=False):
"""
@@ -1495,6 +1389,13 @@ class CouchDatabase(CommonBackend):
continue
yield t._doc
+ def _prune_conflicts(self, doc, doc_vcr):
+ """
+ Overrides original method, but it is implemented elsewhere for
+ simplicity.
+ """
+ doc.prune_conflicts(doc_vcr, self._replica_uid)
+
def _new_resource(self, *path):
"""
Return a new resource for accessing a couch database.
@@ -1546,7 +1447,7 @@ class CouchServerState(ServerState):
:param couch_url: The URL for the couch database.
:type couch_url: str
"""
- self._couch_url = couch_url
+ self.couch_url = couch_url
def open_database(self, dbname):
"""
@@ -1559,7 +1460,7 @@ class CouchServerState(ServerState):
:rtype: CouchDatabase
"""
return CouchDatabase(
- self._couch_url,
+ self.couch_url,
dbname,
ensure_ddocs=False)
@@ -1594,21 +1495,52 @@ class CouchServerState(ServerState):
"""
raise Unauthorized()
- def _set_couch_url(self, url):
- """
- Set the couchdb URL
-
- :param url: CouchDB URL
- :type url: str
- """
- self._couch_url = url
-
- def _get_couch_url(self):
- """
- Return CouchDB URL
- :rtype: str
- """
- return self._couch_url
-
- couch_url = property(_get_couch_url, _set_couch_url, doc='CouchDB URL')
+def _process_incoming_doc(my_doc, other_doc, save_conflict, replica_uid):
+ """
+ Check document, save and return state.
+ """
+ # at this point, `other_doc` has arrived from the other syncing party, and
+ # we will decide what to do with it.
+ # First, we prepare the arriving doc to update couch database.
+ new_doc = CouchDocument(
+ other_doc.doc_id, other_doc.rev, other_doc.get_json())
+ if my_doc is None:
+ return 'inserted', new_doc
+ new_doc.couch_rev = my_doc.couch_rev
+ new_doc.set_conflicts(my_doc.get_conflicts())
+ # conflicts are copied over because we will eventually manipulate them
+ # from now on, it works just like u1db sqlite backend
+ doc_vcr = vectorclock.VectorClockRev(new_doc.rev)
+ cur_vcr = vectorclock.VectorClockRev(my_doc.rev)
+ if doc_vcr.is_newer(cur_vcr):
+ rev = new_doc.rev
+ new_doc.prune_conflicts(doc_vcr, replica_uid)
+ if new_doc.rev != rev:
+ # conflicts have been autoresolved
+ return 'superseded', new_doc
+ else:
+ return'inserted', new_doc
+ elif new_doc.rev == my_doc.rev:
+ # magical convergence
+ return 'converged', None
+ elif cur_vcr.is_newer(doc_vcr):
+ # Don't add this to seen_ids, because we have something newer,
+ # so we should send it back, and we should not generate a
+ # conflict
+ other_doc.update(new_doc)
+ return 'superseded', None
+ elif my_doc.same_content_as(new_doc):
+ # the documents have been edited to the same thing at both ends
+ doc_vcr.maximize(cur_vcr)
+ doc_vcr.increment(replica_uid)
+ new_doc.rev = doc_vcr.as_str()
+ return 'superseded', new_doc
+ else:
+ if save_conflict:
+ new_doc.prune_conflicts(
+ vectorclock.VectorClockRev(new_doc.rev), replica_uid)
+ new_doc.add_conflict(my_doc)
+ return 'conflicted', new_doc
+ other_doc.update(new_doc)
+ return 'conflicted', None