summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/target.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/target.py')
-rw-r--r--client/src/leap/soledad/client/target.py94
1 files changed, 55 insertions, 39 deletions
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 651d3ee5..dd61c070 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -14,14 +14,10 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
A U1DB backend for encrypting data before sending to server and decrypting
after receiving.
"""
-
-
import cStringIO
import gzip
import logging
@@ -34,7 +30,7 @@ from time import sleep
from uuid import uuid4
import simplejson as json
-from taskthread import TimerTask
+
from u1db import errors
from u1db.remote import utils, http_errors
from u1db.remote.http_target import HTTPSyncTarget
@@ -42,6 +38,8 @@ from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase
from zope.proxy import ProxyBase
from zope.proxy import sameProxiedObjects, setProxiedObject
+from twisted.internet.task import LoopingCall
+
from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.auth import TokenBasedAuth
from leap.soledad.client.crypto import is_symmetrically_encrypted
@@ -190,7 +188,7 @@ class DocumentSyncerThread(threading.Thread):
self._doc_syncer.failure_callback(
self._idx, self._total, self._exception)
- self._failed_method(self)
+ self._failed_method()
# we do not release the callback lock here because we
# failed and so we don't want other threads to succeed.
@@ -350,7 +348,7 @@ class DocumentSyncerPool(object):
self._threads.remove(syncer_thread)
self._semaphore_pool.release()
- def cancel_threads(self, calling_thread):
+ def cancel_threads(self):
"""
Stop all threads in the pool.
"""
@@ -755,7 +753,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
Period of recurrence of the periodic decrypting task, in seconds.
"""
- DECRYPT_TASK_PERIOD = 0.5
+ DECRYPT_LOOP_PERIOD = 0.5
#
# Modified HTTPSyncTarget methods.
@@ -796,13 +794,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._sync_exchange_lock = threading.Lock()
self.source_replica_uid = source_replica_uid
self._defer_decryption = False
+ self._syncer_pool = None
# deferred decryption attributes
self._sync_db = None
self._sync_db_write_lock = None
self._decryption_callback = None
self._sync_decr_pool = None
- self._sync_watcher = None
+ self._sync_loop = None
if sync_db and sync_db_write_lock is not None:
self._sync_db = sync_db
self._sync_db_write_lock = sync_db_write_lock
@@ -828,23 +827,22 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._sync_decr_pool.close()
self._sync_decr_pool = None
- def _setup_sync_watcher(self):
+ def _setup_sync_loop(self):
"""
- Set up the sync watcher for deferred decryption.
+ Set up the sync loop for deferred decryption.
"""
- if self._sync_watcher is None:
- self._sync_watcher = TimerTask(
- self._decrypt_syncing_received_docs,
- delay=self.DECRYPT_TASK_PERIOD)
+ if self._sync_loop is None:
+ self._sync_loop = LoopingCall(
+ self._decrypt_syncing_received_docs)
+ self._sync_loop.start(self.DECRYPT_LOOP_PERIOD)
- def _teardown_sync_watcher(self):
+ def _teardown_sync_loop(self):
"""
- Tear down the sync watcher.
+ Tear down the sync loop.
"""
- if self._sync_watcher is not None:
- self._sync_watcher.stop()
- self._sync_watcher.shutdown()
- self._sync_watcher = None
+ if self._sync_loop is not None:
+ self._sync_loop.stop()
+ self._sync_loop = None
def _get_replica_uid(self, url):
"""
@@ -955,7 +953,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
def _get_remote_docs(self, url, last_known_generation, last_known_trans_id,
headers, return_doc_cb, ensure_callback, sync_id,
- syncer_pool, defer_decryption=False):
+ defer_decryption=False):
"""
Fetch sync documents from the remote database and insert them in the
local database.
@@ -1016,7 +1014,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
break
# launch a thread to fetch one document from target
- t = syncer_pool.new_syncer_thread(
+ t = self._syncer_pool.new_syncer_thread(
idx, number_of_changes,
last_callback_lock=last_callback_lock)
@@ -1050,6 +1048,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
t.join()
if t.success:
number_of_changes, _, _ = t.result
+ else:
+ raise t.exception
first_request = False
# make sure all threads finished and we have up-to-date info
@@ -1060,6 +1060,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
t.join()
if t.success:
last_successful_thread = t
+ else:
+ raise t.exception
# get information about last successful thread
if last_successful_thread is not None:
@@ -1131,7 +1133,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
if defer_decryption and self._sync_db is not None:
self._sync_exchange_lock.acquire()
self._setup_sync_decr_pool()
- self._setup_sync_watcher()
+ self._setup_sync_loop()
self._defer_decryption = True
else:
# fall back
@@ -1165,9 +1167,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
logger.debug("Soledad sync send status: %s" % msg)
defer_encryption = self._sync_db is not None
- syncer_pool = DocumentSyncerPool(
+ self._syncer_pool = DocumentSyncerPool(
self._raw_url, self._raw_creds, url, headers, ensure_callback,
- self.stop)
+ self.stop_syncer)
threads = []
last_callback_lock = None
sent = 0
@@ -1212,7 +1214,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# -------------------------------------------------------------
# end of symmetric encryption
# -------------------------------------------------------------
- t = syncer_pool.new_syncer_thread(
+ t = self._syncer_pool.new_syncer_thread(
sent + 1, total, last_request_lock=last_request_lock,
last_callback_lock=last_callback_lock)
@@ -1267,6 +1269,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
if t.success:
synced.append((doc.doc_id, doc.rev))
last_successful_thread = t
+ else:
+ raise t.exception
# delete documents from the sync database
if defer_encryption:
@@ -1285,17 +1289,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
cur_target_gen, cur_target_trans_id = self._get_remote_docs(
url,
last_known_generation, last_known_trans_id, headers,
- return_doc_cb, ensure_callback, sync_id, syncer_pool,
+ return_doc_cb, ensure_callback, sync_id,
defer_decryption=defer_decryption)
- syncer_pool.cleanup()
+ self._syncer_pool.cleanup()
# decrypt docs in case of deferred decryption
if defer_decryption:
- self._sync_watcher.start()
while self.clear_to_sync() is False:
- sleep(self.DECRYPT_TASK_PERIOD)
- self._teardown_sync_watcher()
+ sleep(self.DECRYPT_LOOP_PERIOD)
+ self._teardown_sync_loop()
self._teardown_sync_decr_pool()
self._sync_exchange_lock.release()
@@ -1306,6 +1309,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
cur_target_trans_id = trans_id_after_send
self.stop()
+ self._syncer_pool = None
return cur_target_gen, cur_target_trans_id
def start(self):
@@ -1315,6 +1319,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
with self._stop_lock:
self._stopped = False
+
+ def stop_syncer(self):
+ with self._stop_lock:
+ self._stopped = True
+
def stop(self):
"""
Mark current sync session as stopped.
@@ -1323,8 +1332,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
enough information to the synchronizer so the sync session can be
recovered afterwards.
"""
- with self._stop_lock:
- self._stopped = True
+ self.stop_syncer()
+ if self._syncer_pool:
+ self._syncer_pool.cancel_threads()
@property
def stopped(self):
@@ -1351,11 +1361,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
encr = SyncEncrypterPool
sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % (
encr.TABLE_NAME,))
- res = self._sync_db.select(sql, (doc_id, doc_rev))
- try:
- val = res.next()
+ res = self._fetchall(sql, (doc_id, doc_rev))
+ if res:
+ val = res.pop()
return val[0]
- except StopIteration:
+ else:
# no doc found
return None
@@ -1460,7 +1470,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
Decrypt the documents received from remote replica and insert them
into the local one.
- Called periodically from TimerTask self._sync_watcher.
+ Called periodically from LoopingCall self._sync_loop.
"""
if sameProxiedObjects(
self._insert_doc_cb.get(self.source_replica_uid),
@@ -1497,3 +1507,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:type token: str
"""
TokenBasedAuth.set_token_credentials(self, uuid, token)
+
+ def _fetchall(self, *args, **kwargs):
+ with self._sync_db:
+ c = self._sync_db.cursor()
+ c.execute(*args, **kwargs)
+ return c.fetchall()