diff options
Diffstat (limited to 'client/src/leap/soledad/client/target.py')
-rw-r--r-- | client/src/leap/soledad/client/target.py | 87 |
1 files changed, 54 insertions, 33 deletions
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 70e4d3a2..ae2010a6 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -28,12 +28,10 @@ import logging import re import urllib import threading -import urlparse from collections import defaultdict from time import sleep from uuid import uuid4 -from contextlib import contextmanager import simplejson as json from taskthread import TimerTask @@ -44,7 +42,6 @@ from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase from zope.proxy import ProxyBase from zope.proxy import sameProxiedObjects, setProxiedObject -from leap.soledad.common import soledad_assert from leap.soledad.common.document import SoledadDocument from leap.soledad.client.auth import TokenBasedAuth from leap.soledad.client.crypto import is_symmetrically_encrypted @@ -87,7 +84,7 @@ class DocumentSyncerThread(threading.Thread): """ def __init__(self, doc_syncer, release_method, failed_method, - idx, total, last_request_lock=None, last_callback_lock=None): + idx, total, last_request_lock=None, last_callback_lock=None): """ Initialize a new syncer thread. @@ -246,7 +243,7 @@ class DocumentSyncerPool(object): """ def __init__(self, raw_url, raw_creds, query_string, headers, - ensure_callback, stop_method): + ensure_callback, stop_method): """ Initialize the document syncer pool. @@ -279,7 +276,7 @@ class DocumentSyncerPool(object): self._threads = [] def new_syncer_thread(self, idx, total, last_request_lock=None, - last_callback_lock=None): + last_callback_lock=None): """ Yield a new document syncer thread. @@ -376,6 +373,12 @@ class DocumentSyncerPool(object): t.request_lock.release() t.callback_lock.acquire(False) # just in case t.callback_lock.release() + # release any blocking semaphores + for i in xrange(DocumentSyncerPool.POOL_SIZE): + try: + self._semaphore_pool.release() + except ValueError: + break logger.warning("Soledad sync: cancelled sync threads.") def cleanup(self): @@ -613,7 +616,7 @@ class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth): self._conn.endheaders() def _get_doc(self, received, sync_id, last_known_generation, - last_known_trans_id): + last_known_trans_id): """ Get a sync document from server by means of a POST request. @@ -652,7 +655,7 @@ class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth): return self._response() def _put_doc(self, sync_id, last_known_generation, last_known_trans_id, - id, rev, content, gen, trans_id, number_of_docs, doc_idx): + id, rev, content, gen, trans_id, number_of_docs, doc_idx): """ Put a sync document on server by means of a POST request. @@ -759,7 +762,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # def __init__(self, url, source_replica_uid=None, creds=None, crypto=None, - sync_db=None, sync_db_write_lock=None): + sync_db=None, sync_db_write_lock=None): """ Initialize the SoledadSyncTarget. @@ -916,7 +919,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ new_generation, new_transaction_id, number_of_changes, doc_id, \ rev, content, gen, trans_id = \ - self._parse_received_doc_response(response) + self._parse_received_doc_response(response) if doc_id is not None: # decrypt incoming document and insert into local database # ------------------------------------------------------------- @@ -1125,11 +1128,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ self._ensure_callback = ensure_callback - if defer_decryption: + if defer_decryption and self._sync_db is not None: self._sync_exchange_lock.acquire() self._setup_sync_decr_pool() self._setup_sync_watcher() self._defer_decryption = True + else: + # fall back + defer_decryption = False self.start() @@ -1140,8 +1146,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): setProxiedObject(self._insert_doc_cb[source_replica_uid], return_doc_cb) - if not self.clear_to_sync(): - raise PendingReceivedDocsSyncError + # empty the database before starting a new sync + if defer_decryption is True and not self.clear_to_sync(): + self._sync_decr_pool.empty() self._ensure_connection() if self._trace_hook: # for tests @@ -1162,7 +1169,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._raw_url, self._raw_creds, url, headers, ensure_callback, self.stop) threads = [] - last_request_lock = None last_callback_lock = None sent = 0 total = len(docs_by_generations) @@ -1218,7 +1224,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): t.doc_syncer.set_request_method( 'put', sync_id, cur_target_gen, cur_target_trans_id, id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, - trans_id=trans_id, number_of_docs=number_of_docs, doc_idx=sent + 1) + trans_id=trans_id, number_of_docs=number_of_docs, + doc_idx=sent + 1) # set the success calback def _success_callback(idx, total, response): @@ -1242,20 +1249,30 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # save thread and append t.start() threads.append((t, doc)) - last_request_lock = t.request_lock last_callback_lock = t.callback_lock sent += 1 # make sure all threads finished and we have up-to-date info + last_successful_thread = None while threads: # check if there are failures t, doc = threads.pop(0) t.join() if t.success: synced.append((doc.doc_id, doc.rev)) + last_successful_thread = t - if defer_decryption: - self._sync_watcher.start() + # delete documents from the sync database + if defer_encryption: + self.delete_encrypted_docs_from_db(synced) + + # get target gen and trans_id after docs + gen_after_send = None + trans_id_after_send = None + if last_successful_thread is not None: + response_dict = json.loads(last_successful_thread.response[0])[0] + gen_after_send = response_dict['new_generation'] + trans_id_after_send = response_dict['new_transaction_id'] # get docs from target if self.stopped is False: @@ -1264,20 +1281,24 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): last_known_generation, last_known_trans_id, headers, return_doc_cb, ensure_callback, sync_id, syncer_pool, defer_decryption=defer_decryption) - syncer_pool.cleanup() - # delete documents from the sync database - if defer_encryption: - self.delete_encrypted_docs_from_db(synced) + syncer_pool.cleanup() - # wait for deferred decryption to finish + # decrypt docs in case of deferred decryption if defer_decryption: + self._sync_watcher.start() while self.clear_to_sync() is False: sleep(self.DECRYPT_TASK_PERIOD) self._teardown_sync_watcher() self._teardown_sync_decr_pool() self._sync_exchange_lock.release() + # update gen and trans id info in case we just sent and did not + # receive docs. + if gen_after_send is not None and gen_after_send > cur_target_gen: + cur_target_gen = gen_after_send + cur_target_trans_id = trans_id_after_send + self.stop() return cur_target_gen, cur_target_trans_id @@ -1322,13 +1343,15 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :type doc_rev: str """ encr = SyncEncrypterPool - c = self._sync_db.cursor() sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % ( encr.TABLE_NAME,)) - c.execute(sql, (doc_id, doc_rev)) - res = c.fetchall() - if len(res) != 0: - return res[0][0] + res = self._sync_db.select(sql, (doc_id, doc_rev)) + try: + val = res.next() + return val[0] + except StopIteration: + # no doc found + return None def delete_encrypted_docs_from_db(self, docs_ids): """ @@ -1341,12 +1364,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ if docs_ids: encr = SyncEncrypterPool - c = self._sync_db.cursor() for doc_id, doc_rev in docs_ids: sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % ( encr.TABLE_NAME,)) - c.execute(sql, (doc_id, doc_rev)) - self._sync_db.commit() + self._sync_db.execute(sql, (doc_id, doc_rev)) def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total): """ @@ -1402,7 +1423,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :rtype: bool """ if self._sync_decr_pool is not None: - return self._sync_decr_pool.count_received_encrypted_docs() == 0 + return self._sync_decr_pool.count_docs_in_sync_db() == 0 else: return True @@ -1442,7 +1463,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): decrypter = self._sync_decr_pool decrypter.decrypt_received_docs() - done = decrypter.process_decrypted() + decrypter.process_decrypted() def _sign_request(self, method, url_query, params): """ |