diff options
Diffstat (limited to 'client/src')
-rw-r--r-- | client/src/leap/soledad/client/adbapi.py | 3 | ||||
-rw-r--r-- | client/src/leap/soledad/client/api.py | 27 | ||||
-rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 42 | ||||
-rw-r--r-- | client/src/leap/soledad/client/http_target.py | 7 | ||||
-rw-r--r-- | client/src/leap/soledad/client/interfaces.py | 31 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 13 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sync.py | 10 |
7 files changed, 114 insertions, 19 deletions
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index 5b882bbe..bc0ab7a5 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ b/client/src/leap/soledad/client/adbapi.py @@ -217,6 +217,9 @@ class U1DBConnectionPool(adbapi.ConnectionPool): """ meth = getattr(trans, meth) return meth(*args, **kw) + # XXX should return a fetchall? + + # XXX add _runOperation too def _runInteraction(self, interaction, *args, **kw): """ diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index 76d6acc3..6c2b3673 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -38,13 +38,16 @@ try: import cchardet as chardet except ImportError: import chardet +from itertools import chain from StringIO import StringIO from u1db.remote import http_client from u1db.remote.ssl_match_hostname import match_hostname +from twisted.plugin import getPlugins from zope.interface import implements from leap.common.config import get_path_prefix +from leap.common.plugins import collect_plugins from leap.soledad.common import SHARED_DB_NAME from leap.soledad.common import soledad_assert @@ -656,6 +659,20 @@ class Soledad(object): defer_decryption=defer_decryption) def _sync_callback(local_gen): + self._last_received_docs = docs = self._dbsyncer.received_docs + + # Post-Sync Hooks + if docs: + iface = soledad_interfaces.ISoledadPostSyncPlugin + suitable_plugins = collect_plugins(iface) + for plugin in suitable_plugins: + watched = plugin.watched_doc_types + r = [filter( + lambda s: s.startswith(preffix), + docs) for preffix in watched] + filtered = list(chain(*r)) + plugin.process_received_docs(filtered) + soledad_events.emit( soledad_events.SOLEDAD_DONE_DATA_SYNC, self.uuid) return local_gen @@ -788,10 +805,18 @@ class Soledad(object): def raw_sqlcipher_query(self, *args, **kw): """ - Run a raw sqlcipher query in the local database. + Run a raw sqlcipher query in the local database, and return a deferred + that will be fired with the result. """ return self._dbpool.runQuery(*args, **kw) + def raw_sqlcipher_operation(self, *args, **kw): + """ + Run a raw sqlcipher operation in the local database, and return a + deferred that will be fired with None. + """ + return self._dbpool.runOperation(*args, **kw) + def _convert_to_unicode(content): """ diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index d9a72b25..f81cd2d1 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -29,7 +29,7 @@ import logging from twisted.internet import reactor from twisted.internet import defer -from twisted.internet.threads import deferToThread +from twisted.python import log from leap.soledad.common.document import SoledadDocument from leap.soledad.common import soledad_assert @@ -147,7 +147,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): TABLE_NAME = "docs_tosync" FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" - ENCRYPT_LOOP_PERIOD = 0.5 + ENCRYPT_LOOP_PERIOD = 2 def __init__(self, *args, **kwargs): """ @@ -159,9 +159,8 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): self._sync_queue = multiprocessing.Queue() # start the encryption loop - self._deferred_loop = deferToThread(self._encrypt_docs_loop) - self._deferred_loop.addCallback( - lambda _: logger.debug("Finished encrypter thread.")) + logger.debug("Starting the encryption loop...") + reactor.callWhenRunning(self._maybe_encrypt_and_recurse) def enqueue_doc_for_encryption(self, doc): """ @@ -171,24 +170,28 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): :type doc: SoledadDocument """ try: - self.sync_queue.put_nowait(doc) - except multiprocessing.Queue.Full: + self._sync_queue.put_nowait(doc) + except Queue.Full: # do not asynchronously encrypt this file if the queue is full pass - def _encrypt_docs_loop(self): + def _maybe_encrypt_and_recurse(self): """ Process the syncing queue and send the documents there to be encrypted in the sync db. They will be read by the SoledadSyncTarget during the sync_exchange. """ - logger.debug("Starting encrypter thread.") - while not self._stopped: + if not self._stopped: try: - doc = self._sync_queue.get(True, self.ENCRYPT_LOOP_PERIOD) + doc = self._sync_queue.get(False) self._encrypt_doc(doc) except Queue.Empty: pass + reactor.callLater( + self.ENCRYPT_LOOP_PERIOD, + self._maybe_encrypt_and_recurse) + else: + logger.debug("Finished encrypter thread.") def _encrypt_doc(self, doc): """ @@ -374,9 +377,9 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self.source_replica_uid = kwargs.pop("source_replica_uid") SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - self._last_inserted_idx = 0 self._docs_to_process = None self._processed_docs = 0 + self._last_inserted_idx = 0 self._async_results = [] self._failure = None @@ -386,6 +389,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # asynchronous call, so we have to somehow make sure that it is # executed before any other call to the database, without # blocking. + # XXX in mail and keymanager we have a pattern for that -- kali. self._empty() def _launch_decrypt_and_process(self): @@ -402,6 +406,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): return self._failure def _set_failure(self, failure): + log.err(failure) self._failure = failure self._finished = True @@ -419,6 +424,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type docs_to_process: int """ self._docs_to_process = docs_to_process + self._finished = False self._schedule_decrypt_and_process() def insert_encrypted_received_doc( @@ -729,7 +735,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :rtype: twisted.internet.defer.Deferred """ if not self.failed(): - if self._processed_docs < self._docs_to_process: + processed = self._processed_docs + pending = self._docs_to_process + + if not self.has_finished() and processed < pending: yield self._async_decrypt_received_docs() yield self._collect_async_decryption_results() docs = yield self._process_decrypted_docs() @@ -737,7 +746,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # recurse self._schedule_decrypt_and_process() else: - self._finished = True + self._mark_finished() + + def _mark_finished(self): + self._finished = True + self._processed_docs = 0 + self._last_inserted_idx = 0 def has_finished(self): """ diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py index 30590ae1..ac078f39 100644 --- a/client/src/leap/soledad/client/http_target.py +++ b/client/src/leap/soledad/client/http_target.py @@ -75,8 +75,6 @@ class SoledadHTTPSyncTarget(SyncTarget): :param source_replica_uid: The source replica uid which we use when deferring decryption. :type source_replica_uid: str - :param url: The url of the target replica to sync with. - :type url: str :param creds: A dictionary containing the uuid and token. :type creds: creds :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt @@ -98,7 +96,7 @@ class SoledadHTTPSyncTarget(SyncTarget): """ if url.endswith("/"): url = url[:-1] - self._url = str(url) + "/sync-from/" + source_replica_uid + self._url = str(url) + "/sync-from/" + str(source_replica_uid) self.source_replica_uid = source_replica_uid self._auth_header = None self.set_creds(creds) @@ -111,6 +109,9 @@ class SoledadHTTPSyncTarget(SyncTarget): self._sync_decr_pool = None self._http = HTTPClient(cert_file) + def close(self): + self._http.close() + def set_creds(self, creds): """ Update credentials. diff --git a/client/src/leap/soledad/client/interfaces.py b/client/src/leap/soledad/client/interfaces.py index 4f7b0779..14b34d24 100644 --- a/client/src/leap/soledad/client/interfaces.py +++ b/client/src/leap/soledad/client/interfaces.py @@ -19,6 +19,37 @@ Interfaces used by the Soledad Client. """ from zope.interface import Interface, Attribute +# +# Plugins +# + + +class ISoledadPostSyncPlugin(Interface): + """ + I implement the minimal methods and attributes for a plugin that can be + called after a soledad synchronization has ended. + """ + + def process_received_docs(self, doc_id_list): + """ + Do something with the passed list of doc_ids received after the last + sync. + + :param doc_id_list: a list of strings for the received doc_ids + """ + + watched_doc_types = Attribute(""" + a tuple of the watched doc types for this plugin. So far, the + `doc-types` convention is just the preffix of the doc_id, which is + basically its first character, followed by a dash. So, for instance, + `M-` is used for meta-docs in mail, and `F-` is used for flag-docs in + mail. For now there's no central register of all the doc-types + used.""") + + +# +# Soledad storage +# class ILocalStorage(Interface): """ diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index b2025130..75d786a6 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -456,6 +456,9 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self._syncers = {} + # Storage for the documents received during a sync + self.received_docs = [] + self.running = False self._sync_threadpool = None self._initialize_sync_threadpool() @@ -587,8 +590,16 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # the following context manager blocks until the syncing lock can be # acquired. with self._syncer(url, creds=creds) as syncer: + + def _record_received_docs(result): + # beware, closure. syncer is in scope. + self.received_docs = syncer.received_docs + return result + # XXX could mark the critical section here... - return syncer.sync(defer_decryption=defer_decryption) + d = syncer.sync(defer_decryption=defer_decryption) + d.addCallback(_record_received_docs) + return d @contextmanager def _syncer(self, url, creds=None): diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index 53172f31..14c547cf 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -39,6 +39,7 @@ class SoledadSynchronizer(Synchronizer): Also modified to allow for interrupting the synchronization process. """ + received_docs = [] @defer.inlineCallbacks def sync(self, defer_decryption=True): @@ -62,6 +63,7 @@ class SoledadSynchronizer(Synchronizer): :rtype: twisted.internet.defer.Deferred """ sync_target = self.sync_target + self.received_docs = [] # get target identifier, its current generation, # and its last-seen database generation for this source @@ -123,12 +125,14 @@ class SoledadSynchronizer(Synchronizer): changed_doc_ids = [doc_id for doc_id, _, _ in changes] docs_to_send = self.source.get_docs( changed_doc_ids, check_for_conflicts=False, include_deleted=True) + ids_sent = [] docs_by_generation = [] idx = 0 for doc in docs_to_send: _, gen, trans = changes[idx] docs_by_generation.append((doc, gen, trans)) idx += 1 + ids_sent.append(doc.doc_id) # exchange documents and try to insert the returned ones with # the target, return target synced-up-to gen. @@ -151,6 +155,12 @@ class SoledadSynchronizer(Synchronizer): self._syncing_info = info yield self.complete_sync() + _, _, changes = self.source.whats_changed(target_my_gen) + changed_doc_ids = [doc_id for doc_id, _, _ in changes] + + just_received = list(set(changed_doc_ids) - set(ids_sent)) + self.received_docs = just_received + defer.returnValue(my_gen) def complete_sync(self): |