diff options
Diffstat (limited to 'src/leap/soledad/client/sync.py')
-rw-r--r-- | src/leap/soledad/client/sync.py | 231 |
1 files changed, 231 insertions, 0 deletions
diff --git a/src/leap/soledad/client/sync.py b/src/leap/soledad/client/sync.py new file mode 100644 index 00000000..2a927189 --- /dev/null +++ b/src/leap/soledad/client/sync.py @@ -0,0 +1,231 @@ +# -*- coding: utf-8 -*- +# sync.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Soledad synchronization utilities. +""" +import os + +from twisted.internet import defer + +from leap.soledad.common.log import getLogger +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.sync import Synchronizer +from leap.soledad.common.errors import BackendNotReadyError + + +logger = getLogger(__name__) + + +# we may want to collect statistics from the sync process +DO_STATS = False +if os.environ.get('SOLEDAD_STATS'): + DO_STATS = True + + +class SoledadSynchronizer(Synchronizer): + """ + Collect the state around synchronizing 2 U1DB replicas. + + Synchronization is bi-directional, in that new items in the source are sent + to the target, and new items in the target are returned to the source. + However, it still recognizes that one side is initiating the request. Also, + at the moment, conflicts are only created in the source. + + Also modified to allow for interrupting the synchronization process. + """ + received_docs = [] + + def __init__(self, *args, **kwargs): + Synchronizer.__init__(self, *args, **kwargs) + if DO_STATS: + self.sync_phase = [0] + self.sync_exchange_phase = None + + @defer.inlineCallbacks + def sync(self): + """ + Synchronize documents between source and target. + + :return: A deferred which will fire after the sync has finished with + the local generation before the synchronization was performed. + :rtype: twisted.internet.defer.Deferred + """ + + sync_target = self.sync_target + self.received_docs = [] + + # ---------- phase 1: get sync info from server ---------------------- + if DO_STATS: + self.sync_phase[0] += 1 + self.sync_exchange_phase = self.sync_target.sync_exchange_phase + # -------------------------------------------------------------------- + + # get target identifier, its current generation, + # and its last-seen database generation for this source + ensure_callback = None + try: + (self.target_replica_uid, target_gen, target_trans_id, + target_my_gen, target_my_trans_id) = yield \ + sync_target.get_sync_info(self.source._replica_uid) + except (errors.DatabaseDoesNotExist, BackendNotReadyError) as e: + logger.warn("Database isn't ready on server. Will be created.") + logger.warn("Reason: %s" % e.__class__) + self.target_replica_uid = None + target_gen, target_trans_id = 0, '' + target_my_gen, target_my_trans_id = 0, '' + + logger.debug("target replica uid: %s" % self.target_replica_uid) + logger.debug("target generation: %d" % target_gen) + logger.debug("target trans id: %s" % target_trans_id) + logger.debug("target my gen: %d" % target_my_gen) + logger.debug("target my trans_id: %s" % target_my_trans_id) + logger.debug("source replica_uid: %s" % self.source._replica_uid) + + # make sure we'll have access to target replica uid once it exists + if self.target_replica_uid is None: + + def ensure_callback(replica_uid): + self.target_replica_uid = replica_uid + + # make sure we're not syncing one replica with itself + if self.target_replica_uid == self.source._replica_uid: + raise errors.InvalidReplicaUID + + # validate the info the target has about the source replica + self.source.validate_gen_and_trans_id( + target_my_gen, target_my_trans_id) + + # ---------- phase 2: what's changed --------------------------------- + if DO_STATS: + self.sync_phase[0] += 1 + # -------------------------------------------------------------------- + + # what's changed since that generation and this current gen + my_gen, _, changes = self.source.whats_changed(target_my_gen) + logger.debug("there are %d documents to send" % len(changes)) + + # get source last-seen database generation for the target + if self.target_replica_uid is None: + target_last_known_gen, target_last_known_trans_id = 0, '' + else: + target_last_known_gen, target_last_known_trans_id = \ + self.source._get_replica_gen_and_trans_id( + self.target_replica_uid) + logger.debug( + "last known target gen: %d" % target_last_known_gen) + logger.debug( + "last known target trans_id: %s" % target_last_known_trans_id) + + # validate transaction ids + if not changes and target_last_known_gen == target_gen: + if target_trans_id != target_last_known_trans_id: + raise errors.InvalidTransactionId + defer.returnValue(my_gen) + + # ---------- phase 3: sync exchange ---------------------------------- + if DO_STATS: + self.sync_phase[0] += 1 + # -------------------------------------------------------------------- + + docs_by_generation = self._docs_by_gen_from_changes(changes) + + # exchange documents and try to insert the returned ones with + # the target, return target synced-up-to gen. + new_gen, new_trans_id = yield sync_target.sync_exchange( + docs_by_generation, self.source._replica_uid, + target_last_known_gen, target_last_known_trans_id, + self._insert_doc_from_target, ensure_callback=ensure_callback) + ids_sent = [doc_id for doc_id, _, _ in changes] + logger.debug("target gen after sync: %d" % new_gen) + logger.debug("target trans_id after sync: %s" % new_trans_id) + if hasattr(self.source, 'commit'): # sqlcipher backend speed up + self.source.commit() # insert it all in a single transaction + info = { + "target_replica_uid": self.target_replica_uid, + "new_gen": new_gen, + "new_trans_id": new_trans_id, + "my_gen": my_gen + } + self._syncing_info = info + + # ---------- phase 4: complete sync ---------------------------------- + if DO_STATS: + self.sync_phase[0] += 1 + # -------------------------------------------------------------------- + + yield self.complete_sync() + + _, _, changes = self.source.whats_changed(target_my_gen) + changed_doc_ids = [doc_id for doc_id, _, _ in changes] + + just_received = list(set(changed_doc_ids) - set(ids_sent)) + self.received_docs = just_received + + # ---------- phase 5: sync is over ----------------------------------- + if DO_STATS: + self.sync_phase[0] += 1 + # -------------------------------------------------------------------- + + defer.returnValue(my_gen) + + def _docs_by_gen_from_changes(self, changes): + docs_by_generation = [] + kwargs = {'include_deleted': True} + for doc_id, gen, trans in changes: + get_doc = (self.source.get_doc, (doc_id,), kwargs) + docs_by_generation.append((get_doc, gen, trans)) + return docs_by_generation + + def complete_sync(self): + """ + Last stage of the synchronization: + (a) record last known generation and transaction uid for the remote + replica, and + (b) make target aware of our current reached generation. + + :return: A deferred which will fire when the sync has been completed. + :rtype: twisted.internet.defer.Deferred + """ + logger.debug("completing deferred last step in sync...") + + # record target synced-up-to generation including applying what we + # sent + info = self._syncing_info + self.source._set_replica_gen_and_trans_id( + info["target_replica_uid"], info["new_gen"], info["new_trans_id"]) + + # if gapless record current reached generation with target + return self._record_sync_info_with_the_target(info["my_gen"]) + + def _record_sync_info_with_the_target(self, start_generation): + """ + Store local replica metadata in server. + + :param start_generation: The local generation when the sync was + started. + :type start_generation: int + + :return: A deferred which will fire when the operation has been + completed. + :rtype: twisted.internet.defer.Deferred + """ + cur_gen, trans_id = self.source._get_generation_info() + if (cur_gen == start_generation + self.num_inserted and + self.num_inserted > 0): + return self.sync_target.record_sync_info( + self.source._replica_uid, cur_gen, trans_id) + return defer.succeed(None) |