summary refs log tree commit diff
path: root/client/src/leap/soledad/client/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/sync.py')
-rw-r--r-- client/src/leap/soledad/client/sync.py 163
1 files changed, 150 insertions, 13 deletions
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index 56e63416..5d545a77 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -17,35 +17,85 @@
"""
-Sync infrastructure that can be interrupted and recovered.
+Soledad synchronization utilities.
+
+
+Extend u1db Synchronizer with the ability to:
+
+ * Defer the update of the known replica uid until all the decryption of
+ the incoming messages has been processed.
+
+ * Be interrupted and recovered.
"""
+
import json
+import logging
+import traceback
+from threading import Lock
from u1db import errors
-from u1db.sync import Synchronizer as U1DBSynchronizer
+from u1db.sync import Synchronizer
-class Synchronizer(U1DBSynchronizer):
+logger = logging.getLogger(__name__)
+
+
+class SoledadSynchronizer(Synchronizer):
"""
Collect the state around synchronizing 2 U1DB replicas.
- Modified to allow for interrupting the synchronization process.
+ Synchronization is bi-directional, in that new items in the source are sent
+ to the target, and new items in the target are returned to the source.
+ However, it still recognizes that one side is initiating the request. Also,
+ at the moment, conflicts are only created in the source.
+
+ Also modified to allow for interrupting the synchronization process.
"""
+ syncing_lock = Lock()
+
def stop(self):
"""
Stop the current sync in progress.
"""
self.sync_target.stop()
- def sync(self, autocreate=False):
+ def sync(self, autocreate=False, defer_decryption=True):
"""
Synchronize documents between source and target.
+ Unlike u1db's `Synchronizer.sync` method, this one accepts a
+ `defer_decryption` flag that postpones the last step of the
+ synchronization dance, namely setting the last known generation and
+ transaction id for a given remote replica.
+
+ This is done to allow the ongoing parallel decryption of the incoming
+ docs to proceed without `InvalidGeneration` conflicts.
+
:param autocreate: Whether the target replica should be created or not.
:type autocreate: bool
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
+ """
+ self.syncing_lock.acquire()
+ try:
+ return self._sync(autocreate=autocreate,
+ defer_decryption=defer_decryption)
+ except Exception:
+ # re-raise the exception so that sqlcipher.sync can catch it
+ # (and re-create the syncer instance if needed)
+ raise
+ finally:
+ self.release_syncing_lock()
+
+ def _sync(self, autocreate=False, defer_decryption=True):
+ """
+ Helper function, called from the main `sync` method.
+ See `sync` docstring.
"""
sync_target = self.sync_target
@@ -64,6 +114,16 @@ class Synchronizer(U1DBSynchronizer):
target_gen, target_trans_id = (0, '')
target_my_gen, target_my_trans_id = (0, '')
+ logger.debug(
+ "Soledad target sync info:\n"
+ " target replica uid: %s\n"
+ " target generation: %d\n"
+ " target trans id: %s\n"
+ " target my gen: %d\n"
+ " target my trans_id: %s"
+ % (self.target_replica_uid, target_gen, target_trans_id,
+ target_my_gen, target_my_trans_id))
+
# make sure we'll have access to target replica uid once it exists
if self.target_replica_uid is None:
@@ -80,6 +140,8 @@ class Synchronizer(U1DBSynchronizer):
# what's changed since that generation and this current gen
my_gen, _, changes = self.source.whats_changed(target_my_gen)
+ logger.debug("Soledad sync: there are %d documents to send." \
+ % len(changes))
# get source last-seen database generation for the target
if self.target_replica_uid is None:
@@ -88,6 +150,11 @@ class Synchronizer(U1DBSynchronizer):
target_last_known_gen, target_last_known_trans_id = \
self.source._get_replica_gen_and_trans_id(
self.target_replica_uid)
+ logger.debug(
+ "Soledad source sync info:\n"
+ " source target gen: %d\n"
+ " source target trans_id: %s"
+ % (target_last_known_gen, target_last_known_trans_id))
# validate transaction ids
if not changes and target_last_known_gen == target_gen:
@@ -111,15 +178,85 @@ class Synchronizer(U1DBSynchronizer):
#
# The sync_exchange method may be interrupted, in which case it will
# return a tuple of Nones.
- new_gen, new_trans_id = sync_target.sync_exchange(
- docs_by_generation, self.source._replica_uid,
- target_last_known_gen, target_last_known_trans_id,
- self._insert_doc_from_target, ensure_callback=ensure_callback)
+ try:
+ new_gen, new_trans_id = sync_target.sync_exchange(
+ docs_by_generation, self.source._replica_uid,
+ target_last_known_gen, target_last_known_trans_id,
+ self._insert_doc_from_target, ensure_callback=ensure_callback,
+ defer_decryption=defer_decryption)
+ logger.debug(
+ "Soledad source sync info after sync exchange:\n"
+ " source target gen: %d\n"
+ " source target trans_id: %s"
+ % (new_gen, new_trans_id))
+ info = {
+ "target_replica_uid": self.target_replica_uid,
+ "new_gen": new_gen,
+ "new_trans_id": new_trans_id,
+ "my_gen": my_gen
+ }
+ self._syncing_info = info
+ if defer_decryption and not sync_target.has_syncdb():
+ logger.debug("Sync target has no valid sync db, "
+ "aborting defer_decryption")
+ defer_decryption = False
+ self.complete_sync()
+ except Exception as e:
+ logger.error("Soledad sync error: %s" % str(e))
+ logger.error(traceback.format_exc())
+ sync_target.stop()
+ finally:
+ sync_target.close()
- # record target synced-up-to generation including applying what we sent
+ return my_gen
+
+ def complete_sync(self):
+ """
+ Last stage of the synchronization:
+ (a) record last known generation and transaction id for the remote
+ replica, and
+ (b) make target aware of our current reached generation.
+ """
+ logger.debug("Completing deferred last step in SYNC...")
+
+ # record target synced-up-to generation including applying what we
+ # sent
+ info = self._syncing_info
self.source._set_replica_gen_and_trans_id(
- self.target_replica_uid, new_gen, new_trans_id)
+ info["target_replica_uid"], info["new_gen"], info["new_trans_id"])
+
# if gapless record current reached generation with target
- self._record_sync_info_with_the_target(my_gen)
+ self._record_sync_info_with_the_target(info["my_gen"])
- return my_gen
+ @property
+ def syncing(self):
+ """
+ Return True if a sync is ongoing, False otherwise.
+ :rtype: bool
+ """
+ # XXX FIXME we need some timeout mechanism: we should clean up and
+ # release if something in the syncdb-decrypt goes wrong. We could keep
+ # track of the release date and clean up unrealistic sync entries after
+ # some time.
+ locked = self.syncing_lock.locked()
+ return locked
+
+ def release_syncing_lock(self):
+ """
+ Release syncing lock if it's locked.
+ """
+ if self.syncing_lock.locked():
+ self.syncing_lock.release()
+
+ def close(self):
+ """
+ Release the syncing lock and close the sync target pool of workers.
+ """
+ self.release_syncing_lock()
+ self.sync_target.close()
+
+ def __del__(self):
+ """
+ Cleanup: release lock.
+ """
+ self.release_syncing_lock()