diff options
Diffstat (limited to 'client/src/leap/soledad/client/target.py')
-rw-r--r-- | client/src/leap/soledad/client/target.py | 73 |
1 files changed, 60 insertions, 13 deletions
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 7b77055c..06e79e63 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # target.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -25,6 +25,7 @@ import hashlib import hmac import logging import urllib +import threading import simplejson as json from time import sleep @@ -313,6 +314,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ HTTPSyncTarget.__init__(self, url, creds) self._crypto = crypto + self._stopped = True + self._sync_state = None + self._stop_lock = threading.Lock() def _init_post_request(self, url, action, headers, content_length): """ @@ -367,13 +371,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :rtype: list of str """ - def _post_get_doc(received): + def _post_get_doc(): """ Get a sync document from server by means of a POST request. - - :param received: How many documents have already been received in - this sync session. - :type received: int """ entries = ['['] size = 1 @@ -384,7 +384,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): last_known_trans_id=last_known_trans_id, ensure=ensure_callback is not None) # inform server of how many documents have already been received - size += self._prepare(',', entries, received=received) + size += self._prepare( + ',', entries, received=self._sync_state.received) entries.append('\r\n]') size += len(entries[-1]) # send headers @@ -394,13 +395,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._conn.send(entry) return self._response() - received = 0 number_of_changes = None - while number_of_changes is None or received < number_of_changes: + while number_of_changes is None or \ + self._sync_state.received < number_of_changes: + # bail out if sync process was interrupted + if self.stopped is True: + return None, None # try to fetch one document from target - data, _ = _post_get_doc(received) - received += 1 + data, _ = _post_get_doc() + self._sync_state.received += 1 # decode incoming stream entries = None try: @@ -545,7 +549,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): def sync_exchange(self, docs_by_generations, source_replica_uid, last_known_generation, last_known_trans_id, - return_doc_cb, ensure_callback=None): + return_doc_cb, ensure_callback=None, + sync_state=None): """ Find out which documents the remote database does not know about, encrypt and send them. @@ -574,6 +579,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :return: The new generation and transaction id of the target replica. :rtype: tuple """ + self.start() + self._sync_state = sync_state + self._ensure_connection() if self._trace_hook: # for tests self._trace_hook('sync_exchange') @@ -616,7 +624,15 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): cur_target_gen = last_known_generation cur_target_trans_id = last_known_trans_id + # skip docs that were already sent + if self._sync_state.sent > 0: + docs_by_generations = docs_by_generations[self._sync_state.sent:] + + # send docs for doc, gen, trans_id in docs_by_generations: + # allow for interrupting the sync process + if self.stopped is True: + break # skip non-syncable docs if isinstance(doc, SoledadDocument) and not doc.syncable: continue @@ -632,10 +648,41 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): cur_target_gen, cur_target_trans_id = _post_put_doc( headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id) + self._sync_state.sent += 1 + # get docs from target cur_target_gen, cur_target_trans_id = self._get_remote_docs( url, last_known_generation, last_known_trans_id, headers, return_doc_cb, ensure_callback) - + self.stop() return cur_target_gen, cur_target_trans_id + + def start(self): + """ + Mark current sync session as running. + """ + with self._stop_lock: + self._stopped = False + + def stop(self): + """ + Mark current sync session as stopped. + + This will eventually interrupt the sync_exchange() method and return + enough information to the synchronizer so the sync session can be + recovered afterwards. + """ + with self._stop_lock: + self._stopped = True + + @property + def stopped(self): + """ + Return wether this sync session is stopped. + + :return: Wether this sync session is stopped. + :rtype: bool + """ + with self._stop_lock: + return self._stopped is True |