summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2014-11-18 15:33:23 -0200
committerKali Kaneko <kali@leap.se>2015-02-11 14:03:18 -0400
commit4b317c9f5a9033afaa7435e11f761de4bc3095a3 (patch)
treec60efc8d91e44eda52fb151015ed2d9134208839
parent8b3982ada921af765e7ede7dd3c77ef3fbf075f1 (diff)
Fix interruptable sync.
-rw-r--r--client/src/leap/soledad/client/target.py34
1 files changed, 24 insertions, 10 deletions
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 9b546402..ba61cdff 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -348,7 +348,7 @@ class DocumentSyncerPool(object):
self._threads.remove(syncer_thread)
self._semaphore_pool.release()
- def cancel_threads(self, calling_thread):
+ def cancel_threads(self):
"""
Stop all threads in the pool.
"""
@@ -794,6 +794,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._sync_exchange_lock = threading.Lock()
self.source_replica_uid = source_replica_uid
self._defer_decryption = False
+ self._syncer_pool = None
# deferred decryption attributes
self._sync_db = None
@@ -952,7 +953,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
def _get_remote_docs(self, url, last_known_generation, last_known_trans_id,
headers, return_doc_cb, ensure_callback, sync_id,
- syncer_pool, defer_decryption=False):
+ defer_decryption=False):
"""
Fetch sync documents from the remote database and insert them in the
local database.
@@ -1013,7 +1014,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
break
# launch a thread to fetch one document from target
- t = syncer_pool.new_syncer_thread(
+ t = self._syncer_pool.new_syncer_thread(
idx, number_of_changes,
last_callback_lock=last_callback_lock)
@@ -1047,6 +1048,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
t.join()
if t.success:
number_of_changes, _, _ = t.result
+ else:
+ raise t.exception
first_request = False
# make sure all threads finished and we have up-to-date info
@@ -1057,6 +1060,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
t.join()
if t.success:
last_successful_thread = t
+ else:
+ raise t.exception
# get information about last successful thread
if last_successful_thread is not None:
@@ -1162,9 +1167,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
logger.debug("Soledad sync send status: %s" % msg)
defer_encryption = self._sync_db is not None
- syncer_pool = DocumentSyncerPool(
+ self._syncer_pool = DocumentSyncerPool(
self._raw_url, self._raw_creds, url, headers, ensure_callback,
- self.stop)
+ self.stop_syncer)
threads = []
last_callback_lock = None
sent = 0
@@ -1209,7 +1214,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# -------------------------------------------------------------
# end of symmetric encryption
# -------------------------------------------------------------
- t = syncer_pool.new_syncer_thread(
+ t = self._syncer_pool.new_syncer_thread(
sent + 1, total, last_request_lock=last_request_lock,
last_callback_lock=last_callback_lock)
@@ -1264,6 +1269,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
if t.success:
synced.append((doc.doc_id, doc.rev))
last_successful_thread = t
+ else:
+ raise t.exception
# delete documents from the sync database
if defer_encryption:
@@ -1282,10 +1289,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
cur_target_gen, cur_target_trans_id = self._get_remote_docs(
url,
last_known_generation, last_known_trans_id, headers,
- return_doc_cb, ensure_callback, sync_id, syncer_pool,
+ return_doc_cb, ensure_callback, sync_id,
defer_decryption=defer_decryption)
- syncer_pool.cleanup()
+ self._syncer_pool.cleanup()
# decrypt docs in case of deferred decryption
if defer_decryption:
@@ -1303,6 +1310,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
cur_target_trans_id = trans_id_after_send
self.stop()
+ self._syncer_pool = None
return cur_target_gen, cur_target_trans_id
def start(self):
@@ -1312,6 +1320,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
with self._stop_lock:
self._stopped = False
+
+ def stop_syncer(self):
+ with self._stop_lock:
+ self._stopped = True
+
def stop(self):
"""
Mark current sync session as stopped.
@@ -1320,8 +1333,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
enough information to the synchronizer so the sync session can be
recovered afterwards.
"""
- with self._stop_lock:
- self._stopped = True
+ self.stop_syncer()
+ if self._syncer_pool:
+ self._syncer_pool.cancel_threads()
@property
def stopped(self):