summaryrefslogtreecommitdiff
path: root/src/leap/soledad/u1db/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/soledad/u1db/sync.py')
-rw-r--r--src/leap/soledad/u1db/sync.py304
1 files changed, 304 insertions, 0 deletions
diff --git a/src/leap/soledad/u1db/sync.py b/src/leap/soledad/u1db/sync.py
new file mode 100644
index 00000000..3375d097
--- /dev/null
+++ b/src/leap/soledad/u1db/sync.py
@@ -0,0 +1,304 @@
+# Copyright 2011 Canonical Ltd.
+#
+# This file is part of u1db.
+#
+# u1db is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation.
+#
+# u1db is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with u1db. If not, see <http://www.gnu.org/licenses/>.
+
+"""The synchronization utilities for U1DB."""
+from itertools import izip
+
+import u1db
+from u1db import errors
+
+
+class Synchronizer(object):
+ """Collect the state around synchronizing 2 U1DB replicas.
+
+ Synchronization is bi-directional, in that new items in the source are sent
+ to the target, and new items in the target are returned to the source.
+ However, it still recognizes that one side is initiating the request. Also,
+ at the moment, conflicts are only created in the source.
+ """
+
+ def __init__(self, source, sync_target):
+ """Create a new Synchronization object.
+
+ :param source: A Database
+ :param sync_target: A SyncTarget
+ """
+ self.source = source
+ self.sync_target = sync_target
+ self.target_replica_uid = None
+ self.num_inserted = 0
+
+ def _insert_doc_from_target(self, doc, replica_gen, trans_id):
+ """Try to insert synced document from target.
+
+ Implements TAKE OTHER semantics: any document from the target
+ that is in conflict will be taken as the new official value,
+ while the current conflicting value will be stored alongside
+ as a conflict. In the process indexes will be updated etc.
+
+ :return: None
+ """
+ # Increases self.num_inserted depending whether the document
+ # was effectively inserted.
+ state, _ = self.source._put_doc_if_newer(doc, save_conflict=True,
+ replica_uid=self.target_replica_uid, replica_gen=replica_gen,
+ replica_trans_id=trans_id)
+ if state == 'inserted':
+ self.num_inserted += 1
+ elif state == 'converged':
+ # magical convergence
+ pass
+ elif state == 'superseded':
+ # we have something newer, will be taken care of at the next sync
+ pass
+ else:
+ assert state == 'conflicted'
+ # The doc was saved as a conflict, so the database was updated
+ self.num_inserted += 1
+
+ def _record_sync_info_with_the_target(self, start_generation):
+ """Record our new after sync generation with the target if gapless.
+
+ Any documents received from the target will cause the local
+ database to increment its generation. We do not want to send
+ them back to the target in a future sync. However, there could
+ also be concurrent updates from another process doing eg
+ 'put_doc' while the sync was running. And we do want to
+ synchronize those documents. We can tell if there was a
+ concurrent update by comparing our new generation number
+ versus the generation we started, and how many documents we
+ inserted from the target. If it matches exactly, then we can
+ record with the target that they are fully up to date with our
+ new generation.
+ """
+ cur_gen, trans_id = self.source._get_generation_info()
+ if (cur_gen == start_generation + self.num_inserted
+ and self.num_inserted > 0):
+ self.sync_target.record_sync_info(
+ self.source._replica_uid, cur_gen, trans_id)
+
+ def sync(self, callback=None, autocreate=False):
+ """Synchronize documents between source and target."""
+ sync_target = self.sync_target
+ # get target identifier, its current generation,
+ # and its last-seen database generation for this source
+ try:
+ (self.target_replica_uid, target_gen, target_trans_id,
+ target_my_gen, target_my_trans_id) = sync_target.get_sync_info(
+ self.source._replica_uid)
+ except errors.DatabaseDoesNotExist:
+ if not autocreate:
+ raise
+ # will try to ask sync_exchange() to create the db
+ self.target_replica_uid = None
+ target_gen, target_trans_id = 0, ''
+ target_my_gen, target_my_trans_id = 0, ''
+ def ensure_callback(replica_uid):
+ self.target_replica_uid = replica_uid
+ else:
+ ensure_callback = None
+ # validate the generation and transaction id the target knows about us
+ self.source.validate_gen_and_trans_id(
+ target_my_gen, target_my_trans_id)
+ # what's changed since that generation and this current gen
+ my_gen, _, changes = self.source.whats_changed(target_my_gen)
+
+ # this source last-seen database generation for the target
+ if self.target_replica_uid is None:
+ target_last_known_gen, target_last_known_trans_id = 0, ''
+ else:
+ target_last_known_gen, target_last_known_trans_id = \
+ self.source._get_replica_gen_and_trans_id(self.target_replica_uid)
+ if not changes and target_last_known_gen == target_gen:
+ if target_trans_id != target_last_known_trans_id:
+ raise errors.InvalidTransactionId
+ return my_gen
+ changed_doc_ids = [doc_id for doc_id, _, _ in changes]
+ # prepare to send all the changed docs
+ docs_to_send = self.source.get_docs(changed_doc_ids,
+ check_for_conflicts=False, include_deleted=True)
+ # TODO: there must be a way to not iterate twice
+ docs_by_generation = zip(
+ docs_to_send, (gen for _, gen, _ in changes),
+ (trans for _, _, trans in changes))
+
+ # exchange documents and try to insert the returned ones with
+ # the target, return target synced-up-to gen
+ new_gen, new_trans_id = sync_target.sync_exchange(
+ docs_by_generation, self.source._replica_uid,
+ target_last_known_gen, target_last_known_trans_id,
+ self._insert_doc_from_target, ensure_callback=ensure_callback)
+ # record target synced-up-to generation including applying what we sent
+ self.source._set_replica_gen_and_trans_id(
+ self.target_replica_uid, new_gen, new_trans_id)
+
+ # if gapless record current reached generation with target
+ self._record_sync_info_with_the_target(my_gen)
+
+ return my_gen
+
+
+class SyncExchange(object):
+ """Steps and state for carrying through a sync exchange on a target."""
+
+ def __init__(self, db, source_replica_uid, last_known_generation):
+ self._db = db
+ self.source_replica_uid = source_replica_uid
+ self.source_last_known_generation = last_known_generation
+ self.seen_ids = {} # incoming ids not superseded
+ self.changes_to_return = None
+ self.new_gen = None
+ self.new_trans_id = None
+ # for tests
+ self._incoming_trace = []
+ self._trace_hook = None
+ self._db._last_exchange_log = {
+ 'receive': {'docs': self._incoming_trace},
+ 'return': None
+ }
+
+ def _set_trace_hook(self, cb):
+ self._trace_hook = cb
+
+ def _trace(self, state):
+ if not self._trace_hook:
+ return
+ self._trace_hook(state)
+
+ def insert_doc_from_source(self, doc, source_gen, trans_id):
+ """Try to insert synced document from source.
+
+ Conflicting documents are not inserted but will be sent over
+ to the sync source.
+
+ It keeps track of progress by storing the document source
+ generation as well.
+
+ The 1st step of a sync exchange is to call this repeatedly to
+ try insert all incoming documents from the source.
+
+ :param doc: A Document object.
+ :param source_gen: The source generation of doc.
+ :return: None
+ """
+ state, at_gen = self._db._put_doc_if_newer(doc, save_conflict=False,
+ replica_uid=self.source_replica_uid, replica_gen=source_gen,
+ replica_trans_id=trans_id)
+ if state == 'inserted':
+ self.seen_ids[doc.doc_id] = at_gen
+ elif state == 'converged':
+ # magical convergence
+ self.seen_ids[doc.doc_id] = at_gen
+ elif state == 'superseded':
+ # we have something newer that we will return
+ pass
+ else:
+ # conflict that we will returne
+ assert state == 'conflicted'
+ # for tests
+ self._incoming_trace.append((doc.doc_id, doc.rev))
+ self._db._last_exchange_log['receive'].update({
+ 'source_uid': self.source_replica_uid,
+ 'source_gen': source_gen
+ })
+
+ def find_changes_to_return(self):
+ """Find changes to return.
+
+ Find changes since last_known_generation in db generation
+ order using whats_changed. It excludes documents ids that have
+ already been considered (superseded by the sender, etc).
+
+ :return: new_generation - the generation of this database
+ which the caller can consider themselves to be synchronized after
+ processing the returned documents.
+ """
+ self._db._last_exchange_log['receive'].update({ # for tests
+ 'last_known_gen': self.source_last_known_generation
+ })
+ self._trace('before whats_changed')
+ gen, trans_id, changes = self._db.whats_changed(
+ self.source_last_known_generation)
+ self._trace('after whats_changed')
+ self.new_gen = gen
+ self.new_trans_id = trans_id
+ seen_ids = self.seen_ids
+ # changed docs that weren't superseded by or converged with
+ self.changes_to_return = [
+ (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes
+ # there was a subsequent update
+ if doc_id not in seen_ids or seen_ids.get(doc_id) < gen]
+ return self.new_gen
+
+ def return_docs(self, return_doc_cb):
+ """Return the changed documents and their last change generation
+ repeatedly invoking the callback return_doc_cb.
+
+ The final step of a sync exchange.
+
+ :param: return_doc_cb(doc, gen, trans_id): is a callback
+ used to return the documents with their last change generation
+ to the target replica.
+ :return: None
+ """
+ changes_to_return = self.changes_to_return
+ # return docs, including conflicts
+ changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return]
+ self._trace('before get_docs')
+ docs = self._db.get_docs(
+ changed_doc_ids, check_for_conflicts=False, include_deleted=True)
+
+ docs_by_gen = izip(
+ docs, (gen for _, gen, _ in changes_to_return),
+ (trans_id for _, _, trans_id in changes_to_return))
+ _outgoing_trace = [] # for tests
+ for doc, gen, trans_id in docs_by_gen:
+ return_doc_cb(doc, gen, trans_id)
+ _outgoing_trace.append((doc.doc_id, doc.rev))
+ # for tests
+ self._db._last_exchange_log['return'] = {
+ 'docs': _outgoing_trace,
+ 'last_gen': self.new_gen
+ }
+
+
+class LocalSyncTarget(u1db.SyncTarget):
+ """Common sync target implementation logic for all local sync targets."""
+
+ def __init__(self, db):
+ self._db = db
+ self._trace_hook = None
+
+ def sync_exchange(self, docs_by_generations, source_replica_uid,
+ last_known_generation, last_known_trans_id,
+ return_doc_cb, ensure_callback=None):
+ self._db.validate_gen_and_trans_id(
+ last_known_generation, last_known_trans_id)
+ sync_exch = SyncExchange(
+ self._db, source_replica_uid, last_known_generation)
+ if self._trace_hook:
+ sync_exch._set_trace_hook(self._trace_hook)
+ # 1st step: try to insert incoming docs and record progress
+ for doc, doc_gen, trans_id in docs_by_generations:
+ sync_exch.insert_doc_from_source(doc, doc_gen, trans_id)
+ # 2nd step: find changed documents (including conflicts) to return
+ new_gen = sync_exch.find_changes_to_return()
+ # final step: return docs and record source replica sync point
+ sync_exch.return_docs(return_doc_cb)
+ return new_gen, sync_exch.new_trans_id
+
+ def _set_trace_hook(self, cb):
+ self._trace_hook = cb