summaryrefslogtreecommitdiff
path: root/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'client/src')
-rw-r--r--client/src/leap/soledad/client/adbapi.py3
-rw-r--r--client/src/leap/soledad/client/api.py27
-rw-r--r--client/src/leap/soledad/client/encdecpool.py42
-rw-r--r--client/src/leap/soledad/client/http_target.py7
-rw-r--r--client/src/leap/soledad/client/interfaces.py31
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py13
-rw-r--r--client/src/leap/soledad/client/sync.py10
7 files changed, 114 insertions, 19 deletions
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py
index 5b882bbe..bc0ab7a5 100644
--- a/client/src/leap/soledad/client/adbapi.py
+++ b/client/src/leap/soledad/client/adbapi.py
@@ -217,6 +217,9 @@ class U1DBConnectionPool(adbapi.ConnectionPool):
"""
meth = getattr(trans, meth)
return meth(*args, **kw)
+ # XXX should return a fetchall?
+
+ # XXX add _runOperation too
def _runInteraction(self, interaction, *args, **kw):
"""
diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py
index 76d6acc3..6c2b3673 100644
--- a/client/src/leap/soledad/client/api.py
+++ b/client/src/leap/soledad/client/api.py
@@ -38,13 +38,16 @@ try:
import cchardet as chardet
except ImportError:
import chardet
+from itertools import chain
from StringIO import StringIO
from u1db.remote import http_client
from u1db.remote.ssl_match_hostname import match_hostname
+from twisted.plugin import getPlugins
from zope.interface import implements
from leap.common.config import get_path_prefix
+from leap.common.plugins import collect_plugins
from leap.soledad.common import SHARED_DB_NAME
from leap.soledad.common import soledad_assert
@@ -656,6 +659,20 @@ class Soledad(object):
defer_decryption=defer_decryption)
def _sync_callback(local_gen):
+ self._last_received_docs = docs = self._dbsyncer.received_docs
+
+ # Post-Sync Hooks
+ if docs:
+ iface = soledad_interfaces.ISoledadPostSyncPlugin
+ suitable_plugins = collect_plugins(iface)
+ for plugin in suitable_plugins:
+ watched = plugin.watched_doc_types
+ r = [filter(
+ lambda s: s.startswith(preffix),
+ docs) for preffix in watched]
+ filtered = list(chain(*r))
+ plugin.process_received_docs(filtered)
+
soledad_events.emit(
soledad_events.SOLEDAD_DONE_DATA_SYNC, self.uuid)
return local_gen
@@ -788,10 +805,18 @@ class Soledad(object):
def raw_sqlcipher_query(self, *args, **kw):
"""
- Run a raw sqlcipher query in the local database.
+ Run a raw sqlcipher query in the local database, and return a deferred
+ that will be fired with the result.
"""
return self._dbpool.runQuery(*args, **kw)
+ def raw_sqlcipher_operation(self, *args, **kw):
+ """
+ Run a raw sqlcipher operation in the local database, and return a
+ deferred that will be fired with None.
+ """
+ return self._dbpool.runOperation(*args, **kw)
+
def _convert_to_unicode(content):
"""
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index d9a72b25..f81cd2d1 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -29,7 +29,7 @@ import logging
from twisted.internet import reactor
from twisted.internet import defer
-from twisted.internet.threads import deferToThread
+from twisted.python import log
from leap.soledad.common.document import SoledadDocument
from leap.soledad.common import soledad_assert
@@ -147,7 +147,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
TABLE_NAME = "docs_tosync"
FIELD_NAMES = "doc_id PRIMARY KEY, rev, content"
- ENCRYPT_LOOP_PERIOD = 0.5
+ ENCRYPT_LOOP_PERIOD = 2
def __init__(self, *args, **kwargs):
"""
@@ -159,9 +159,8 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
self._sync_queue = multiprocessing.Queue()
# start the encryption loop
- self._deferred_loop = deferToThread(self._encrypt_docs_loop)
- self._deferred_loop.addCallback(
- lambda _: logger.debug("Finished encrypter thread."))
+ logger.debug("Starting the encryption loop...")
+ reactor.callWhenRunning(self._maybe_encrypt_and_recurse)
def enqueue_doc_for_encryption(self, doc):
"""
@@ -171,24 +170,28 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
:type doc: SoledadDocument
"""
try:
- self.sync_queue.put_nowait(doc)
- except multiprocessing.Queue.Full:
+ self._sync_queue.put_nowait(doc)
+ except Queue.Full:
# do not asynchronously encrypt this file if the queue is full
pass
- def _encrypt_docs_loop(self):
+ def _maybe_encrypt_and_recurse(self):
"""
Process the syncing queue and send the documents there to be encrypted
in the sync db. They will be read by the SoledadSyncTarget during the
sync_exchange.
"""
- logger.debug("Starting encrypter thread.")
- while not self._stopped:
+ if not self._stopped:
try:
- doc = self._sync_queue.get(True, self.ENCRYPT_LOOP_PERIOD)
+ doc = self._sync_queue.get(False)
self._encrypt_doc(doc)
except Queue.Empty:
pass
+ reactor.callLater(
+ self.ENCRYPT_LOOP_PERIOD,
+ self._maybe_encrypt_and_recurse)
+ else:
+ logger.debug("Finished encrypter thread.")
def _encrypt_doc(self, doc):
"""
@@ -374,9 +377,9 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self.source_replica_uid = kwargs.pop("source_replica_uid")
SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
- self._last_inserted_idx = 0
self._docs_to_process = None
self._processed_docs = 0
+ self._last_inserted_idx = 0
self._async_results = []
self._failure = None
@@ -386,6 +389,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# asynchronous call, so we have to somehow make sure that it is
# executed before any other call to the database, without
# blocking.
+ # XXX in mail and keymanager we have a pattern for that -- kali.
self._empty()
def _launch_decrypt_and_process(self):
@@ -402,6 +406,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
return self._failure
def _set_failure(self, failure):
+ log.err(failure)
self._failure = failure
self._finished = True
@@ -419,6 +424,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type docs_to_process: int
"""
self._docs_to_process = docs_to_process
+ self._finished = False
self._schedule_decrypt_and_process()
def insert_encrypted_received_doc(
@@ -729,7 +735,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:rtype: twisted.internet.defer.Deferred
"""
if not self.failed():
- if self._processed_docs < self._docs_to_process:
+ processed = self._processed_docs
+ pending = self._docs_to_process
+
+ if not self.has_finished() and processed < pending:
yield self._async_decrypt_received_docs()
yield self._collect_async_decryption_results()
docs = yield self._process_decrypted_docs()
@@ -737,7 +746,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# recurse
self._schedule_decrypt_and_process()
else:
- self._finished = True
+ self._mark_finished()
+
+ def _mark_finished(self):
+ self._finished = True
+ self._processed_docs = 0
+ self._last_inserted_idx = 0
def has_finished(self):
"""
diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py
index 30590ae1..ac078f39 100644
--- a/client/src/leap/soledad/client/http_target.py
+++ b/client/src/leap/soledad/client/http_target.py
@@ -75,8 +75,6 @@ class SoledadHTTPSyncTarget(SyncTarget):
:param source_replica_uid: The source replica uid which we use when
deferring decryption.
:type source_replica_uid: str
- :param url: The url of the target replica to sync with.
- :type url: str
:param creds: A dictionary containing the uuid and token.
:type creds: creds
:param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
@@ -98,7 +96,7 @@ class SoledadHTTPSyncTarget(SyncTarget):
"""
if url.endswith("/"):
url = url[:-1]
- self._url = str(url) + "/sync-from/" + source_replica_uid
+ self._url = str(url) + "/sync-from/" + str(source_replica_uid)
self.source_replica_uid = source_replica_uid
self._auth_header = None
self.set_creds(creds)
@@ -111,6 +109,9 @@ class SoledadHTTPSyncTarget(SyncTarget):
self._sync_decr_pool = None
self._http = HTTPClient(cert_file)
+ def close(self):
+ self._http.close()
+
def set_creds(self, creds):
"""
Update credentials.
diff --git a/client/src/leap/soledad/client/interfaces.py b/client/src/leap/soledad/client/interfaces.py
index 4f7b0779..14b34d24 100644
--- a/client/src/leap/soledad/client/interfaces.py
+++ b/client/src/leap/soledad/client/interfaces.py
@@ -19,6 +19,37 @@ Interfaces used by the Soledad Client.
"""
from zope.interface import Interface, Attribute
+#
+# Plugins
+#
+
+
+class ISoledadPostSyncPlugin(Interface):
+ """
+ I implement the minimal methods and attributes for a plugin that can be
+ called after a soledad synchronization has ended.
+ """
+
+ def process_received_docs(self, doc_id_list):
+ """
+ Do something with the passed list of doc_ids received after the last
+ sync.
+
+ :param doc_id_list: a list of strings for the received doc_ids
+ """
+
+ watched_doc_types = Attribute("""
+ a tuple of the watched doc types for this plugin. So far, the
+ `doc-types` convention is just the preffix of the doc_id, which is
+ basically its first character, followed by a dash. So, for instance,
+ `M-` is used for meta-docs in mail, and `F-` is used for flag-docs in
+ mail. For now there's no central register of all the doc-types
+ used.""")
+
+
+#
+# Soledad storage
+#
class ILocalStorage(Interface):
"""
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index b2025130..75d786a6 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -456,6 +456,9 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
self._syncers = {}
+ # Storage for the documents received during a sync
+ self.received_docs = []
+
self.running = False
self._sync_threadpool = None
self._initialize_sync_threadpool()
@@ -587,8 +590,16 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
# the following context manager blocks until the syncing lock can be
# acquired.
with self._syncer(url, creds=creds) as syncer:
+
+ def _record_received_docs(result):
+ # beware, closure. syncer is in scope.
+ self.received_docs = syncer.received_docs
+ return result
+
# XXX could mark the critical section here...
- return syncer.sync(defer_decryption=defer_decryption)
+ d = syncer.sync(defer_decryption=defer_decryption)
+ d.addCallback(_record_received_docs)
+ return d
@contextmanager
def _syncer(self, url, creds=None):
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index 53172f31..14c547cf 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -39,6 +39,7 @@ class SoledadSynchronizer(Synchronizer):
Also modified to allow for interrupting the synchronization process.
"""
+ received_docs = []
@defer.inlineCallbacks
def sync(self, defer_decryption=True):
@@ -62,6 +63,7 @@ class SoledadSynchronizer(Synchronizer):
:rtype: twisted.internet.defer.Deferred
"""
sync_target = self.sync_target
+ self.received_docs = []
# get target identifier, its current generation,
# and its last-seen database generation for this source
@@ -123,12 +125,14 @@ class SoledadSynchronizer(Synchronizer):
changed_doc_ids = [doc_id for doc_id, _, _ in changes]
docs_to_send = self.source.get_docs(
changed_doc_ids, check_for_conflicts=False, include_deleted=True)
+ ids_sent = []
docs_by_generation = []
idx = 0
for doc in docs_to_send:
_, gen, trans = changes[idx]
docs_by_generation.append((doc, gen, trans))
idx += 1
+ ids_sent.append(doc.doc_id)
# exchange documents and try to insert the returned ones with
# the target, return target synced-up-to gen.
@@ -151,6 +155,12 @@ class SoledadSynchronizer(Synchronizer):
self._syncing_info = info
yield self.complete_sync()
+ _, _, changes = self.source.whats_changed(target_my_gen)
+ changed_doc_ids = [doc_id for doc_id, _, _ in changes]
+
+ just_received = list(set(changed_doc_ids) - set(ids_sent))
+ self.received_docs = just_received
+
defer.returnValue(my_gen)
def complete_sync(self):