summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/target.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/target.py')
-rw-r--r--client/src/leap/soledad/client/target.py87
1 files changed, 54 insertions, 33 deletions
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 70e4d3a2..ae2010a6 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -28,12 +28,10 @@ import logging
import re
import urllib
import threading
-import urlparse
from collections import defaultdict
from time import sleep
from uuid import uuid4
-from contextlib import contextmanager
import simplejson as json
from taskthread import TimerTask
@@ -44,7 +42,6 @@ from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase
from zope.proxy import ProxyBase
from zope.proxy import sameProxiedObjects, setProxiedObject
-from leap.soledad.common import soledad_assert
from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.auth import TokenBasedAuth
from leap.soledad.client.crypto import is_symmetrically_encrypted
@@ -87,7 +84,7 @@ class DocumentSyncerThread(threading.Thread):
"""
def __init__(self, doc_syncer, release_method, failed_method,
- idx, total, last_request_lock=None, last_callback_lock=None):
+ idx, total, last_request_lock=None, last_callback_lock=None):
"""
Initialize a new syncer thread.
@@ -246,7 +243,7 @@ class DocumentSyncerPool(object):
"""
def __init__(self, raw_url, raw_creds, query_string, headers,
- ensure_callback, stop_method):
+ ensure_callback, stop_method):
"""
Initialize the document syncer pool.
@@ -279,7 +276,7 @@ class DocumentSyncerPool(object):
self._threads = []
def new_syncer_thread(self, idx, total, last_request_lock=None,
- last_callback_lock=None):
+ last_callback_lock=None):
"""
Yield a new document syncer thread.
@@ -376,6 +373,12 @@ class DocumentSyncerPool(object):
t.request_lock.release()
t.callback_lock.acquire(False) # just in case
t.callback_lock.release()
+ # release any blocking semaphores
+ for i in xrange(DocumentSyncerPool.POOL_SIZE):
+ try:
+ self._semaphore_pool.release()
+ except ValueError:
+ break
logger.warning("Soledad sync: cancelled sync threads.")
def cleanup(self):
@@ -613,7 +616,7 @@ class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth):
self._conn.endheaders()
def _get_doc(self, received, sync_id, last_known_generation,
- last_known_trans_id):
+ last_known_trans_id):
"""
Get a sync document from server by means of a POST request.
@@ -652,7 +655,7 @@ class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth):
return self._response()
def _put_doc(self, sync_id, last_known_generation, last_known_trans_id,
- id, rev, content, gen, trans_id, number_of_docs, doc_idx):
+ id, rev, content, gen, trans_id, number_of_docs, doc_idx):
"""
Put a sync document on server by means of a POST request.
@@ -759,7 +762,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
#
def __init__(self, url, source_replica_uid=None, creds=None, crypto=None,
- sync_db=None, sync_db_write_lock=None):
+ sync_db=None, sync_db_write_lock=None):
"""
Initialize the SoledadSyncTarget.
@@ -916,7 +919,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
new_generation, new_transaction_id, number_of_changes, doc_id, \
rev, content, gen, trans_id = \
- self._parse_received_doc_response(response)
+ self._parse_received_doc_response(response)
if doc_id is not None:
# decrypt incoming document and insert into local database
# -------------------------------------------------------------
@@ -1125,11 +1128,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
self._ensure_callback = ensure_callback
- if defer_decryption:
+ if defer_decryption and self._sync_db is not None:
self._sync_exchange_lock.acquire()
self._setup_sync_decr_pool()
self._setup_sync_watcher()
self._defer_decryption = True
+ else:
+ # fall back
+ defer_decryption = False
self.start()
@@ -1140,8 +1146,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
setProxiedObject(self._insert_doc_cb[source_replica_uid],
return_doc_cb)
- if not self.clear_to_sync():
- raise PendingReceivedDocsSyncError
+ # empty the database before starting a new sync
+ if defer_decryption is True and not self.clear_to_sync():
+ self._sync_decr_pool.empty()
self._ensure_connection()
if self._trace_hook: # for tests
@@ -1162,7 +1169,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._raw_url, self._raw_creds, url, headers, ensure_callback,
self.stop)
threads = []
- last_request_lock = None
last_callback_lock = None
sent = 0
total = len(docs_by_generations)
@@ -1218,7 +1224,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
t.doc_syncer.set_request_method(
'put', sync_id, cur_target_gen, cur_target_trans_id,
id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen,
- trans_id=trans_id, number_of_docs=number_of_docs, doc_idx=sent + 1)
+ trans_id=trans_id, number_of_docs=number_of_docs,
+ doc_idx=sent + 1)
# set the success calback
def _success_callback(idx, total, response):
@@ -1242,20 +1249,30 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# save thread and append
t.start()
threads.append((t, doc))
- last_request_lock = t.request_lock
last_callback_lock = t.callback_lock
sent += 1
# make sure all threads finished and we have up-to-date info
+ last_successful_thread = None
while threads:
# check if there are failures
t, doc = threads.pop(0)
t.join()
if t.success:
synced.append((doc.doc_id, doc.rev))
+ last_successful_thread = t
- if defer_decryption:
- self._sync_watcher.start()
+ # delete documents from the sync database
+ if defer_encryption:
+ self.delete_encrypted_docs_from_db(synced)
+
+ # get target gen and trans_id after docs
+ gen_after_send = None
+ trans_id_after_send = None
+ if last_successful_thread is not None:
+ response_dict = json.loads(last_successful_thread.response[0])[0]
+ gen_after_send = response_dict['new_generation']
+ trans_id_after_send = response_dict['new_transaction_id']
# get docs from target
if self.stopped is False:
@@ -1264,20 +1281,24 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
last_known_generation, last_known_trans_id, headers,
return_doc_cb, ensure_callback, sync_id, syncer_pool,
defer_decryption=defer_decryption)
- syncer_pool.cleanup()
- # delete documents from the sync database
- if defer_encryption:
- self.delete_encrypted_docs_from_db(synced)
+ syncer_pool.cleanup()
- # wait for deferred decryption to finish
+ # decrypt docs in case of deferred decryption
if defer_decryption:
+ self._sync_watcher.start()
while self.clear_to_sync() is False:
sleep(self.DECRYPT_TASK_PERIOD)
self._teardown_sync_watcher()
self._teardown_sync_decr_pool()
self._sync_exchange_lock.release()
+ # update gen and trans id info in case we just sent and did not
+ # receive docs.
+ if gen_after_send is not None and gen_after_send > cur_target_gen:
+ cur_target_gen = gen_after_send
+ cur_target_trans_id = trans_id_after_send
+
self.stop()
return cur_target_gen, cur_target_trans_id
@@ -1322,13 +1343,15 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:type doc_rev: str
"""
encr = SyncEncrypterPool
- c = self._sync_db.cursor()
sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % (
encr.TABLE_NAME,))
- c.execute(sql, (doc_id, doc_rev))
- res = c.fetchall()
- if len(res) != 0:
- return res[0][0]
+ res = self._sync_db.select(sql, (doc_id, doc_rev))
+ try:
+ val = res.next()
+ return val[0]
+ except StopIteration:
+ # no doc found
+ return None
def delete_encrypted_docs_from_db(self, docs_ids):
"""
@@ -1341,12 +1364,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
if docs_ids:
encr = SyncEncrypterPool
- c = self._sync_db.cursor()
for doc_id, doc_rev in docs_ids:
sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % (
encr.TABLE_NAME,))
- c.execute(sql, (doc_id, doc_rev))
- self._sync_db.commit()
+ self._sync_db.execute(sql, (doc_id, doc_rev))
def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total):
"""
@@ -1402,7 +1423,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:rtype: bool
"""
if self._sync_decr_pool is not None:
- return self._sync_decr_pool.count_received_encrypted_docs() == 0
+ return self._sync_decr_pool.count_docs_in_sync_db() == 0
else:
return True
@@ -1442,7 +1463,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
decrypter = self._sync_decr_pool
decrypter.decrypt_received_docs()
- done = decrypter.process_decrypted()
+ decrypter.process_decrypted()
def _sign_request(self, method, url_query, params):
"""