summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/sync.py')
-rw-r--r--client/src/leap/soledad/client/sync.py83
1 files changed, 50 insertions, 33 deletions
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index d4ca4258..f8f74ce7 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -25,9 +25,10 @@ Extend u1db Synchronizer with the ability to:
* Be interrupted and recovered.
"""
import logging
-import traceback
from threading import Lock
+from twisted.internet import defer
+
from u1db import errors
from u1db.sync import Synchronizer
@@ -90,6 +91,7 @@ class SoledadSynchronizer(Synchronizer):
# Synchronizer may be reused later.
self.release_syncing_lock()
+ @defer.inlineCallbacks
def _sync(self, autocreate=False, defer_decryption=True):
"""
Helper function, called from the main `sync` method.
@@ -102,7 +104,7 @@ class SoledadSynchronizer(Synchronizer):
ensure_callback = None
try:
(self.target_replica_uid, target_gen, target_trans_id,
- target_my_gen, target_my_trans_id) = \
+ target_my_gen, target_my_trans_id) = yield \
sync_target.get_sync_info(self.source._replica_uid)
except errors.DatabaseDoesNotExist:
if not autocreate:
@@ -151,15 +153,15 @@ class SoledadSynchronizer(Synchronizer):
self.target_replica_uid)
logger.debug(
"Soledad source sync info:\n"
- " source target gen: %d\n"
- " source target trans_id: %s"
+ " last target gen known to source: %d\n"
+ " last target trans_id known to source: %s"
% (target_last_known_gen, target_last_known_trans_id))
# validate transaction ids
if not changes and target_last_known_gen == target_gen:
if target_trans_id != target_last_known_trans_id:
raise errors.InvalidTransactionId
- return my_gen
+ defer.returnValue(my_gen)
# prepare to send all the changed docs
changed_doc_ids = [doc_id for doc_id, _, _ in changes]
@@ -177,33 +179,26 @@ class SoledadSynchronizer(Synchronizer):
#
# The sync_exchange method may be interrupted, in which case it will
# return a tuple of Nones.
- try:
- new_gen, new_trans_id = sync_target.sync_exchange(
- docs_by_generation, self.source._replica_uid,
- target_last_known_gen, target_last_known_trans_id,
- self._insert_doc_from_target, ensure_callback=ensure_callback,
- defer_decryption=defer_decryption)
- logger.debug(
- "Soledad source sync info after sync exchange:\n"
- " source target gen: %d\n"
- " source target trans_id: %s"
- % (new_gen, new_trans_id))
- info = {
- "target_replica_uid": self.target_replica_uid,
- "new_gen": new_gen,
- "new_trans_id": new_trans_id,
- "my_gen": my_gen
- }
- self._syncing_info = info
- self.complete_sync()
- except Exception as e:
- logger.error("Soledad sync error: %s" % str(e))
- logger.error(traceback.format_exc())
- sync_target.stop()
- finally:
- sync_target.close()
-
- return my_gen
+ new_gen, new_trans_id = yield sync_target.sync_exchange(
+ docs_by_generation, self.source._replica_uid,
+ target_last_known_gen, target_last_known_trans_id,
+ self._insert_doc_from_target, ensure_callback=ensure_callback,
+ defer_decryption=defer_decryption)
+ logger.debug(
+ "Soledad source sync info after sync exchange:\n"
+ " source known target gen: %d\n"
+ " source known target trans_id: %s"
+ % (new_gen, new_trans_id))
+ info = {
+ "target_replica_uid": self.target_replica_uid,
+ "new_gen": new_gen,
+ "new_trans_id": new_trans_id,
+ "my_gen": my_gen
+ }
+ self._syncing_info = info
+ yield self.complete_sync()
+
+ defer.returnValue(my_gen)
def complete_sync(self):
"""
@@ -211,6 +206,9 @@ class SoledadSynchronizer(Synchronizer):
(a) record last known generation and transaction uid for the remote
replica, and
(b) make target aware of our current reached generation.
+
+ :return: A deferred which will fire when the sync has been completed.
+ :rtype: twisted.internet.defer.Deferred
"""
logger.debug("Completing deferred last step in SYNC...")
@@ -221,7 +219,26 @@ class SoledadSynchronizer(Synchronizer):
info["target_replica_uid"], info["new_gen"], info["new_trans_id"])
# if gapless record current reached generation with target
- self._record_sync_info_with_the_target(info["my_gen"])
+ return self._record_sync_info_with_the_target(info["my_gen"])
+
+ def _record_sync_info_with_the_target(self, start_generation):
+ """
+ Store local replica metadata in server.
+
+ :param start_generation: The local generation when the sync was
+ started.
+ :type start_generation: int
+
+ :return: A deferred which will fire when the operation has been
+ completed.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ cur_gen, trans_id = self.source._get_generation_info()
+ if (cur_gen == start_generation + self.num_inserted
+ and self.num_inserted > 0):
+ return self.sync_target.record_sync_info(
+ self.source._replica_uid, cur_gen, trans_id)
+ return defer.succeed(None)
@property
def syncing(self):