summaryrefslogtreecommitdiff
path: root/client
diff options
context:
space:
mode:
Diffstat (limited to 'client')
-rw-r--r--client/src/leap/soledad/client/encdecpool.py175
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py64
-rw-r--r--client/src/leap/soledad/client/sync.py4
-rw-r--r--client/src/leap/soledad/client/target.py217
4 files changed, 275 insertions, 185 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 0466ec5d..0c1f92ea 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -85,6 +85,36 @@ class SyncEncryptDecryptPool(object):
logger.debug("Terminating %s" % (self.__class__.__name__,))
self._pool.terminate()
+ def _runOperation(self, query, *args):
+ """
+ Run an operation on the sync db.
+
+ :param query: The query to be executed.
+ :type query: str
+ :param args: A list of query arguments.
+ :type args: list
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._sync_db.runOperation(query, *args)
+
+ def _runQuery(self, query, *args):
+ """
+ Run a query on the sync db.
+
+ :param query: The query to be executed.
+ :type query: str
+ :param args: A list of query arguments.
+ :type args: list
+
+ :return: A deferred that will fire with the results of the database
+ query.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._sync_db.runQuery(query, *args)
+
def encrypt_doc_task(doc_id, doc_rev, content, key, secret):
"""
@@ -119,7 +149,50 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
TABLE_NAME = "docs_tosync"
FIELD_NAMES = "doc_id PRIMARY KEY, rev, content"
- def encrypt_doc(self, doc, workers=True):
+ ENCRYPT_LOOP_PERIOD = 0.5
+
+ def __init__(self, *args, **kwargs):
+ """
+ Initialize the sync encrypter pool.
+ """
+ SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
+
+ self._stopped = False
+ self._sync_queue = multiprocessing.Queue()
+
+ # start the encryption loop
+ self._deferred_loop = deferToThread(self._encrypt_docs_loop)
+ self._deferred_loop.addCallback(
+ lambda _: logger.debug("Finished encrypter thread."))
+
+ def enqueue_doc_for_encryption(self, doc):
+ """
+ Enqueue a document for encryption.
+
+ :param doc: The document to be encrypted.
+ :type doc: SoledadDocument
+ """
+ try:
+ self.sync_queue.put_nowait(doc)
+ except multiprocessing.Queue.Full:
+ # do not asynchronously encrypt this file if the queue is full
+ pass
+
+ def _encrypt_docs_loop(self):
+ """
+ Process the syncing queue and send the documents there to be encrypted
+ in the sync db. They will be read by the SoledadSyncTarget during the
+ sync_exchange.
+ """
+ logger.debug("Starting encrypter thread.")
+ while not self._stopped:
+ try:
+ doc = self._sync_queue.get(True, self.ENCRYPT_LOOP_PERIOD)
+ self._encrypt_doc(doc)
+ except multiprocessing.Queue.Empty:
+ pass
+
+ def _encrypt_doc(self, doc, workers=True):
"""
Symmetrically encrypt a document.
@@ -136,20 +209,20 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
secret = self._crypto.secret
args = doc.doc_id, doc.rev, docstr, key, secret
- try:
- if workers:
- res = self._pool.apply_async(
- encrypt_doc_task, args,
- callback=self.encrypt_doc_cb)
- else:
- # encrypt inline
+ if workers:
+ # encrypt asynchronously
+ self._pool.apply_async(
+ encrypt_doc_task, args,
+ callback=self._encrypt_doc_cb)
+ else:
+ # encrypt inline
+ try:
res = encrypt_doc_task(*args)
- self.encrypt_doc_cb(res)
+ self._encrypt_doc_cb(res)
+ except Exception as exc:
+ logger.exception(exc)
- except Exception as exc:
- logger.exception(exc)
-
- def encrypt_doc_cb(self, result):
+ def _encrypt_doc_cb(self, result):
"""
Insert results of encryption routine into the local sync database.
@@ -158,9 +231,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
:type result: tuple(str, str, str)
"""
doc_id, doc_rev, content = result
- return self.insert_encrypted_local_doc(doc_id, doc_rev, content)
+ return self._insert_encrypted_local_doc(doc_id, doc_rev, content)
- def insert_encrypted_local_doc(self, doc_id, doc_rev, content):
+ def _insert_encrypted_local_doc(self, doc_id, doc_rev, content):
"""
Insert the contents of the encrypted doc into the local sync
database.
@@ -174,7 +247,58 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
"""
query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \
% (self.TABLE_NAME,)
- return self._sync_db.runOperation(query, (doc_id, doc_rev, content))
+ return self._runOperation(query, (doc_id, doc_rev, content))
+
+ @defer.inlineCallbacks
+ def get_encrypted_doc(self, doc_id, doc_rev):
+ """
+ Get an encrypted document from the sync db.
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+ :param doc_rev: The revision of the document.
+ :type doc_rev: str
+
+ :return: A deferred that will fire with the encrypted content of the
+ document or None if the document was not found in the sync
+ db.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ logger.debug("Trying to get encrypted doc from sync db: %s" % doc_id)
+ query = "SELECT content FROM %s WHERE doc_id=? and rev=?" \
+ % self.TABLE_NAME
+ result = yield self._runQuery(query, (doc_id, doc_rev))
+ if result:
+ val = result.pop()
+ defer.returnValue(val[0])
+ defer.returnValue(None)
+
+ def delete_encrypted_doc(self, doc_id, doc_rev):
+ """
+ Delete an encrypted document from the sync db.
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+ :param doc_rev: The revision of the document.
+ :type doc_rev: str
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ query = "DELETE FROM %s WHERE doc_id=? and rev=?" \
+ % self.TABLE_NAME
+ self._runOperation(query, (doc_id, doc_rev))
+
+ def close(self):
+ """
+ Close the encrypter pool.
+ """
+ self._stopped = True
+ self._sync_queue.close()
+ q = self._sync_queue
+ del q
+ self._sync_queue = None
def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret,
@@ -275,9 +399,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
d.addCallback(lambda _: self._empty_db.set())
# start the decryption loop
- self._deferred_loop = deferToThread(self._decrypt_and_process_docs)
+ self._deferred_loop = deferToThread(
+ self._decrypt_and_process_docs_loop)
self._deferred_loop.addCallback(
- lambda _: logger.debug("Finished decryptor thread."))
+ lambda _: logger.debug("Finished decrypter thread."))
def set_docs_to_process(self, docs_to_process):
"""
@@ -317,7 +442,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
docstr = json.dumps(content)
query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \
% self.TABLE_NAME
- return self._sync_db.runOperation(
+ return self._runOperation(
query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx))
def insert_received_doc(
@@ -348,7 +473,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
content = json.dumps(content)
query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \
% self.TABLE_NAME
- return self._sync_db.runOperation(
+ return self._runOperation(
query, (doc_id, doc_rev, content, gen, trans_id, 0, idx))
def _delete_received_doc(self, doc_id):
@@ -364,7 +489,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
query = "DELETE FROM '%s' WHERE doc_id=?" \
% self.TABLE_NAME
- return self._sync_db.runOperation(query, (doc_id,))
+ return self._runOperation(query, (doc_id,))
def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx,
workers=True):
@@ -474,7 +599,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
if encrypted is not None:
query += " WHERE encrypted = %d" % int(encrypted)
query += " ORDER BY %s %s" % (order_by, order)
- return self._sync_db.runQuery(query)
+ return self._runQuery(query)
@defer.inlineCallbacks
def _get_insertable_docs(self):
@@ -612,7 +737,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:rtype: twisted.internet.defer.Deferred
"""
query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
- return self._sync_db.runOperation(query)
+ return self._runOperation(query)
def _raise_if_async_fails(self):
"""
@@ -627,7 +752,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# re-raise the exception raised by the remote call
res.get()
- def _decrypt_and_process_docs(self):
+ def _decrypt_and_process_docs_loop(self):
"""
Decrypt the documents received from remote replica and insert them
into the local one.
@@ -668,6 +793,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
Wait for the decrypt-and-process loop to finish.
"""
+ logger.debug("Waiting for asynchronous decryption of incoming documents...")
self._finished.wait()
+ logger.debug("Asynchronous decryption of incoming documents finished.")
if self._exception:
raise self._exception
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 39d5dd0e..16241621 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -42,7 +42,6 @@ SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases
handled by Soledad should be created by SQLCipher >= 2.0.
"""
import logging
-import multiprocessing
import os
import threading
import json
@@ -286,10 +285,9 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
:rtype: str
"""
doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc)
-
- # TODO XXX move to API XXX
if self.defer_encryption:
- self.sync_queue.put_nowait(doc)
+ # TODO move to api?
+ self._sync_enc_pool.enqueue_doc_for_encryption(doc)
return doc_rev
#
@@ -429,13 +427,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db'
"""
- A dictionary that hold locks which avoid multiple sync attempts from the
- same database replica.
- """
- # XXX We do not need the lock here now. Remove.
- encrypting_lock = threading.Lock()
-
- """
Period or recurrence of the Looping Call that will do the encryption to the
syncdb (in seconds).
"""
@@ -458,7 +449,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
self._sync_db_key = opts.sync_db_key
self._sync_db = None
self._sync_enc_pool = None
- self.sync_queue = None
# we store syncers in a dictionary indexed by the target URL. We also
# store a hash of the auth info in case auth info expires and we need
@@ -468,7 +458,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
# self._syncers = {'<url>': ('<auth_hash>', syncer), ...}
self._syncers = {}
- self.sync_queue = multiprocessing.Queue()
self.running = False
self._sync_threadpool = None
@@ -486,24 +475,10 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
self._initialize_sync_db(opts)
if defer_encryption:
-
# initialize syncing queue encryption pool
self._sync_enc_pool = encdecpool.SyncEncrypterPool(
self._crypto, self._sync_db)
- # -----------------------------------------------------------------
- # From the documentation: If f returns a deferred, rescheduling
- # will not take place until the deferred has fired. The result
- # value is ignored.
-
- # TODO use this to avoid multiple sync attempts if the sync has not
- # finished!
- # -----------------------------------------------------------------
-
- # XXX this was called sync_watcher --- trace any remnants
- self._sync_loop = LoopingCall(self._encrypt_syncing_docs)
- self._sync_loop.start(self.ENCRYPT_LOOP_PERIOD)
-
self.shutdownID = None
@property
@@ -703,7 +678,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
self._replica_uid,
creds=creds,
crypto=self._crypto,
- sync_db=self._sync_db))
+ sync_db=self._sync_db,
+ sync_enc_pool=self._sync_enc_pool))
self._syncers[url] = (h, syncer)
# in order to reuse the same synchronizer multiple times we have to
# reset its state (i.e. the number of documents received from target
@@ -715,33 +691,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
# Symmetric encryption of syncing docs
#
- def _encrypt_syncing_docs(self):
- """
- Process the syncing queue and send the documents there
- to be encrypted in the sync db. They will be read by the
- SoledadSyncTarget during the sync_exchange.
-
- Called periodically from the LoopingCall self._sync_loop.
- """
- # TODO should return a deferred that would firewhen the encryption is
- # done. See note on __init__
-
- lock = self.encrypting_lock
- # optional wait flag used to avoid blocking
- if not lock.acquire(False):
- return
- else:
- queue = self.sync_queue
- try:
- while not queue.empty():
- doc = queue.get_nowait()
- self._sync_enc_pool.encrypt_doc(doc)
-
- except Exception as exc:
- logger.error("Error while encrypting docs to sync")
- logger.exception(exc)
- finally:
- lock.release()
def get_generation(self):
# FIXME
@@ -779,11 +728,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
if self._sync_db is not None:
self._sync_db.close()
self._sync_db = None
- # close the sync queue
- if self.sync_queue is not None:
- self.sync_queue.close()
- del self.sync_queue
- self.sync_queue = None
class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase):
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index d3f106da..d4ca4258 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -195,10 +195,6 @@ class SoledadSynchronizer(Synchronizer):
"my_gen": my_gen
}
self._syncing_info = info
- if defer_decryption and not sync_target.has_syncdb():
- logger.debug("Sync target has no valid sync db, "
- "aborting defer_decryption")
- defer_decryption = False
self.complete_sync()
except Exception as e:
logger.error("Soledad sync error: %s" % str(e))
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index f2415218..667aab15 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -28,6 +28,7 @@ import threading
from collections import defaultdict
from time import sleep
from uuid import uuid4
+from functools import partial
import simplejson as json
@@ -38,12 +39,12 @@ from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase
from zope.proxy import ProxyBase
from zope.proxy import setProxiedObject
+from twisted.internet import defer
from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.auth import TokenBasedAuth
from leap.soledad.client.crypto import is_symmetrically_encrypted
from leap.soledad.client.crypto import encrypt_doc, decrypt_doc
-from leap.soledad.client.encdecpool import SyncEncrypterPool
from leap.soledad.client.encdecpool import SyncDecrypterPool
from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
@@ -111,6 +112,7 @@ class DocumentSyncerThread(threading.Thread):
self._exception = None
self._result = None
self._success = False
+ self.started = threading.Event()
# a lock so we can signal when we're finished
self._request_lock = threading.Lock()
self._request_lock.acquire()
@@ -128,6 +130,8 @@ class DocumentSyncerThread(threading.Thread):
finish before actually performing the request. It also traps any
exception and register any failure with the request.
"""
+ self.started.set()
+
with self._stop_lock:
if self._stopped is None:
self._stopped = False
@@ -756,7 +760,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
#
def __init__(self, url, source_replica_uid=None, creds=None, crypto=None,
- sync_db=None):
+ sync_db=None, sync_enc_pool=None):
"""
Initialize the SoledadSyncTarget.
@@ -787,8 +791,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self.source_replica_uid = source_replica_uid
self._syncer_pool = None
- # deferred decryption attributes
+ # asynchronous encryption/decryption attributes
self._sync_db = sync_db
+ self._sync_enc_pool = sync_enc_pool
self._decryption_callback = None
self._sync_decr_pool = None
@@ -796,7 +801,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
Set up the SyncDecrypterPool for deferred decryption.
"""
- if self._sync_decr_pool is None:
+ if self._sync_decr_pool is None and self._sync_db is not None:
# initialize syncing queue decryption pool
self._sync_decr_pool = SyncDecrypterPool(
self._crypto,
@@ -1018,7 +1023,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
t.join()
if t.success:
number_of_changes, _, _ = t.result
- if defer_decryption and number_of_changes:
+ if defer_decryption:
self._sync_decr_pool.set_docs_to_process(
number_of_changes)
else:
@@ -1060,6 +1065,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
return new_generation, new_transaction_id
+ @property
+ def _defer_encryption(self):
+ return self._sync_enc_pool is not None
+
+ @property
+ def _defer_decryption(self):
+ return self._sync_decr_pool is not None
+
def sync_exchange(self, docs_by_generations,
source_replica_uid, last_known_generation,
last_known_trans_id, return_doc_cb,
@@ -1126,17 +1139,19 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
cur_target_gen = last_known_generation
cur_target_trans_id = last_known_trans_id
- # send docs
+ # -------------------------------------------------------------------
+ # start of send documents to target
+ # -------------------------------------------------------------------
msg = "%d/%d" % (0, len(docs_by_generations))
signal(SOLEDAD_SYNC_SEND_STATUS, msg)
logger.debug("Soledad sync send status: %s" % msg)
- defer_encryption = self._sync_db is not None
self._syncer_pool = DocumentSyncerPool(
self._raw_url, self._raw_creds, url, headers, ensure_callback,
self.stop_syncer)
threads = []
last_callback_lock = None
+
sent = 0
total = len(docs_by_generations)
@@ -1156,66 +1171,78 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# -------------------------------------------------------------
# symmetric encryption of document's contents
# -------------------------------------------------------------
- doc_json = doc.get_json()
- if not doc.is_tombstone():
- if not defer_encryption:
- # fallback case, for tests
- doc_json = encrypt_doc(self._crypto, doc)
- else:
- try:
- doc_json = self.get_encrypted_doc_from_db(
- doc.doc_id, doc.rev)
- except Exception as exc:
- logger.error("Error while getting "
- "encrypted doc from db")
- logger.exception(exc)
- continue
+
+ # the following var will hold a deferred because we may try to
+ # fetch the encrypted document from the sync db
+ d = None
+
+ if doc.is_tombstone():
+ d = defer.succeed(None)
+ elif not self._defer_encryption:
+ # fallback case, for tests
+ d = defer.succeed(encrypt_doc(self._crypto, doc))
+ else:
+
+ def _maybe_encrypt_doc_inline(doc_json):
if doc_json is None:
- # Not marked as tombstone, but we got nothing
- # from the sync db. As it is not encrypted yet, we
- # force inline encryption.
+ # the document is not marked as tombstone, but we got
+ # nothing from the sync db. As it is not encrypted
+ # yet, we force inline encryption.
# TODO: implement a queue to deal with these cases.
- doc_json = encrypt_doc(self._crypto, doc)
+ return encrypt_doc(self._crypto, doc)
+ return doc_json
+
+ d = self.get_encrypted_doc_from_db(doc.doc_id, doc.rev)
+ d.addCallback(_maybe_encrypt_doc_inline)
# -------------------------------------------------------------
# end of symmetric encryption
# -------------------------------------------------------------
+
t = self._syncer_pool.new_syncer_thread(
sent + 1, total, last_request_lock=last_request_lock,
last_callback_lock=last_callback_lock)
- # bail out if any thread failed
+ # bail out if creation of any thread failed
if t is None:
self.stop(fail=True)
break
- # set the request method
- t.doc_syncer.set_request_method(
- 'put', sync_id, cur_target_gen, cur_target_trans_id,
- id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen,
- trans_id=trans_id, number_of_docs=number_of_docs,
- doc_idx=sent + 1)
- # set the success calback
+ # the following callback will be called when the document's
+ # encrypted content is available, either because it was found on
+ # the sync db or because it has been encrypted inline.
- def _success_callback(idx, total, response):
- _success_msg = "Soledad sync send status: %d/%d" \
- % (idx, total)
- signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg)
- logger.debug(_success_msg)
+ def _configure_and_start_thread(t, doc_json):
+ # set the request method
+ t.doc_syncer.set_request_method(
+ 'put', sync_id, cur_target_gen, cur_target_trans_id,
+ id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen,
+ trans_id=trans_id, number_of_docs=number_of_docs,
+ doc_idx=sent + 1)
+ # set the success calback
- t.doc_syncer.set_success_callback(_success_callback)
+ def _success_callback(idx, total, response):
+ _success_msg = "Soledad sync send status: %d/%d" \
+ % (idx, total)
+ signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg)
+ logger.debug(_success_msg)
- # set the failure callback
- def _failure_callback(idx, total, exception):
- _failure_msg = "Soledad sync: error while sending document " \
- "%d/%d: %s" % (idx, total, exception)
- logger.warning("%s" % _failure_msg)
- logger.warning("Soledad sync: failing gracefully, will "
- "recover on next sync.")
+ t.doc_syncer.set_success_callback(_success_callback)
- t.doc_syncer.set_failure_callback(_failure_callback)
+ # set the failure callback
+ def _failure_callback(idx, total, exception):
+ _failure_msg = "Soledad sync: error while sending document " \
+ "%d/%d: %s" % (idx, total, exception)
+ logger.warning("%s" % _failure_msg)
+ logger.warning("Soledad sync: failing gracefully, will "
+ "recover on next sync.")
+
+ t.doc_syncer.set_failure_callback(_failure_callback)
+
+ # save thread and append
+ t.start()
+
+ d.addCallback(partial(_configure_and_start_thread, t))
- # save thread and append
- t.start()
threads.append((t, doc))
# update lock references so they can be used in next call to
@@ -1230,6 +1257,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
while threads:
# check if there are failures
t, doc = threads.pop(0)
+ t.started.wait()
t.join()
if t.success:
synced.append((doc.doc_id, doc.rev))
@@ -1238,8 +1266,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
raise t.exception
# delete documents from the sync database
- if defer_encryption:
- self.delete_encrypted_docs_from_db(synced)
+ if self._defer_encryption:
+ self._delete_encrypted_docs_from_db(synced)
# get target gen and trans_id after docs
gen_after_send = None
@@ -1248,16 +1276,23 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
response_dict = json.loads(last_successful_thread.response[0])[0]
gen_after_send = response_dict['new_generation']
trans_id_after_send = response_dict['new_transaction_id']
-
- # get docs from target
- if self._sync_db is None:
- defer_decryption = False
+ # -------------------------------------------------------------------
+ # end of send documents to target
+ # -------------------------------------------------------------------
+
+ # -------------------------------------------------------------------
+ # start of fetch documents from target
+ # -------------------------------------------------------------------
+ defer_decryption = defer_decryption and self._defer_decryption
if self.stopped is False:
cur_target_gen, cur_target_trans_id = self._get_remote_docs(
url,
last_known_generation, last_known_trans_id, headers,
return_doc_cb, ensure_callback, sync_id,
defer_decryption=defer_decryption)
+ # -------------------------------------------------------------------
+ # end of fetch documents from target
+ # -------------------------------------------------------------------
self._syncer_pool.cleanup()
@@ -1308,6 +1343,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
with self._stop_lock:
return self._stopped is True
+ #
+ # Symmetric encryption of syncing docs
+ #
+
def get_encrypted_doc_from_db(self, doc_id, doc_rev):
"""
Retrieve encrypted document from the database of encrypted docs for
@@ -1318,33 +1357,31 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:param doc_rev: The document revision
:type doc_rev: str
+
+ :return: A deferred which is fired with the document's encrypted
+ content or None if the document was not found on the sync db.
+ :rtype: twisted.internet.defer.Deferred
"""
- encr = SyncEncrypterPool
- sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % (
- encr.TABLE_NAME,))
- res = self._fetchall(sql, (doc_id, doc_rev))
- if res:
- val = res.pop()
- return val[0]
- else:
- # no doc found
- return None
+ logger.debug("Looking for encrypted document on sync db: %s" % doc_id)
+ return self._sync_enc_pool.get_encrypted_doc(doc_id, doc_rev)
- def delete_encrypted_docs_from_db(self, docs_ids):
+ def _delete_encrypted_docs_from_db(self, docs):
"""
Delete several encrypted documents from the database of symmetrically
encrypted docs to sync.
- :param docs_ids: an iterable with (doc_id, doc_rev) for all documents
- to be deleted.
- :type docs_ids: any iterable of tuples of str
+ :param docs: an iterable with (doc_id, doc_rev) for all documents
+ to be deleted.
+ :type docs: any iterable of tuples of str
"""
- if docs_ids:
- encr = SyncEncrypterPool
- for doc_id, doc_rev in docs_ids:
- sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % (
- encr.TABLE_NAME,))
- self._sync_db.execute(sql, (doc_id, doc_rev))
+ for doc_id, doc_rev in docs:
+ logger.debug("Removing encrypted document on sync db: %s"
+ % doc_id)
+ return self._sync_enc_pool.delete_encrypted_doc(doc_id, doc_rev)
+
+ #
+ # Symmetric decryption of syncing docs
+ #
def _enqueue_encrypted_received_doc(self, doc, gen, trans_id, idx, total):
"""
@@ -1357,15 +1394,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:param gen: The generation.
:type gen: str
:param trans_id: Transacion id.
- :type gen: str
+
:param idx: The index count of the current operation.
:type idx: int
:param total: The total number of operations.
:type total: int
"""
- logger.debug(
- "Enqueueing doc for decryption: %d/%d."
- % (idx + 1, total))
+ logger.debug("Enqueueing doc for decryption: %d/%d."
+ % (idx + 1, total))
self._sync_decr_pool.insert_encrypted_received_doc(
doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1)
@@ -1384,16 +1420,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:param total: The total number of operations.
:type total: int
"""
- logger.debug(
- "Enqueueing doc, no decryption needed: %d/%d."
- % (idx + 1, total))
+ logger.debug("Enqueueing doc, no decryption needed: %d/%d."
+ % (idx + 1, total))
self._sync_decr_pool.insert_received_doc(
doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1)
- #
- # Symmetric decryption of syncing docs
- #
-
def set_decryption_callback(self, cb):
"""
Set callback to be called when the decryption finishes.
@@ -1410,11 +1441,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
return self._decryption_callback is not None
- def has_syncdb(self):
- """
- Return True if we have an initialized syncdb.
- """
- return self._sync_db is not None
+ #
+ # Authentication methods
+ #
def _sign_request(self, method, url_query, params):
"""
@@ -1442,9 +1471,3 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:type token: str
"""
TokenBasedAuth.set_token_credentials(self, uuid, token)
-
- def _fetchall(self, *args, **kwargs):
- with self._sync_db:
- c = self._sync_db.cursor()
- c.execute(*args, **kwargs)
- return c.fetchall()