summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/sync.py')
-rw-r--r--client/src/leap/soledad/client/sync.py171
1 files changed, 51 insertions, 120 deletions
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index d3f106da..53172f31 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -16,17 +16,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Soledad synchronization utilities.
-
-Extend u1db Synchronizer with the ability to:
-
- * Postpone the update of the known replica uid until all the decryption of
- the incoming messages has been processed.
-
- * Be interrupted and recovered.
"""
import logging
-import traceback
-from threading import Lock
+
+from twisted.internet import defer
from u1db import errors
from u1db.sync import Synchronizer
@@ -47,17 +40,8 @@ class SoledadSynchronizer(Synchronizer):
Also modified to allow for interrupting the synchronization process.
"""
- # TODO can delegate the syncing to the api object, living in the reactor
- # thread, and use a simple flag.
- syncing_lock = Lock()
-
- def stop(self):
- """
- Stop the current sync in progress.
- """
- self.sync_target.stop()
-
- def sync(self, autocreate=False, defer_decryption=True):
+ @defer.inlineCallbacks
+ def sync(self, defer_decryption=True):
"""
Synchronize documents between source and target.
@@ -69,48 +53,22 @@ class SoledadSynchronizer(Synchronizer):
This is done to allow the ongoing parallel decryption of the incoming
docs to proceed without `InvalidGeneration` conflicts.
- :param autocreate: Whether the target replica should be created or not.
- :type autocreate: bool
:param defer_decryption: Whether to defer the decryption process using
the intermediate database. If False,
decryption will be done inline.
:type defer_decryption: bool
- """
- self.syncing_lock.acquire()
- try:
- return self._sync(autocreate=autocreate,
- defer_decryption=defer_decryption)
- except Exception:
- # we want this exception to reach either SQLCipherU1DBSync.sync or
- # the Solead api object itself, so it is poperly handled and/or
- # logged...
- raise
- finally:
- # ... but we also want to release the syncing lock so this
- # Synchronizer may be reused later.
- self.release_syncing_lock()
-
- def _sync(self, autocreate=False, defer_decryption=True):
- """
- Helper function, called from the main `sync` method.
- See `sync` docstring.
+
+ :return: A deferred which will fire after the sync has finished.
+ :rtype: twisted.internet.defer.Deferred
"""
sync_target = self.sync_target
# get target identifier, its current generation,
# and its last-seen database generation for this source
ensure_callback = None
- try:
- (self.target_replica_uid, target_gen, target_trans_id,
- target_my_gen, target_my_trans_id) = \
- sync_target.get_sync_info(self.source._replica_uid)
- except errors.DatabaseDoesNotExist:
- if not autocreate:
- raise
- # will try to ask sync_exchange() to create the db
- self.target_replica_uid = None
- target_gen, target_trans_id = (0, '')
- target_my_gen, target_my_trans_id = (0, '')
+ (self.target_replica_uid, target_gen, target_trans_id,
+ target_my_gen, target_my_trans_id) = yield \
+ sync_target.get_sync_info(self.source._replica_uid)
logger.debug(
"Soledad target sync info:\n"
@@ -151,15 +109,15 @@ class SoledadSynchronizer(Synchronizer):
self.target_replica_uid)
logger.debug(
"Soledad source sync info:\n"
- " source target gen: %d\n"
- " source target trans_id: %s"
+ " last target gen known to source: %d\n"
+ " last target trans_id known to source: %s"
% (target_last_known_gen, target_last_known_trans_id))
# validate transaction ids
if not changes and target_last_known_gen == target_gen:
if target_trans_id != target_last_known_trans_id:
raise errors.InvalidTransactionId
- return my_gen
+ defer.returnValue(my_gen)
# prepare to send all the changed docs
changed_doc_ids = [doc_id for doc_id, _, _ in changes]
@@ -174,40 +132,26 @@ class SoledadSynchronizer(Synchronizer):
# exchange documents and try to insert the returned ones with
# the target, return target synced-up-to gen.
- #
- # The sync_exchange method may be interrupted, in which case it will
- # return a tuple of Nones.
- try:
- new_gen, new_trans_id = sync_target.sync_exchange(
- docs_by_generation, self.source._replica_uid,
- target_last_known_gen, target_last_known_trans_id,
- self._insert_doc_from_target, ensure_callback=ensure_callback,
- defer_decryption=defer_decryption)
- logger.debug(
- "Soledad source sync info after sync exchange:\n"
- " source target gen: %d\n"
- " source target trans_id: %s"
- % (new_gen, new_trans_id))
- info = {
- "target_replica_uid": self.target_replica_uid,
- "new_gen": new_gen,
- "new_trans_id": new_trans_id,
- "my_gen": my_gen
- }
- self._syncing_info = info
- if defer_decryption and not sync_target.has_syncdb():
- logger.debug("Sync target has no valid sync db, "
- "aborting defer_decryption")
- defer_decryption = False
- self.complete_sync()
- except Exception as e:
- logger.error("Soledad sync error: %s" % str(e))
- logger.error(traceback.format_exc())
- sync_target.stop()
- finally:
- sync_target.close()
-
- return my_gen
+ new_gen, new_trans_id = yield sync_target.sync_exchange(
+ docs_by_generation, self.source._replica_uid,
+ target_last_known_gen, target_last_known_trans_id,
+ self._insert_doc_from_target, ensure_callback=ensure_callback,
+ defer_decryption=defer_decryption)
+ logger.debug(
+ "Soledad source sync info after sync exchange:\n"
+ " source known target gen: %d\n"
+ " source known target trans_id: %s"
+ % (new_gen, new_trans_id))
+ info = {
+ "target_replica_uid": self.target_replica_uid,
+ "new_gen": new_gen,
+ "new_trans_id": new_trans_id,
+ "my_gen": my_gen
+ }
+ self._syncing_info = info
+ yield self.complete_sync()
+
+ defer.returnValue(my_gen)
def complete_sync(self):
"""
@@ -215,6 +159,9 @@ class SoledadSynchronizer(Synchronizer):
(a) record last known generation and transaction uid for the remote
replica, and
(b) make target aware of our current reached generation.
+
+ :return: A deferred which will fire when the sync has been completed.
+ :rtype: twisted.internet.defer.Deferred
"""
logger.debug("Completing deferred last step in SYNC...")
@@ -225,39 +172,23 @@ class SoledadSynchronizer(Synchronizer):
info["target_replica_uid"], info["new_gen"], info["new_trans_id"])
# if gapless record current reached generation with target
- self._record_sync_info_with_the_target(info["my_gen"])
-
- @property
- def syncing(self):
- """
- Return True if a sync is ongoing, False otherwise.
- :rtype: bool
- """
- # XXX FIXME we need some mechanism for timeout: should cleanup and
- # release if something in the syncdb-decrypt goes wrong. we could keep
- # track of the release date and cleanup unrealistic sync entries after
- # some time.
+ return self._record_sync_info_with_the_target(info["my_gen"])
- # TODO use cancellable deferreds instead
- locked = self.syncing_lock.locked()
- return locked
-
- def release_syncing_lock(self):
- """
- Release syncing lock if it's locked.
+ def _record_sync_info_with_the_target(self, start_generation):
"""
- if self.syncing_lock.locked():
- self.syncing_lock.release()
+ Store local replica metadata in server.
- def close(self):
- """
- Close sync target pool of workers.
- """
- self.release_syncing_lock()
- self.sync_target.close()
+ :param start_generation: The local generation when the sync was
+ started.
+ :type start_generation: int
- def __del__(self):
- """
- Cleanup: release lock.
+ :return: A deferred which will fire when the operation has been
+ completed.
+ :rtype: twisted.internet.defer.Deferred
"""
- self.release_syncing_lock()
+ cur_gen, trans_id = self.source._get_generation_info()
+ if (cur_gen == start_generation + self.num_inserted
+ and self.num_inserted > 0):
+ return self.sync_target.record_sync_info(
+ self.source._replica_uid, cur_gen, trans_id)
+ return defer.succeed(None)