From ba109986d55e008c7855d20538d84f2c69ca9271 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 3 Jul 2014 10:23:11 -0300 Subject: Properly stop sync and cancel threads on fail. --- client/src/leap/soledad/client/target.py | 46 ++++++++++---------------------- 1 file changed, 14 insertions(+), 32 deletions(-) (limited to 'client/src/leap/soledad') diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 28edd027..c459925d 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -246,7 +246,7 @@ class DocumentSyncerPool(object): """ def __init__(self, raw_url, raw_creds, query_string, headers, - ensure_callback): + ensure_callback, stop_method): """ Initialize the document syncer pool. @@ -269,6 +269,7 @@ class DocumentSyncerPool(object): self._query_string = query_string self._headers = headers self._ensure_callback = ensure_callback + self._stop_method = stop_method # pool attributes self._failures = False self._semaphore_pool = threading.BoundedSemaphore( @@ -356,9 +357,11 @@ class DocumentSyncerPool(object): """ Stop all threads in the pool. """ + # stop sync + self._stop_method() stopped = [] # stop all threads - logger.warning("Soledad sync: cancelling sync threads.") + logger.warning("Soledad sync: cancelling sync threads...") with self._pool_access_lock: self._failures = True while self._threads: @@ -373,8 +376,7 @@ class DocumentSyncerPool(object): t.request_lock.release() t.callback_lock.acquire(False) # just in case t.callback_lock.release() - if t is not calling_thread and t.is_alive(): - t.join() + logger.warning("Soledad sync: cancelled sync threads.") def cleanup(self): """ @@ -414,9 +416,6 @@ class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth): self._request_method = None self._success_callback = None self._failure_callback = None - # storage of info returned by the request - self._success = None - self._exception = None def _reset(self): """ @@ -426,8 +425,6 @@ class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth): self._success_callback = None self._failure_callback = None self._request_method = None - self._success = None - self._exception = None def set_request_method(self, method, *args, **kwargs): """ @@ -994,12 +991,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): if self.stopped is True: break - # bail out if any thread failed - if syncer_pool.failures is True: - #syncer_pool.cancel_threads() - self.stop() - break - # launch a thread to fetch one document from target t = syncer_pool.new_syncer_thread( idx, number_of_changes, @@ -1037,9 +1028,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): number_of_changes, _, _ = t.result first_request = False - if syncer_pool.failures is True: - self.stop() - # make sure all threads finished and we have up-to-date info last_successful_thread = None while threads: @@ -1142,7 +1130,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): defer_encryption = self._sync_db is not None syncer_pool = DocumentSyncerPool( - self._raw_url, self._raw_creds, url, headers, ensure_callback) + self._raw_url, self._raw_creds, url, headers, ensure_callback, + self.stop) threads = [] last_request_lock = None last_callback_lock = None @@ -1157,11 +1146,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): if self.stopped is True: break - # bail out if any thread failed - if syncer_pool.failures is True: - self.stop() - break - # skip non-syncable docs if isinstance(doc, SoledadDocument) and not doc.syncable: continue @@ -1236,9 +1220,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # make sure all threads finished and we have up-to-date info while threads: # check if there are failures - if syncer_pool.failures is True: - #syncer_pool.cancel_threads() - self.stop() t, doc = threads.pop(0) t.join() if t.success: @@ -1248,11 +1229,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._sync_watcher.start() # get docs from target - cur_target_gen, cur_target_trans_id = self._get_remote_docs( - url, - last_known_generation, last_known_trans_id, headers, - return_doc_cb, ensure_callback, sync_id, syncer_pool, - defer_decryption=defer_decryption) + if self.stopped is False: + cur_target_gen, cur_target_trans_id = self._get_remote_docs( + url, + last_known_generation, last_known_trans_id, headers, + return_doc_cb, ensure_callback, sync_id, syncer_pool, + defer_decryption=defer_decryption) # delete documents from the sync database if defer_encryption: -- cgit v1.2.3