diff options
author | Kali Kaneko <kali@leap.se> | 2014-07-11 10:19:47 -0500 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2014-07-11 10:19:47 -0500 |
commit | 4519775ebbbbe09913a1b96c8ac0a55c8a956069 (patch) | |
tree | 780a593e5840142021693cc7c0eada5b7f2cc2c1 /client/src/leap/soledad/client/sync.py | |
parent | 2628bbf4c5faff50491cdd227c787ca7f148f368 (diff) | |
parent | 9769301256f994061111e9a18beae90160cc809f (diff) |
Merge branch 'feature/encdb_splitted_drebs' into develop
Diffstat (limited to 'client/src/leap/soledad/client/sync.py')
-rw-r--r-- | client/src/leap/soledad/client/sync.py | 163 |
1 files changed, 150 insertions, 13 deletions
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index 56e63416..5d545a77 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -17,35 +17,85 @@ """ -Sync infrastructure that can be interrupted and recovered. +Soledad synchronization utilities. + + +Extend u1db Synchronizer with the ability to: + + * Defer the update of the known replica uid until all the decryption of + the incoming messages has been processed. + + * Be interrupted and recovered. """ + import json +import logging +import traceback +from threading import Lock from u1db import errors -from u1db.sync import Synchronizer as U1DBSynchronizer +from u1db.sync import Synchronizer -class Synchronizer(U1DBSynchronizer): +logger = logging.getLogger(__name__) + + +class SoledadSynchronizer(Synchronizer): """ Collect the state around synchronizing 2 U1DB replicas. - Modified to allow for interrupting the synchronization process. + Synchronization is bi-directional, in that new items in the source are sent + to the target, and new items in the target are returned to the source. + However, it still recognizes that one side is initiating the request. Also, + at the moment, conflicts are only created in the source. + + Also modified to allow for interrupting the synchronization process. """ + syncing_lock = Lock() + def stop(self): """ Stop the current sync in progress. """ self.sync_target.stop() - def sync(self, autocreate=False): + def sync(self, autocreate=False, defer_decryption=True): """ Synchronize documents between source and target. + Differently from u1db `Synchronizer.sync` method, this one allows to + pass a `defer_decryption` flag that will postpone the last + step in the synchronization dance, namely, the setting of the last + known generation and transaction id for a given remote replica. + + This is done to allow the ongoing parallel decryption of the incoming + docs to proceed without `InvalidGeneration` conflicts. + :param autocreate: Whether the target replica should be created or not. :type autocreate: bool + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool + """ + self.syncing_lock.acquire() + try: + return self._sync(autocreate=autocreate, + defer_decryption=defer_decryption) + except Exception: + # re-raising the exceptions to let syqlcipher.sync catch them + # (and re-create the syncer instance if needed) + raise + finally: + self.release_syncing_lock() + + def _sync(self, autocreate=False, defer_decryption=True): + """ + Helper function, called from the main `sync` method. + See `sync` docstring. """ sync_target = self.sync_target @@ -64,6 +114,16 @@ class Synchronizer(U1DBSynchronizer): target_gen, target_trans_id = (0, '') target_my_gen, target_my_trans_id = (0, '') + logger.debug( + "Soledad target sync info:\n" + " target replica uid: %s\n" + " target generation: %d\n" + " target trans id: %s\n" + " target my gen: %d\n" + " target my trans_id: %s" + % (self.target_replica_uid, target_gen, target_trans_id, + target_my_gen, target_my_trans_id)) + # make sure we'll have access to target replica uid once it exists if self.target_replica_uid is None: @@ -80,6 +140,8 @@ class Synchronizer(U1DBSynchronizer): # what's changed since that generation and this current gen my_gen, _, changes = self.source.whats_changed(target_my_gen) + logger.debug("Soledad sync: there are %d documents to send." \ + % len(changes)) # get source last-seen database generation for the target if self.target_replica_uid is None: @@ -88,6 +150,11 @@ class Synchronizer(U1DBSynchronizer): target_last_known_gen, target_last_known_trans_id = \ self.source._get_replica_gen_and_trans_id( self.target_replica_uid) + logger.debug( + "Soledad source sync info:\n" + " source target gen: %d\n" + " source target trans_id: %s" + % (target_last_known_gen, target_last_known_trans_id)) # validate transaction ids if not changes and target_last_known_gen == target_gen: @@ -111,15 +178,85 @@ class Synchronizer(U1DBSynchronizer): # # The sync_exchange method may be interrupted, in which case it will # return a tuple of Nones. - new_gen, new_trans_id = sync_target.sync_exchange( - docs_by_generation, self.source._replica_uid, - target_last_known_gen, target_last_known_trans_id, - self._insert_doc_from_target, ensure_callback=ensure_callback) + try: + new_gen, new_trans_id = sync_target.sync_exchange( + docs_by_generation, self.source._replica_uid, + target_last_known_gen, target_last_known_trans_id, + self._insert_doc_from_target, ensure_callback=ensure_callback, + defer_decryption=defer_decryption) + logger.debug( + "Soledad source sync info after sync exchange:\n" + " source target gen: %d\n" + " source target trans_id: %s" + % (new_gen, new_trans_id)) + info = { + "target_replica_uid": self.target_replica_uid, + "new_gen": new_gen, + "new_trans_id": new_trans_id, + "my_gen": my_gen + } + self._syncing_info = info + if defer_decryption and not sync_target.has_syncdb(): + logger.debug("Sync target has no valid sync db, " + "aborting defer_decryption") + defer_decryption = False + self.complete_sync() + except Exception as e: + logger.error("Soledad sync error: %s" % str(e)) + logger.error(traceback.format_exc()) + sync_target.stop() + finally: + sync_target.close() - # record target synced-up-to generation including applying what we sent + return my_gen + + def complete_sync(self): + """ + Last stage of the synchronization: + (a) record last known generation and transaction uid for the remote + replica, and + (b) make target aware of our current reached generation. + """ + logger.debug("Completing deferred last step in SYNC...") + + # record target synced-up-to generation including applying what we + # sent + info = self._syncing_info self.source._set_replica_gen_and_trans_id( - self.target_replica_uid, new_gen, new_trans_id) + info["target_replica_uid"], info["new_gen"], info["new_trans_id"]) + # if gapless record current reached generation with target - self._record_sync_info_with_the_target(my_gen) + self._record_sync_info_with_the_target(info["my_gen"]) - return my_gen + @property + def syncing(self): + """ + Return True if a sync is ongoing, False otherwise. + :rtype: bool + """ + # XXX FIXME we need some mechanism for timeout: should cleanup and + # release if something in the syncdb-decrypt goes wrong. we could keep + # track of the release date and cleanup unrealistic sync entries after + # some time. + locked = self.syncing_lock.locked() + return locked + + def release_syncing_lock(self): + """ + Release syncing lock if it's locked. + """ + if self.syncing_lock.locked(): + self.syncing_lock.release() + + def close(self): + """ + Close sync target pool of workers. + """ + self.release_syncing_lock() + self.sync_target.close() + + def __del__(self): + """ + Cleanup: release lock. + """ + self.release_syncing_lock() |