summaryrefslogtreecommitdiff
path: root/client/src/leap
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2015-05-13 10:31:47 -0300
committerdrebs <drebs@leap.se>2015-05-20 10:16:46 -0300
commit94cbe24f6c6cd54e14d8d1b14e617c2d52c427fd (patch)
tree7d682242f2e9a1e293d15b42e2d4c58852887321 /client/src/leap
parent93717f50c9e8fc6295f74b6117268ba595f13ce9 (diff)
[feature] use twisted adbapi for async encryption
The access to the sync db was modified to use twisted.enterprise.adbapi, but only the asynchronous decryption of incoming documents during sync was adapted. This commit modifies the asynchornous encryption of documents to also use the adbapi for accessing the sync db.
Diffstat (limited to 'client/src/leap')
-rw-r--r--client/src/leap/soledad/client/encdecpool.py175
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py64
-rw-r--r--client/src/leap/soledad/client/sync.py4
-rw-r--r--client/src/leap/soledad/client/target.py217
4 files changed, 275 insertions, 185 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 0466ec5d..0c1f92ea 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -85,6 +85,36 @@ class SyncEncryptDecryptPool(object):
logger.debug("Terminating %s" % (self.__class__.__name__,))
self._pool.terminate()
+ def _runOperation(self, query, *args):
+ """
+ Run an operation on the sync db.
+
+ :param query: The query to be executed.
+ :type query: str
+ :param args: A list of query arguments.
+ :type args: list
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._sync_db.runOperation(query, *args)
+
+ def _runQuery(self, query, *args):
+ """
+ Run a query on the sync db.
+
+ :param query: The query to be executed.
+ :type query: str
+ :param args: A list of query arguments.
+ :type args: list
+
+ :return: A deferred that will fire with the results of the database
+ query.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._sync_db.runQuery(query, *args)
+
def encrypt_doc_task(doc_id, doc_rev, content, key, secret):
"""
@@ -119,7 +149,50 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
TABLE_NAME = "docs_tosync"
FIELD_NAMES = "doc_id PRIMARY KEY, rev, content"
- def encrypt_doc(self, doc, workers=True):
+ ENCRYPT_LOOP_PERIOD = 0.5
+
+ def __init__(self, *args, **kwargs):
+ """
+ Initialize the sync encrypter pool.
+ """
+ SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
+
+ self._stopped = False
+ self._sync_queue = multiprocessing.Queue()
+
+ # start the encryption loop
+ self._deferred_loop = deferToThread(self._encrypt_docs_loop)
+ self._deferred_loop.addCallback(
+ lambda _: logger.debug("Finished encrypter thread."))
+
+ def enqueue_doc_for_encryption(self, doc):
+ """
+ Enqueue a document for encryption.
+
+ :param doc: The document to be encrypted.
+ :type doc: SoledadDocument
+ """
+ try:
+ self.sync_queue.put_nowait(doc)
+ except multiprocessing.Queue.Full:
+ # do not asynchronously encrypt this file if the queue is full
+ pass
+
+ def _encrypt_docs_loop(self):
+ """
+ Process the syncing queue and send the documents there to be encrypted
+ in the sync db. They will be read by the SoledadSyncTarget during the
+ sync_exchange.
+ """
+ logger.debug("Starting encrypter thread.")
+ while not self._stopped:
+ try:
+ doc = self._sync_queue.get(True, self.ENCRYPT_LOOP_PERIOD)
+ self._encrypt_doc(doc)
+ except multiprocessing.Queue.Empty:
+ pass
+
+ def _encrypt_doc(self, doc, workers=True):
"""
Symmetrically encrypt a document.
@@ -136,20 +209,20 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
secret = self._crypto.secret
args = doc.doc_id, doc.rev, docstr, key, secret
- try:
- if workers:
- res = self._pool.apply_async(
- encrypt_doc_task, args,
- callback=self.encrypt_doc_cb)
- else:
- # encrypt inline
+ if workers:
+ # encrypt asynchronously
+ self._pool.apply_async(
+ encrypt_doc_task, args,
+ callback=self._encrypt_doc_cb)
+ else:
+ # encrypt inline
+ try:
res = encrypt_doc_task(*args)
- self.encrypt_doc_cb(res)
+ self._encrypt_doc_cb(res)
+ except Exception as exc:
+ logger.exception(exc)
- except Exception as exc:
- logger.exception(exc)
-
- def encrypt_doc_cb(self, result):
+ def _encrypt_doc_cb(self, result):
"""
Insert results of encryption routine into the local sync database.
@@ -158,9 +231,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
:type result: tuple(str, str, str)
"""
doc_id, doc_rev, content = result
- return self.insert_encrypted_local_doc(doc_id, doc_rev, content)
+ return self._insert_encrypted_local_doc(doc_id, doc_rev, content)
- def insert_encrypted_local_doc(self, doc_id, doc_rev, content):
+ def _insert_encrypted_local_doc(self, doc_id, doc_rev, content):
"""
Insert the contents of the encrypted doc into the local sync
database.
@@ -174,7 +247,58 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
"""
query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \
% (self.TABLE_NAME,)
- return self._sync_db.runOperation(query, (doc_id, doc_rev, content))
+ return self._runOperation(query, (doc_id, doc_rev, content))
+
+ @defer.inlineCallbacks
+ def get_encrypted_doc(self, doc_id, doc_rev):
+ """
+ Get an encrypted document from the sync db.
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+ :param doc_rev: The revision of the document.
+ :type doc_rev: str
+
+ :return: A deferred that will fire with the encrypted content of the
+ document or None if the document was not found in the sync
+ db.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ logger.debug("Trying to get encrypted doc from sync db: %s" % doc_id)
+ query = "SELECT content FROM %s WHERE doc_id=? and rev=?" \
+ % self.TABLE_NAME
+ result = yield self._runQuery(query, (doc_id, doc_rev))
+ if result:
+ val = result.pop()
+ defer.returnValue(val[0])
+ defer.returnValue(None)
+
+ def delete_encrypted_doc(self, doc_id, doc_rev):
+ """
+ Delete an encrypted document from the sync db.
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+ :param doc_rev: The revision of the document.
+ :type doc_rev: str
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ query = "DELETE FROM %s WHERE doc_id=? and rev=?" \
+ % self.TABLE_NAME
+ self._runOperation(query, (doc_id, doc_rev))
+
+ def close(self):
+ """
+ Close the encrypter pool.
+ """
+ self._stopped = True
+ self._sync_queue.close()
+ q = self._sync_queue
+ del q
+ self._sync_queue = None
def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret,
@@ -275,9 +399,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
d.addCallback(lambda _: self._empty_db.set())
# start the decryption loop
- self._deferred_loop = deferToThread(self._decrypt_and_process_docs)
+ self._deferred_loop = deferToThread(
+ self._decrypt_and_process_docs_loop)
self._deferred_loop.addCallback(
- lambda _: logger.debug("Finished decryptor thread."))
+ lambda _: logger.debug("Finished decrypter thread."))
def set_docs_to_process(self, docs_to_process):
"""
@@ -317,7 +442,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
docstr = json.dumps(content)
query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \
% self.TABLE_NAME
- return self._sync_db.runOperation(
+ return self._runOperation(
query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx))
def insert_received_doc(
@@ -348,7 +473,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
content = json.dumps(content)
query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \
% self.TABLE_NAME
- return self._sync_db.runOperation(
+ return self._runOperation(
query, (doc_id, doc_rev, content, gen, trans_id, 0, idx))
def _delete_received_doc(self, doc_id):
@@ -364,7 +489,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
query = "DELETE FROM '%s' WHERE doc_id=?" \
% self.TABLE_NAME
- return self._sync_db.runOperation(query, (doc_id,))
+ return self._runOperation(query, (doc_id,))
def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx,
workers=True):
@@ -474,7 +599,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
if encrypted is not None:
query += " WHERE encrypted = %d" % int(encrypted)
query += " ORDER BY %s %s" % (order_by, order)
- return self._sync_db.runQuery(query)
+ return self._runQuery(query)
@defer.inlineCallbacks
def _get_insertable_docs(self):
@@ -612,7 +737,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:rtype: twisted.internet.defer.Deferred
"""
query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
- return self._sync_db.runOperation(query)
+ return self._runOperation(query)
def _raise_if_async_fails(self):
"""
@@ -627,7 +752,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# re-raise the exception raised by the remote call
res.get()
- def _decrypt_and_process_docs(self):
+ def _decrypt_and_process_docs_loop(self):
"""
Decrypt the documents received from remote replica and insert them
into the local one.
@@ -668,6 +793,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
Wait for the decrypt-and-process loop to finish.
"""
+ logger.debug("Waiting for asynchronous decryption of incoming documents...")
self._finished.wait()
+ logger.debug("Asynchronous decryption of incoming documents finished.")
if self._exception:
raise self._exception
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 39d5dd0e..16241621 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -42,7 +42,6 @@ SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases
handled by Soledad should be created by SQLCipher >= 2.0.
"""
import logging
-import multiprocessing
import os
import threading
import json
@@ -286,10 +285,9 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
:rtype: str
"""
doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc)
-
- # TODO XXX move to API XXX
if self.defer_encryption:
- self.sync_queue.put_nowait(doc)
+ # TODO move to api?
+ self._sync_enc_pool.enqueue_doc_for_encryption(doc)
return doc_rev
#
@@ -429,13 +427,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db'
"""
- A dictionary that hold locks which avoid multiple sync attempts from the
- same database replica.
- """
- # XXX We do not need the lock here now. Remove.
- encrypting_lock = threading.Lock()
-
- """
Period or recurrence of the Looping Call that will do the encryption to the
syncdb (in seconds).
"""
@@ -458,7 +449,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
self._sync_db_key = opts.sync_db_key
self._sync_db = None
self._sync_enc_pool = None
- self.sync_queue = None
# we store syncers in a dictionary indexed by the target URL. We also
# store a hash of the auth info in case auth info expires and we need
@@ -468,7 +458,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
# self._syncers = {'<url>': ('<auth_hash>', syncer), ...}
self._syncers = {}
- self.sync_queue = multiprocessing.Queue()
self.running = False
self._sync_threadpool = None
@@ -486,24 +475,10 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
self._initialize_sync_db(opts)
if defer_encryption:
-
# initialize syncing queue encryption pool
self._sync_enc_pool = encdecpool.SyncEncrypterPool(
self._crypto, self._sync_db)
- # -----------------------------------------------------------------
- # From the documentation: If f returns a deferred, rescheduling
- # will not take place until the deferred has fired. The result
- # value is ignored.
-
- # TODO use this to avoid multiple sync attempts if the sync has not
- # finished!
- # -----------------------------------------------------------------
-
- # XXX this was called sync_watcher --- trace any remnants
- self._sync_loop = LoopingCall(self._encrypt_syncing_docs)
- self._sync_loop.start(self.ENCRYPT_LOOP_PERIOD)
-
self.shutdownID = None
@property
@@ -703,7 +678,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
self._replica_uid,
creds=creds,
crypto=self._crypto,
- sync_db=self._sync_db))
+ sync_db=self._sync_db,
+ sync_enc_pool=self._sync_enc_pool))
self._syncers[url] = (h, syncer)
# in order to reuse the same synchronizer multiple times we have to
# reset its state (i.e. the number of documents received from target
@@ -715,33 +691,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
# Symmetric encryption of syncing docs
#
- def _encrypt_syncing_docs(self):
- """
- Process the syncing queue and send the documents there
- to be encrypted in the sync db. They will be read by the
- SoledadSyncTarget during the sync_exchange.
-
- Called periodically from the LoopingCall self._sync_loop.
- """
- # TODO should return a deferred that would firewhen the encryption is
- # done. See note on __init__
-
- lock = self.encrypting_lock
- # optional wait flag used to avoid blocking
- if not lock.acquire(False):
- return
- else:
- queue = self.sync_queue
- try:
- while not queue.empty():
- doc = queue.get_nowait()
- self._sync_enc_pool.encrypt_doc(doc)
-
- except Exception as exc:
- logger.error("Error while encrypting docs to sync")
- logger.exception(exc)
- finally:
- lock.release()
def get_generation(self):
# FIXME
@@ -779,11 +728,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
if self._sync_db is not None:
self._sync_db.close()
self._sync_db = None
- # close the sync queue
- if self.sync_queue is not None:
- self.sync_queue.close()
- del self.sync_queue
- self.sync_queue = None
class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase):
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index d3f106da..d4ca4258 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -195,10 +195,6 @@ class SoledadSynchronizer(Synchronizer):
"my_gen": my_gen
}
self._syncing_info = info
- if defer_decryption and not sync_target.has_syncdb():
- logger.debug("Sync target has no valid sync db, "
- "aborting defer_decryption")
- defer_decryption = False
self.complete_sync()
except Exception as e:
logger.error("Soledad sync error: %s" % str(e))
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index f2415218..667aab15 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -28,6 +28,7 @@ import threading
from collections import defaultdict
from time import sleep
from uuid import uuid4
+from functools import partial
import simplejson as json
@@ -38,12 +39,12 @@ from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase
from zope.proxy import ProxyBase
from zope.proxy import setProxiedObject
+from twisted.internet import defer
from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.auth import TokenBasedAuth
from leap.soledad.client.crypto import is_symmetrically_encrypted
from leap.soledad.client.crypto import encrypt_doc, decrypt_doc
-from leap.soledad.client.encdecpool import SyncEncrypterPool
from leap.soledad.client.encdecpool import SyncDecrypterPool
from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
@@ -111,6 +112,7 @@ class DocumentSyncerThread(threading.Thread):
self._exception = None
self._result = None
self._success = False
+ self.started = threading.Event()
# a lock so we can signal when we're finished
self._request_lock = threading.Lock()
self._request_lock.acquire()
@@ -128,6 +130,8 @@ class DocumentSyncerThread(threading.Thread):
finish before actually performing the request. It also traps any
exception and register any failure with the request.
"""
+ self.started.set()
+
with self._stop_lock:
if self._stopped is None:
self._stopped = False
@@ -756,7 +760,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
#
def __init__(self, url, source_replica_uid=None, creds=None, crypto=None,
- sync_db=None):
+ sync_db=None, sync_enc_pool=None):
"""
Initialize the SoledadSyncTarget.
@@ -787,8 +791,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self.source_replica_uid = source_replica_uid
self._syncer_pool = None
- # deferred decryption attributes
+ # asynchronous encryption/decryption attributes
self._sync_db = sync_db
+ self._sync_enc_pool = sync_enc_pool
self._decryption_callback = None
self._sync_decr_pool = None
@@ -796,7 +801,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
Set up the SyncDecrypterPool for deferred decryption.
"""
- if self._sync_decr_pool is None:
+ if self._sync_decr_pool is None and self._sync_db is not None:
# initialize syncing queue decryption pool
self._sync_decr_pool = SyncDecrypterPool(
self._crypto,
@@ -1018,7 +1023,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
t.join()
if t.success:
number_of_changes, _, _ = t.result
- if defer_decryption and number_of_changes:
+ if defer_decryption:
self._sync_decr_pool.set_docs_to_process(
number_of_changes)
else:
@@ -1060,6 +1065,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
return new_generation, new_transaction_id
+ @property
+ def _defer_encryption(self):
+ return self._sync_enc_pool is not None
+
+ @property
+ def _defer_decryption(self):
+ return self._sync_decr_pool is not None
+
def sync_exchange(self, docs_by_generations,
source_replica_uid, last_known_generation,
last_known_trans_id, return_doc_cb,
@@ -1126,17 +1139,19 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
cur_target_gen = last_known_generation
cur_target_trans_id = last_known_trans_id
- # send docs
+ # -------------------------------------------------------------------
+ # start of send documents to target
+ # -------------------------------------------------------------------
msg = "%d/%d" % (0, len(docs_by_generations))
signal(SOLEDAD_SYNC_SEND_STATUS, msg)
logger.debug("Soledad sync send status: %s" % msg)
- defer_encryption = self._sync_db is not None
self._syncer_pool = DocumentSyncerPool(
self._raw_url, self._raw_creds, url, headers, ensure_callback,
self.stop_syncer)
threads = []
last_callback_lock = None
+
sent = 0
total = len(docs_by_generations)
@@ -1156,66 +1171,78 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# -------------------------------------------------------------
# symmetric encryption of document's contents
# -------------------------------------------------------------
- doc_json = doc.get_json()
- if not doc.is_tombstone():
- if not defer_encryption:
- # fallback case, for tests
- doc_json = encrypt_doc(self._crypto, doc)
- else:
- try:
- doc_json = self.get_encrypted_doc_from_db(
- doc.doc_id, doc.rev)
- except Exception as exc:
- logger.error("Error while getting "
- "encrypted doc from db")
- logger.exception(exc)
- continue
+
+ # the following var will hold a deferred because we may try to
+ # fetch the encrypted document from the sync db
+ d = None
+
+ if doc.is_tombstone():
+ d = defer.succeed(None)
+ elif not self._defer_encryption:
+ # fallback case, for tests
+ d = defer.succeed(encrypt_doc(self._crypto, doc))
+ else:
+
+ def _maybe_encrypt_doc_inline(doc_json):
if doc_json is None:
- # Not marked as tombstone, but we got nothing
- # from the sync db. As it is not encrypted yet, we
- # force inline encryption.
+ # the document is not marked as tombstone, but we got
+ # nothing from the sync db. As it is not encrypted
+ # yet, we force inline encryption.
# TODO: implement a queue to deal with these cases.
- doc_json = encrypt_doc(self._crypto, doc)
+ return encrypt_doc(self._crypto, doc)
+ return doc_json
+
+ d = self.get_encrypted_doc_from_db(doc.doc_id, doc.rev)
+ d.addCallback(_maybe_encrypt_doc_inline)
# -------------------------------------------------------------
# end of symmetric encryption
# -------------------------------------------------------------
+
t = self._syncer_pool.new_syncer_thread(
sent + 1, total, last_request_lock=last_request_lock,
last_callback_lock=last_callback_lock)
- # bail out if any thread failed
+ # bail out if creation of any thread failed
if t is None:
self.stop(fail=True)
break
- # set the request method
- t.doc_syncer.set_request_method(
- 'put', sync_id, cur_target_gen, cur_target_trans_id,
- id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen,
- trans_id=trans_id, number_of_docs=number_of_docs,
- doc_idx=sent + 1)
- # set the success calback
+ # the following callback will be called when the document's
+ # encrypted content is available, either because it was found on
+ # the sync db or because it has been encrypted inline.
- def _success_callback(idx, total, response):
- _success_msg = "Soledad sync send status: %d/%d" \
- % (idx, total)
- signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg)
- logger.debug(_success_msg)
+ def _configure_and_start_thread(t, doc_json):
+ # set the request method
+ t.doc_syncer.set_request_method(
+ 'put', sync_id, cur_target_gen, cur_target_trans_id,
+ id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen,
+ trans_id=trans_id, number_of_docs=number_of_docs,
+ doc_idx=sent + 1)
+ # set the success calback
- t.doc_syncer.set_success_callback(_success_callback)
+ def _success_callback(idx, total, response):
+ _success_msg = "Soledad sync send status: %d/%d" \
+ % (idx, total)
+ signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg)
+ logger.debug(_success_msg)
- # set the failure callback
- def _failure_callback(idx, total, exception):
- _failure_msg = "Soledad sync: error while sending document " \
- "%d/%d: %s" % (idx, total, exception)
- logger.warning("%s" % _failure_msg)
- logger.warning("Soledad sync: failing gracefully, will "
- "recover on next sync.")
+ t.doc_syncer.set_success_callback(_success_callback)
- t.doc_syncer.set_failure_callback(_failure_callback)
+ # set the failure callback
+ def _failure_callback(idx, total, exception):
+ _failure_msg = "Soledad sync: error while sending document " \
+ "%d/%d: %s" % (idx, total, exception)
+ logger.warning("%s" % _failure_msg)
+ logger.warning("Soledad sync: failing gracefully, will "
+ "recover on next sync.")
+
+ t.doc_syncer.set_failure_callback(_failure_callback)
+
+ # save thread and append
+ t.start()
+
+ d.addCallback(partial(_configure_and_start_thread, t))
- # save thread and append
- t.start()
threads.append((t, doc))
# update lock references so they can be used in next call to
@@ -1230,6 +1257,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
while threads:
# check if there are failures
t, doc = threads.pop(0)
+ t.started.wait()
t.join()
if t.success:
synced.append((doc.doc_id, doc.rev))
@@ -1238,8 +1266,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
raise t.exception
# delete documents from the sync database
- if defer_encryption:
- self.delete_encrypted_docs_from_db(synced)
+ if self._defer_encryption:
+ self._delete_encrypted_docs_from_db(synced)
# get target gen and trans_id after docs
gen_after_send = None
@@ -1248,16 +1276,23 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
response_dict = json.loads(last_successful_thread.response[0])[0]
gen_after_send = response_dict['new_generation']
trans_id_after_send = response_dict['new_transaction_id']
-
- # get docs from target
- if self._sync_db is None:
- defer_decryption = False
+ # -------------------------------------------------------------------
+ # end of send documents to target
+ # -------------------------------------------------------------------
+
+ # -------------------------------------------------------------------
+ # start of fetch documents from target
+ # -------------------------------------------------------------------
+ defer_decryption = defer_decryption and self._defer_decryption
if self.stopped is False:
cur_target_gen, cur_target_trans_id = self._get_remote_docs(
url,
last_known_generation, last_known_trans_id, headers,
return_doc_cb, ensure_callback, sync_id,
defer_decryption=defer_decryption)
+ # -------------------------------------------------------------------
+ # end of fetch documents from target
+ # -------------------------------------------------------------------
self._syncer_pool.cleanup()
@@ -1308,6 +1343,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
with self._stop_lock:
return self._stopped is True
+ #
+ # Symmetric encryption of syncing docs
+ #
+
def get_encrypted_doc_from_db(self, doc_id, doc_rev):
"""
Retrieve encrypted document from the database of encrypted docs for
@@ -1318,33 +1357,31 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:param doc_rev: The document revision
:type doc_rev: str
+
+ :return: A deferred which is fired with the document's encrypted
+ content or None if the document was not found on the sync db.
+ :rtype: twisted.internet.defer.Deferred
"""
- encr = SyncEncrypterPool
- sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % (
- encr.TABLE_NAME,))
- res = self._fetchall(sql, (doc_id, doc_rev))
- if res:
- val = res.pop()
- return val[0]
- else:
- # no doc found
- return None
+ logger.debug("Looking for encrypted document on sync db: %s" % doc_id)
+ return self._sync_enc_pool.get_encrypted_doc(doc_id, doc_rev)
- def delete_encrypted_docs_from_db(self, docs_ids):
+ def _delete_encrypted_docs_from_db(self, docs):
"""
Delete several encrypted documents from the database of symmetrically
encrypted docs to sync.
- :param docs_ids: an iterable with (doc_id, doc_rev) for all documents
- to be deleted.
- :type docs_ids: any iterable of tuples of str
+ :param docs: an iterable with (doc_id, doc_rev) for all documents
+ to be deleted.
+ :type docs: any iterable of tuples of str
"""
- if docs_ids:
- encr = SyncEncrypterPool
- for doc_id, doc_rev in docs_ids:
- sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % (
- encr.TABLE_NAME,))
- self._sync_db.execute(sql, (doc_id, doc_rev))
+ for doc_id, doc_rev in docs:
+ logger.debug("Removing encrypted document on sync db: %s"
+ % doc_id)
+ return self._sync_enc_pool.delete_encrypted_doc(doc_id, doc_rev)
+
+ #
+ # Symmetric decryption of syncing docs
+ #
def _enqueue_encrypted_received_doc(self, doc, gen, trans_id, idx, total):
"""
@@ -1357,15 +1394,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:param gen: The generation.
:type gen: str
:param trans_id: Transacion id.
- :type gen: str
+
:param idx: The index count of the current operation.
:type idx: int
:param total: The total number of operations.
:type total: int
"""
- logger.debug(
- "Enqueueing doc for decryption: %d/%d."
- % (idx + 1, total))
+ logger.debug("Enqueueing doc for decryption: %d/%d."
+ % (idx + 1, total))
self._sync_decr_pool.insert_encrypted_received_doc(
doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1)
@@ -1384,16 +1420,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:param total: The total number of operations.
:type total: int
"""
- logger.debug(
- "Enqueueing doc, no decryption needed: %d/%d."
- % (idx + 1, total))
+ logger.debug("Enqueueing doc, no decryption needed: %d/%d."
+ % (idx + 1, total))
self._sync_decr_pool.insert_received_doc(
doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1)
- #
- # Symmetric decryption of syncing docs
- #
-
def set_decryption_callback(self, cb):
"""
Set callback to be called when the decryption finishes.
@@ -1410,11 +1441,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
return self._decryption_callback is not None
- def has_syncdb(self):
- """
- Return True if we have an initialized syncdb.
- """
- return self._sync_db is not None
+ #
+ # Authentication methods
+ #
def _sign_request(self, method, url_query, params):
"""
@@ -1442,9 +1471,3 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:type token: str
"""
TokenBasedAuth.set_token_credentials(self, uuid, token)
-
- def _fetchall(self, *args, **kwargs):
- with self._sync_db:
- c = self._sync_db.cursor()
- c.execute(*args, **kwargs)
- return c.fetchall()