diff options
Diffstat (limited to 'client/src/leap/soledad/client/sync.py')
-rw-r--r-- | client/src/leap/soledad/client/sync.py | 171 |
1 files changed, 51 insertions, 120 deletions
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index d3f106da..53172f31 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -16,17 +16,10 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. """ Soledad synchronization utilities. - -Extend u1db Synchronizer with the ability to: - - * Postpone the update of the known replica uid until all the decryption of - the incoming messages has been processed. - - * Be interrupted and recovered. """ import logging -import traceback -from threading import Lock + +from twisted.internet import defer from u1db import errors from u1db.sync import Synchronizer @@ -47,17 +40,8 @@ class SoledadSynchronizer(Synchronizer): Also modified to allow for interrupting the synchronization process. """ - # TODO can delegate the syncing to the api object, living in the reactor - # thread, and use a simple flag. - syncing_lock = Lock() - - def stop(self): - """ - Stop the current sync in progress. - """ - self.sync_target.stop() - - def sync(self, autocreate=False, defer_decryption=True): + @defer.inlineCallbacks + def sync(self, defer_decryption=True): """ Synchronize documents between source and target. @@ -69,48 +53,22 @@ class SoledadSynchronizer(Synchronizer): This is done to allow the ongoing parallel decryption of the incoming docs to proceed without `InvalidGeneration` conflicts. - :param autocreate: Whether the target replica should be created or not. - :type autocreate: bool :param defer_decryption: Whether to defer the decryption process using the intermediate database. If False, decryption will be done inline. :type defer_decryption: bool - """ - self.syncing_lock.acquire() - try: - return self._sync(autocreate=autocreate, - defer_decryption=defer_decryption) - except Exception: - # we want this exception to reach either SQLCipherU1DBSync.sync or - # the Solead api object itself, so it is poperly handled and/or - # logged... - raise - finally: - # ... but we also want to release the syncing lock so this - # Synchronizer may be reused later. - self.release_syncing_lock() - - def _sync(self, autocreate=False, defer_decryption=True): - """ - Helper function, called from the main `sync` method. - See `sync` docstring. + + :return: A deferred which will fire after the sync has finished. + :rtype: twisted.internet.defer.Deferred """ sync_target = self.sync_target # get target identifier, its current generation, # and its last-seen database generation for this source ensure_callback = None - try: - (self.target_replica_uid, target_gen, target_trans_id, - target_my_gen, target_my_trans_id) = \ - sync_target.get_sync_info(self.source._replica_uid) - except errors.DatabaseDoesNotExist: - if not autocreate: - raise - # will try to ask sync_exchange() to create the db - self.target_replica_uid = None - target_gen, target_trans_id = (0, '') - target_my_gen, target_my_trans_id = (0, '') + (self.target_replica_uid, target_gen, target_trans_id, + target_my_gen, target_my_trans_id) = yield \ + sync_target.get_sync_info(self.source._replica_uid) logger.debug( "Soledad target sync info:\n" @@ -151,15 +109,15 @@ class SoledadSynchronizer(Synchronizer): self.target_replica_uid) logger.debug( "Soledad source sync info:\n" - " source target gen: %d\n" - " source target trans_id: %s" + " last target gen known to source: %d\n" + " last target trans_id known to source: %s" % (target_last_known_gen, target_last_known_trans_id)) # validate transaction ids if not changes and target_last_known_gen == target_gen: if target_trans_id != target_last_known_trans_id: raise errors.InvalidTransactionId - return my_gen + defer.returnValue(my_gen) # prepare to send all the changed docs changed_doc_ids = [doc_id for doc_id, _, _ in changes] @@ -174,40 +132,26 @@ class SoledadSynchronizer(Synchronizer): # exchange documents and try to insert the returned ones with # the target, return target synced-up-to gen. - # - # The sync_exchange method may be interrupted, in which case it will - # return a tuple of Nones. - try: - new_gen, new_trans_id = sync_target.sync_exchange( - docs_by_generation, self.source._replica_uid, - target_last_known_gen, target_last_known_trans_id, - self._insert_doc_from_target, ensure_callback=ensure_callback, - defer_decryption=defer_decryption) - logger.debug( - "Soledad source sync info after sync exchange:\n" - " source target gen: %d\n" - " source target trans_id: %s" - % (new_gen, new_trans_id)) - info = { - "target_replica_uid": self.target_replica_uid, - "new_gen": new_gen, - "new_trans_id": new_trans_id, - "my_gen": my_gen - } - self._syncing_info = info - if defer_decryption and not sync_target.has_syncdb(): - logger.debug("Sync target has no valid sync db, " - "aborting defer_decryption") - defer_decryption = False - self.complete_sync() - except Exception as e: - logger.error("Soledad sync error: %s" % str(e)) - logger.error(traceback.format_exc()) - sync_target.stop() - finally: - sync_target.close() - - return my_gen + new_gen, new_trans_id = yield sync_target.sync_exchange( + docs_by_generation, self.source._replica_uid, + target_last_known_gen, target_last_known_trans_id, + self._insert_doc_from_target, ensure_callback=ensure_callback, + defer_decryption=defer_decryption) + logger.debug( + "Soledad source sync info after sync exchange:\n" + " source known target gen: %d\n" + " source known target trans_id: %s" + % (new_gen, new_trans_id)) + info = { + "target_replica_uid": self.target_replica_uid, + "new_gen": new_gen, + "new_trans_id": new_trans_id, + "my_gen": my_gen + } + self._syncing_info = info + yield self.complete_sync() + + defer.returnValue(my_gen) def complete_sync(self): """ @@ -215,6 +159,9 @@ class SoledadSynchronizer(Synchronizer): (a) record last known generation and transaction uid for the remote replica, and (b) make target aware of our current reached generation. + + :return: A deferred which will fire when the sync has been completed. + :rtype: twisted.internet.defer.Deferred """ logger.debug("Completing deferred last step in SYNC...") @@ -225,39 +172,23 @@ class SoledadSynchronizer(Synchronizer): info["target_replica_uid"], info["new_gen"], info["new_trans_id"]) # if gapless record current reached generation with target - self._record_sync_info_with_the_target(info["my_gen"]) - - @property - def syncing(self): - """ - Return True if a sync is ongoing, False otherwise. - :rtype: bool - """ - # XXX FIXME we need some mechanism for timeout: should cleanup and - # release if something in the syncdb-decrypt goes wrong. we could keep - # track of the release date and cleanup unrealistic sync entries after - # some time. + return self._record_sync_info_with_the_target(info["my_gen"]) - # TODO use cancellable deferreds instead - locked = self.syncing_lock.locked() - return locked - - def release_syncing_lock(self): - """ - Release syncing lock if it's locked. + def _record_sync_info_with_the_target(self, start_generation): """ - if self.syncing_lock.locked(): - self.syncing_lock.release() + Store local replica metadata in server. - def close(self): - """ - Close sync target pool of workers. - """ - self.release_syncing_lock() - self.sync_target.close() + :param start_generation: The local generation when the sync was + started. + :type start_generation: int - def __del__(self): - """ - Cleanup: release lock. + :return: A deferred which will fire when the operation has been + completed. + :rtype: twisted.internet.defer.Deferred """ - self.release_syncing_lock() + cur_gen, trans_id = self.source._get_generation_info() + if (cur_gen == start_generation + self.num_inserted + and self.num_inserted > 0): + return self.sync_target.record_sync_info( + self.source._replica_uid, cur_gen, trans_id) + return defer.succeed(None) |