summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2014-07-03 10:23:11 -0300
committerdrebs <drebs@leap.se>2014-07-03 10:23:11 -0300
commitba109986d55e008c7855d20538d84f2c69ca9271 (patch)
treef5a1f124e421beb588cafb3853389c7a9c6e94a7
parentdd82ea1c9055eb46bdfac46c9a1ef9ed41fac850 (diff)
Properly stop sync and cancel threads on fail.
-rw-r--r--client/src/leap/soledad/client/target.py46
1 files changed, 14 insertions, 32 deletions
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 28edd027..c459925d 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -246,7 +246,7 @@ class DocumentSyncerPool(object):
"""
def __init__(self, raw_url, raw_creds, query_string, headers,
- ensure_callback):
+ ensure_callback, stop_method):
"""
Initialize the document syncer pool.
@@ -269,6 +269,7 @@ class DocumentSyncerPool(object):
self._query_string = query_string
self._headers = headers
self._ensure_callback = ensure_callback
+ self._stop_method = stop_method
# pool attributes
self._failures = False
self._semaphore_pool = threading.BoundedSemaphore(
@@ -356,9 +357,11 @@ class DocumentSyncerPool(object):
"""
Stop all threads in the pool.
"""
+ # stop sync
+ self._stop_method()
stopped = []
# stop all threads
- logger.warning("Soledad sync: cancelling sync threads.")
+ logger.warning("Soledad sync: cancelling sync threads...")
with self._pool_access_lock:
self._failures = True
while self._threads:
@@ -373,8 +376,7 @@ class DocumentSyncerPool(object):
t.request_lock.release()
t.callback_lock.acquire(False) # just in case
t.callback_lock.release()
- if t is not calling_thread and t.is_alive():
- t.join()
+ logger.warning("Soledad sync: cancelled sync threads.")
def cleanup(self):
"""
@@ -414,9 +416,6 @@ class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth):
self._request_method = None
self._success_callback = None
self._failure_callback = None
- # storage of info returned by the request
- self._success = None
- self._exception = None
def _reset(self):
"""
@@ -426,8 +425,6 @@ class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth):
self._success_callback = None
self._failure_callback = None
self._request_method = None
- self._success = None
- self._exception = None
def set_request_method(self, method, *args, **kwargs):
"""
@@ -994,12 +991,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
if self.stopped is True:
break
- # bail out if any thread failed
- if syncer_pool.failures is True:
- #syncer_pool.cancel_threads()
- self.stop()
- break
-
# launch a thread to fetch one document from target
t = syncer_pool.new_syncer_thread(
idx, number_of_changes,
@@ -1037,9 +1028,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
number_of_changes, _, _ = t.result
first_request = False
- if syncer_pool.failures is True:
- self.stop()
-
# make sure all threads finished and we have up-to-date info
last_successful_thread = None
while threads:
@@ -1142,7 +1130,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
defer_encryption = self._sync_db is not None
syncer_pool = DocumentSyncerPool(
- self._raw_url, self._raw_creds, url, headers, ensure_callback)
+ self._raw_url, self._raw_creds, url, headers, ensure_callback,
+ self.stop)
threads = []
last_request_lock = None
last_callback_lock = None
@@ -1157,11 +1146,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
if self.stopped is True:
break
- # bail out if any thread failed
- if syncer_pool.failures is True:
- self.stop()
- break
-
# skip non-syncable docs
if isinstance(doc, SoledadDocument) and not doc.syncable:
continue
@@ -1236,9 +1220,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# make sure all threads finished and we have up-to-date info
while threads:
# check if there are failures
- if syncer_pool.failures is True:
- #syncer_pool.cancel_threads()
- self.stop()
t, doc = threads.pop(0)
t.join()
if t.success:
@@ -1248,11 +1229,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._sync_watcher.start()
# get docs from target
- cur_target_gen, cur_target_trans_id = self._get_remote_docs(
- url,
- last_known_generation, last_known_trans_id, headers,
- return_doc_cb, ensure_callback, sync_id, syncer_pool,
- defer_decryption=defer_decryption)
+ if self.stopped is False:
+ cur_target_gen, cur_target_trans_id = self._get_remote_docs(
+ url,
+ last_known_generation, last_known_trans_id, headers,
+ return_doc_cb, ensure_callback, sync_id, syncer_pool,
+ defer_decryption=defer_decryption)
# delete documents from the sync database
if defer_encryption: