diff options
Diffstat (limited to 'client/src/leap/soledad/client/target.py')
-rw-r--r-- | client/src/leap/soledad/client/target.py | 94 |
1 files changed, 55 insertions, 39 deletions
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 651d3ee5..dd61c070 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -14,14 +14,10 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - - """ A U1DB backend for encrypting data before sending to server and decrypting after receiving. """ - - import cStringIO import gzip import logging @@ -34,7 +30,7 @@ from time import sleep from uuid import uuid4 import simplejson as json -from taskthread import TimerTask + from u1db import errors from u1db.remote import utils, http_errors from u1db.remote.http_target import HTTPSyncTarget @@ -42,6 +38,8 @@ from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase from zope.proxy import ProxyBase from zope.proxy import sameProxiedObjects, setProxiedObject +from twisted.internet.task import LoopingCall + from leap.soledad.common.document import SoledadDocument from leap.soledad.client.auth import TokenBasedAuth from leap.soledad.client.crypto import is_symmetrically_encrypted @@ -190,7 +188,7 @@ class DocumentSyncerThread(threading.Thread): self._doc_syncer.failure_callback( self._idx, self._total, self._exception) - self._failed_method(self) + self._failed_method() # we do not release the callback lock here because we # failed and so we don't want other threads to succeed. @@ -350,7 +348,7 @@ class DocumentSyncerPool(object): self._threads.remove(syncer_thread) self._semaphore_pool.release() - def cancel_threads(self, calling_thread): + def cancel_threads(self): """ Stop all threads in the pool. """ @@ -755,7 +753,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ Period of recurrence of the periodic decrypting task, in seconds. """ - DECRYPT_TASK_PERIOD = 0.5 + DECRYPT_LOOP_PERIOD = 0.5 # # Modified HTTPSyncTarget methods. @@ -796,13 +794,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._sync_exchange_lock = threading.Lock() self.source_replica_uid = source_replica_uid self._defer_decryption = False + self._syncer_pool = None # deferred decryption attributes self._sync_db = None self._sync_db_write_lock = None self._decryption_callback = None self._sync_decr_pool = None - self._sync_watcher = None + self._sync_loop = None if sync_db and sync_db_write_lock is not None: self._sync_db = sync_db self._sync_db_write_lock = sync_db_write_lock @@ -828,23 +827,22 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._sync_decr_pool.close() self._sync_decr_pool = None - def _setup_sync_watcher(self): + def _setup_sync_loop(self): """ - Set up the sync watcher for deferred decryption. + Set up the sync loop for deferred decryption. """ - if self._sync_watcher is None: - self._sync_watcher = TimerTask( - self._decrypt_syncing_received_docs, - delay=self.DECRYPT_TASK_PERIOD) + if self._sync_loop is None: + self._sync_loop = LoopingCall( + self._decrypt_syncing_received_docs) + self._sync_loop.start(self.DECRYPT_LOOP_PERIOD) - def _teardown_sync_watcher(self): + def _teardown_sync_loop(self): """ - Tear down the sync watcher. + Tear down the sync loop. """ - if self._sync_watcher is not None: - self._sync_watcher.stop() - self._sync_watcher.shutdown() - self._sync_watcher = None + if self._sync_loop is not None: + self._sync_loop.stop() + self._sync_loop = None def _get_replica_uid(self, url): """ @@ -955,7 +953,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): def _get_remote_docs(self, url, last_known_generation, last_known_trans_id, headers, return_doc_cb, ensure_callback, sync_id, - syncer_pool, defer_decryption=False): + defer_decryption=False): """ Fetch sync documents from the remote database and insert them in the local database. @@ -1016,7 +1014,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): break # launch a thread to fetch one document from target - t = syncer_pool.new_syncer_thread( + t = self._syncer_pool.new_syncer_thread( idx, number_of_changes, last_callback_lock=last_callback_lock) @@ -1050,6 +1048,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): t.join() if t.success: number_of_changes, _, _ = t.result + else: + raise t.exception first_request = False # make sure all threads finished and we have up-to-date info @@ -1060,6 +1060,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): t.join() if t.success: last_successful_thread = t + else: + raise t.exception # get information about last successful thread if last_successful_thread is not None: @@ -1131,7 +1133,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): if defer_decryption and self._sync_db is not None: self._sync_exchange_lock.acquire() self._setup_sync_decr_pool() - self._setup_sync_watcher() + self._setup_sync_loop() self._defer_decryption = True else: # fall back @@ -1165,9 +1167,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): logger.debug("Soledad sync send status: %s" % msg) defer_encryption = self._sync_db is not None - syncer_pool = DocumentSyncerPool( + self._syncer_pool = DocumentSyncerPool( self._raw_url, self._raw_creds, url, headers, ensure_callback, - self.stop) + self.stop_syncer) threads = [] last_callback_lock = None sent = 0 @@ -1212,7 +1214,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # ------------------------------------------------------------- # end of symmetric encryption # ------------------------------------------------------------- - t = syncer_pool.new_syncer_thread( + t = self._syncer_pool.new_syncer_thread( sent + 1, total, last_request_lock=last_request_lock, last_callback_lock=last_callback_lock) @@ -1267,6 +1269,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): if t.success: synced.append((doc.doc_id, doc.rev)) last_successful_thread = t + else: + raise t.exception # delete documents from the sync database if defer_encryption: @@ -1285,17 +1289,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): cur_target_gen, cur_target_trans_id = self._get_remote_docs( url, last_known_generation, last_known_trans_id, headers, - return_doc_cb, ensure_callback, sync_id, syncer_pool, + return_doc_cb, ensure_callback, sync_id, defer_decryption=defer_decryption) - syncer_pool.cleanup() + self._syncer_pool.cleanup() # decrypt docs in case of deferred decryption if defer_decryption: - self._sync_watcher.start() while self.clear_to_sync() is False: - sleep(self.DECRYPT_TASK_PERIOD) - self._teardown_sync_watcher() + sleep(self.DECRYPT_LOOP_PERIOD) + self._teardown_sync_loop() self._teardown_sync_decr_pool() self._sync_exchange_lock.release() @@ -1306,6 +1309,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): cur_target_trans_id = trans_id_after_send self.stop() + self._syncer_pool = None return cur_target_gen, cur_target_trans_id def start(self): @@ -1315,6 +1319,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): with self._stop_lock: self._stopped = False + + def stop_syncer(self): + with self._stop_lock: + self._stopped = True + def stop(self): """ Mark current sync session as stopped. @@ -1323,8 +1332,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): enough information to the synchronizer so the sync session can be recovered afterwards. """ - with self._stop_lock: - self._stopped = True + self.stop_syncer() + if self._syncer_pool: + self._syncer_pool.cancel_threads() @property def stopped(self): @@ -1351,11 +1361,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): encr = SyncEncrypterPool sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % ( encr.TABLE_NAME,)) - res = self._sync_db.select(sql, (doc_id, doc_rev)) - try: - val = res.next() + res = self._fetchall(sql, (doc_id, doc_rev)) + if res: + val = res.pop() return val[0] - except StopIteration: + else: # no doc found return None @@ -1460,7 +1470,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): Decrypt the documents received from remote replica and insert them into the local one. - Called periodically from TimerTask self._sync_watcher. + Called periodically from LoopingCall self._sync_loop. """ if sameProxiedObjects( self._insert_doc_cb.get(self.source_replica_uid), @@ -1497,3 +1507,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :type token: str """ TokenBasedAuth.set_token_credentials(self, uuid, token) + + def _fetchall(self, *args, **kwargs): + with self._sync_db: + c = self._sync_db.cursor() + c.execute(*args, **kwargs) + return c.fetchall() |