diff options
Diffstat (limited to 'client/src')
-rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 175 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 64 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sync.py | 4 | ||||
-rw-r--r-- | client/src/leap/soledad/client/target.py | 217 |
4 files changed, 275 insertions, 185 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 0466ec5d..0c1f92ea 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -85,6 +85,36 @@ class SyncEncryptDecryptPool(object): logger.debug("Terminating %s" % (self.__class__.__name__,)) self._pool.terminate() + def _runOperation(self, query, *args): + """ + Run an operation on the sync db. + + :param query: The query to be executed. + :type query: str + :param args: A list of query arguments. + :type args: list + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + return self._sync_db.runOperation(query, *args) + + def _runQuery(self, query, *args): + """ + Run a query on the sync db. + + :param query: The query to be executed. + :type query: str + :param args: A list of query arguments. + :type args: list + + :return: A deferred that will fire with the results of the database + query. + :rtype: twisted.internet.defer.Deferred + """ + return self._sync_db.runQuery(query, *args) + def encrypt_doc_task(doc_id, doc_rev, content, key, secret): """ @@ -119,7 +149,50 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): TABLE_NAME = "docs_tosync" FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" - def encrypt_doc(self, doc, workers=True): + ENCRYPT_LOOP_PERIOD = 0.5 + + def __init__(self, *args, **kwargs): + """ + Initialize the sync encrypter pool. + """ + SyncEncryptDecryptPool.__init__(self, *args, **kwargs) + + self._stopped = False + self._sync_queue = multiprocessing.Queue() + + # start the encryption loop + self._deferred_loop = deferToThread(self._encrypt_docs_loop) + self._deferred_loop.addCallback( + lambda _: logger.debug("Finished encrypter thread.")) + + def enqueue_doc_for_encryption(self, doc): + """ + Enqueue a document for encryption. + + :param doc: The document to be encrypted. + :type doc: SoledadDocument + """ + try: + self.sync_queue.put_nowait(doc) + except multiprocessing.Queue.Full: + # do not asynchronously encrypt this file if the queue is full + pass + + def _encrypt_docs_loop(self): + """ + Process the syncing queue and send the documents there to be encrypted + in the sync db. They will be read by the SoledadSyncTarget during the + sync_exchange. + """ + logger.debug("Starting encrypter thread.") + while not self._stopped: + try: + doc = self._sync_queue.get(True, self.ENCRYPT_LOOP_PERIOD) + self._encrypt_doc(doc) + except multiprocessing.Queue.Empty: + pass + + def _encrypt_doc(self, doc, workers=True): """ Symmetrically encrypt a document. @@ -136,20 +209,20 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): secret = self._crypto.secret args = doc.doc_id, doc.rev, docstr, key, secret - try: - if workers: - res = self._pool.apply_async( - encrypt_doc_task, args, - callback=self.encrypt_doc_cb) - else: - # encrypt inline + if workers: + # encrypt asynchronously + self._pool.apply_async( + encrypt_doc_task, args, + callback=self._encrypt_doc_cb) + else: + # encrypt inline + try: res = encrypt_doc_task(*args) - self.encrypt_doc_cb(res) + self._encrypt_doc_cb(res) + except Exception as exc: + logger.exception(exc) - except Exception as exc: - logger.exception(exc) - - def encrypt_doc_cb(self, result): + def _encrypt_doc_cb(self, result): """ Insert results of encryption routine into the local sync database. @@ -158,9 +231,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): :type result: tuple(str, str, str) """ doc_id, doc_rev, content = result - return self.insert_encrypted_local_doc(doc_id, doc_rev, content) + return self._insert_encrypted_local_doc(doc_id, doc_rev, content) - def insert_encrypted_local_doc(self, doc_id, doc_rev, content): + def _insert_encrypted_local_doc(self, doc_id, doc_rev, content): """ Insert the contents of the encrypted doc into the local sync database. @@ -174,7 +247,58 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): """ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \ % (self.TABLE_NAME,) - return self._sync_db.runOperation(query, (doc_id, doc_rev, content)) + return self._runOperation(query, (doc_id, doc_rev, content)) + + @defer.inlineCallbacks + def get_encrypted_doc(self, doc_id, doc_rev): + """ + Get an encrypted document from the sync db. + + :param doc_id: The id of the document. + :type doc_id: str + :param doc_rev: The revision of the document. + :type doc_rev: str + + :return: A deferred that will fire with the encrypted content of the + document or None if the document was not found in the sync + db. + :rtype: twisted.internet.defer.Deferred + """ + logger.debug("Trying to get encrypted doc from sync db: %s" % doc_id) + query = "SELECT content FROM %s WHERE doc_id=? and rev=?" \ + % self.TABLE_NAME + result = yield self._runQuery(query, (doc_id, doc_rev)) + if result: + val = result.pop() + defer.returnValue(val[0]) + defer.returnValue(None) + + def delete_encrypted_doc(self, doc_id, doc_rev): + """ + Delete an encrypted document from the sync db. + + :param doc_id: The id of the document. + :type doc_id: str + :param doc_rev: The revision of the document. + :type doc_rev: str + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + query = "DELETE FROM %s WHERE doc_id=? and rev=?" \ + % self.TABLE_NAME + self._runOperation(query, (doc_id, doc_rev)) + + def close(self): + """ + Close the encrypter pool. + """ + self._stopped = True + self._sync_queue.close() + q = self._sync_queue + del q + self._sync_queue = None def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, @@ -275,9 +399,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): d.addCallback(lambda _: self._empty_db.set()) # start the decryption loop - self._deferred_loop = deferToThread(self._decrypt_and_process_docs) + self._deferred_loop = deferToThread( + self._decrypt_and_process_docs_loop) self._deferred_loop.addCallback( - lambda _: logger.debug("Finished decryptor thread.")) + lambda _: logger.debug("Finished decrypter thread.")) def set_docs_to_process(self, docs_to_process): """ @@ -317,7 +442,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): docstr = json.dumps(content) query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ % self.TABLE_NAME - return self._sync_db.runOperation( + return self._runOperation( query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx)) def insert_received_doc( @@ -348,7 +473,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): content = json.dumps(content) query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ % self.TABLE_NAME - return self._sync_db.runOperation( + return self._runOperation( query, (doc_id, doc_rev, content, gen, trans_id, 0, idx)) def _delete_received_doc(self, doc_id): @@ -364,7 +489,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ query = "DELETE FROM '%s' WHERE doc_id=?" \ % self.TABLE_NAME - return self._sync_db.runOperation(query, (doc_id,)) + return self._runOperation(query, (doc_id,)) def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx, workers=True): @@ -474,7 +599,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): if encrypted is not None: query += " WHERE encrypted = %d" % int(encrypted) query += " ORDER BY %s %s" % (order_by, order) - return self._sync_db.runQuery(query) + return self._runQuery(query) @defer.inlineCallbacks def _get_insertable_docs(self): @@ -612,7 +737,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :rtype: twisted.internet.defer.Deferred """ query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) - return self._sync_db.runOperation(query) + return self._runOperation(query) def _raise_if_async_fails(self): """ @@ -627,7 +752,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # re-raise the exception raised by the remote call res.get() - def _decrypt_and_process_docs(self): + def _decrypt_and_process_docs_loop(self): """ Decrypt the documents received from remote replica and insert them into the local one. @@ -668,6 +793,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ Wait for the decrypt-and-process loop to finish. """ + logger.debug("Waiting for asynchronous decryption of incoming documents...") self._finished.wait() + logger.debug("Asynchronous decryption of incoming documents finished.") if self._exception: raise self._exception diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 39d5dd0e..16241621 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -42,7 +42,6 @@ SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases handled by Soledad should be created by SQLCipher >= 2.0. """ import logging -import multiprocessing import os import threading import json @@ -286,10 +285,9 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): :rtype: str """ doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc) - - # TODO XXX move to API XXX if self.defer_encryption: - self.sync_queue.put_nowait(doc) + # TODO move to api? + self._sync_enc_pool.enqueue_doc_for_encryption(doc) return doc_rev # @@ -429,13 +427,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db' """ - A dictionary that hold locks which avoid multiple sync attempts from the - same database replica. - """ - # XXX We do not need the lock here now. Remove. - encrypting_lock = threading.Lock() - - """ Period or recurrence of the Looping Call that will do the encryption to the syncdb (in seconds). """ @@ -458,7 +449,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self._sync_db_key = opts.sync_db_key self._sync_db = None self._sync_enc_pool = None - self.sync_queue = None # we store syncers in a dictionary indexed by the target URL. We also # store a hash of the auth info in case auth info expires and we need @@ -468,7 +458,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # self._syncers = {'<url>': ('<auth_hash>', syncer), ...} self._syncers = {} - self.sync_queue = multiprocessing.Queue() self.running = False self._sync_threadpool = None @@ -486,24 +475,10 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self._initialize_sync_db(opts) if defer_encryption: - # initialize syncing queue encryption pool self._sync_enc_pool = encdecpool.SyncEncrypterPool( self._crypto, self._sync_db) - # ----------------------------------------------------------------- - # From the documentation: If f returns a deferred, rescheduling - # will not take place until the deferred has fired. The result - # value is ignored. - - # TODO use this to avoid multiple sync attempts if the sync has not - # finished! - # ----------------------------------------------------------------- - - # XXX this was called sync_watcher --- trace any remnants - self._sync_loop = LoopingCall(self._encrypt_syncing_docs) - self._sync_loop.start(self.ENCRYPT_LOOP_PERIOD) - self.shutdownID = None @property @@ -703,7 +678,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self._replica_uid, creds=creds, crypto=self._crypto, - sync_db=self._sync_db)) + sync_db=self._sync_db, + sync_enc_pool=self._sync_enc_pool)) self._syncers[url] = (h, syncer) # in order to reuse the same synchronizer multiple times we have to # reset its state (i.e. the number of documents received from target @@ -715,33 +691,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # Symmetric encryption of syncing docs # - def _encrypt_syncing_docs(self): - """ - Process the syncing queue and send the documents there - to be encrypted in the sync db. They will be read by the - SoledadSyncTarget during the sync_exchange. - - Called periodically from the LoopingCall self._sync_loop. - """ - # TODO should return a deferred that would firewhen the encryption is - # done. See note on __init__ - - lock = self.encrypting_lock - # optional wait flag used to avoid blocking - if not lock.acquire(False): - return - else: - queue = self.sync_queue - try: - while not queue.empty(): - doc = queue.get_nowait() - self._sync_enc_pool.encrypt_doc(doc) - - except Exception as exc: - logger.error("Error while encrypting docs to sync") - logger.exception(exc) - finally: - lock.release() def get_generation(self): # FIXME @@ -779,11 +728,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): if self._sync_db is not None: self._sync_db.close() self._sync_db = None - # close the sync queue - if self.sync_queue is not None: - self.sync_queue.close() - del self.sync_queue - self.sync_queue = None class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase): diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index d3f106da..d4ca4258 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -195,10 +195,6 @@ class SoledadSynchronizer(Synchronizer): "my_gen": my_gen } self._syncing_info = info - if defer_decryption and not sync_target.has_syncdb(): - logger.debug("Sync target has no valid sync db, " - "aborting defer_decryption") - defer_decryption = False self.complete_sync() except Exception as e: logger.error("Soledad sync error: %s" % str(e)) diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index f2415218..667aab15 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -28,6 +28,7 @@ import threading from collections import defaultdict from time import sleep from uuid import uuid4 +from functools import partial import simplejson as json @@ -38,12 +39,12 @@ from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase from zope.proxy import ProxyBase from zope.proxy import setProxiedObject +from twisted.internet import defer from leap.soledad.common.document import SoledadDocument from leap.soledad.client.auth import TokenBasedAuth from leap.soledad.client.crypto import is_symmetrically_encrypted from leap.soledad.client.crypto import encrypt_doc, decrypt_doc -from leap.soledad.client.encdecpool import SyncEncrypterPool from leap.soledad.client.encdecpool import SyncDecrypterPool from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS @@ -111,6 +112,7 @@ class DocumentSyncerThread(threading.Thread): self._exception = None self._result = None self._success = False + self.started = threading.Event() # a lock so we can signal when we're finished self._request_lock = threading.Lock() self._request_lock.acquire() @@ -128,6 +130,8 @@ class DocumentSyncerThread(threading.Thread): finish before actually performing the request. It also traps any exception and register any failure with the request. """ + self.started.set() + with self._stop_lock: if self._stopped is None: self._stopped = False @@ -756,7 +760,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # def __init__(self, url, source_replica_uid=None, creds=None, crypto=None, - sync_db=None): + sync_db=None, sync_enc_pool=None): """ Initialize the SoledadSyncTarget. @@ -787,8 +791,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self.source_replica_uid = source_replica_uid self._syncer_pool = None - # deferred decryption attributes + # asynchronous encryption/decryption attributes self._sync_db = sync_db + self._sync_enc_pool = sync_enc_pool self._decryption_callback = None self._sync_decr_pool = None @@ -796,7 +801,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ Set up the SyncDecrypterPool for deferred decryption. """ - if self._sync_decr_pool is None: + if self._sync_decr_pool is None and self._sync_db is not None: # initialize syncing queue decryption pool self._sync_decr_pool = SyncDecrypterPool( self._crypto, @@ -1018,7 +1023,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): t.join() if t.success: number_of_changes, _, _ = t.result - if defer_decryption and number_of_changes: + if defer_decryption: self._sync_decr_pool.set_docs_to_process( number_of_changes) else: @@ -1060,6 +1065,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): return new_generation, new_transaction_id + @property + def _defer_encryption(self): + return self._sync_enc_pool is not None + + @property + def _defer_decryption(self): + return self._sync_decr_pool is not None + def sync_exchange(self, docs_by_generations, source_replica_uid, last_known_generation, last_known_trans_id, return_doc_cb, @@ -1126,17 +1139,19 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): cur_target_gen = last_known_generation cur_target_trans_id = last_known_trans_id - # send docs + # ------------------------------------------------------------------- + # start of send documents to target + # ------------------------------------------------------------------- msg = "%d/%d" % (0, len(docs_by_generations)) signal(SOLEDAD_SYNC_SEND_STATUS, msg) logger.debug("Soledad sync send status: %s" % msg) - defer_encryption = self._sync_db is not None self._syncer_pool = DocumentSyncerPool( self._raw_url, self._raw_creds, url, headers, ensure_callback, self.stop_syncer) threads = [] last_callback_lock = None + sent = 0 total = len(docs_by_generations) @@ -1156,66 +1171,78 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # ------------------------------------------------------------- # symmetric encryption of document's contents # ------------------------------------------------------------- - doc_json = doc.get_json() - if not doc.is_tombstone(): - if not defer_encryption: - # fallback case, for tests - doc_json = encrypt_doc(self._crypto, doc) - else: - try: - doc_json = self.get_encrypted_doc_from_db( - doc.doc_id, doc.rev) - except Exception as exc: - logger.error("Error while getting " - "encrypted doc from db") - logger.exception(exc) - continue + + # the following var will hold a deferred because we may try to + # fetch the encrypted document from the sync db + d = None + + if doc.is_tombstone(): + d = defer.succeed(None) + elif not self._defer_encryption: + # fallback case, for tests + d = defer.succeed(encrypt_doc(self._crypto, doc)) + else: + + def _maybe_encrypt_doc_inline(doc_json): if doc_json is None: - # Not marked as tombstone, but we got nothing - # from the sync db. As it is not encrypted yet, we - # force inline encryption. + # the document is not marked as tombstone, but we got + # nothing from the sync db. As it is not encrypted + # yet, we force inline encryption. # TODO: implement a queue to deal with these cases. - doc_json = encrypt_doc(self._crypto, doc) + return encrypt_doc(self._crypto, doc) + return doc_json + + d = self.get_encrypted_doc_from_db(doc.doc_id, doc.rev) + d.addCallback(_maybe_encrypt_doc_inline) # ------------------------------------------------------------- # end of symmetric encryption # ------------------------------------------------------------- + t = self._syncer_pool.new_syncer_thread( sent + 1, total, last_request_lock=last_request_lock, last_callback_lock=last_callback_lock) - # bail out if any thread failed + # bail out if creation of any thread failed if t is None: self.stop(fail=True) break - # set the request method - t.doc_syncer.set_request_method( - 'put', sync_id, cur_target_gen, cur_target_trans_id, - id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, - trans_id=trans_id, number_of_docs=number_of_docs, - doc_idx=sent + 1) - # set the success calback + # the following callback will be called when the document's + # encrypted content is available, either because it was found on + # the sync db or because it has been encrypted inline. - def _success_callback(idx, total, response): - _success_msg = "Soledad sync send status: %d/%d" \ - % (idx, total) - signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg) - logger.debug(_success_msg) + def _configure_and_start_thread(t, doc_json): + # set the request method + t.doc_syncer.set_request_method( + 'put', sync_id, cur_target_gen, cur_target_trans_id, + id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, + trans_id=trans_id, number_of_docs=number_of_docs, + doc_idx=sent + 1) + # set the success calback - t.doc_syncer.set_success_callback(_success_callback) + def _success_callback(idx, total, response): + _success_msg = "Soledad sync send status: %d/%d" \ + % (idx, total) + signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg) + logger.debug(_success_msg) - # set the failure callback - def _failure_callback(idx, total, exception): - _failure_msg = "Soledad sync: error while sending document " \ - "%d/%d: %s" % (idx, total, exception) - logger.warning("%s" % _failure_msg) - logger.warning("Soledad sync: failing gracefully, will " - "recover on next sync.") + t.doc_syncer.set_success_callback(_success_callback) - t.doc_syncer.set_failure_callback(_failure_callback) + # set the failure callback + def _failure_callback(idx, total, exception): + _failure_msg = "Soledad sync: error while sending document " \ + "%d/%d: %s" % (idx, total, exception) + logger.warning("%s" % _failure_msg) + logger.warning("Soledad sync: failing gracefully, will " + "recover on next sync.") + + t.doc_syncer.set_failure_callback(_failure_callback) + + # save thread and append + t.start() + + d.addCallback(partial(_configure_and_start_thread, t)) - # save thread and append - t.start() threads.append((t, doc)) # update lock references so they can be used in next call to @@ -1230,6 +1257,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): while threads: # check if there are failures t, doc = threads.pop(0) + t.started.wait() t.join() if t.success: synced.append((doc.doc_id, doc.rev)) @@ -1238,8 +1266,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): raise t.exception # delete documents from the sync database - if defer_encryption: - self.delete_encrypted_docs_from_db(synced) + if self._defer_encryption: + self._delete_encrypted_docs_from_db(synced) # get target gen and trans_id after docs gen_after_send = None @@ -1248,16 +1276,23 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): response_dict = json.loads(last_successful_thread.response[0])[0] gen_after_send = response_dict['new_generation'] trans_id_after_send = response_dict['new_transaction_id'] - - # get docs from target - if self._sync_db is None: - defer_decryption = False + # ------------------------------------------------------------------- + # end of send documents to target + # ------------------------------------------------------------------- + + # ------------------------------------------------------------------- + # start of fetch documents from target + # ------------------------------------------------------------------- + defer_decryption = defer_decryption and self._defer_decryption if self.stopped is False: cur_target_gen, cur_target_trans_id = self._get_remote_docs( url, last_known_generation, last_known_trans_id, headers, return_doc_cb, ensure_callback, sync_id, defer_decryption=defer_decryption) + # ------------------------------------------------------------------- + # end of fetch documents from target + # ------------------------------------------------------------------- self._syncer_pool.cleanup() @@ -1308,6 +1343,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): with self._stop_lock: return self._stopped is True + # + # Symmetric encryption of syncing docs + # + def get_encrypted_doc_from_db(self, doc_id, doc_rev): """ Retrieve encrypted document from the database of encrypted docs for @@ -1318,33 +1357,31 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :param doc_rev: The document revision :type doc_rev: str + + :return: A deferred which is fired with the document's encrypted + content or None if the document was not found on the sync db. + :rtype: twisted.internet.defer.Deferred """ - encr = SyncEncrypterPool - sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % ( - encr.TABLE_NAME,)) - res = self._fetchall(sql, (doc_id, doc_rev)) - if res: - val = res.pop() - return val[0] - else: - # no doc found - return None + logger.debug("Looking for encrypted document on sync db: %s" % doc_id) + return self._sync_enc_pool.get_encrypted_doc(doc_id, doc_rev) - def delete_encrypted_docs_from_db(self, docs_ids): + def _delete_encrypted_docs_from_db(self, docs): """ Delete several encrypted documents from the database of symmetrically encrypted docs to sync. - :param docs_ids: an iterable with (doc_id, doc_rev) for all documents - to be deleted. - :type docs_ids: any iterable of tuples of str + :param docs: an iterable with (doc_id, doc_rev) for all documents + to be deleted. + :type docs: any iterable of tuples of str """ - if docs_ids: - encr = SyncEncrypterPool - for doc_id, doc_rev in docs_ids: - sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % ( - encr.TABLE_NAME,)) - self._sync_db.execute(sql, (doc_id, doc_rev)) + for doc_id, doc_rev in docs: + logger.debug("Removing encrypted document on sync db: %s" + % doc_id) + return self._sync_enc_pool.delete_encrypted_doc(doc_id, doc_rev) + + # + # Symmetric decryption of syncing docs + # def _enqueue_encrypted_received_doc(self, doc, gen, trans_id, idx, total): """ @@ -1357,15 +1394,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :param gen: The generation. :type gen: str :param trans_id: Transacion id. - :type gen: str + :param idx: The index count of the current operation. :type idx: int :param total: The total number of operations. :type total: int """ - logger.debug( - "Enqueueing doc for decryption: %d/%d." - % (idx + 1, total)) + logger.debug("Enqueueing doc for decryption: %d/%d." + % (idx + 1, total)) self._sync_decr_pool.insert_encrypted_received_doc( doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1) @@ -1384,16 +1420,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :param total: The total number of operations. :type total: int """ - logger.debug( - "Enqueueing doc, no decryption needed: %d/%d." - % (idx + 1, total)) + logger.debug("Enqueueing doc, no decryption needed: %d/%d." + % (idx + 1, total)) self._sync_decr_pool.insert_received_doc( doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1) - # - # Symmetric decryption of syncing docs - # - def set_decryption_callback(self, cb): """ Set callback to be called when the decryption finishes. @@ -1410,11 +1441,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ return self._decryption_callback is not None - def has_syncdb(self): - """ - Return True if we have an initialized syncdb. - """ - return self._sync_db is not None + # + # Authentication methods + # def _sign_request(self, method, url_query, params): """ @@ -1442,9 +1471,3 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :type token: str """ TokenBasedAuth.set_token_credentials(self, uuid, token) - - def _fetchall(self, *args, **kwargs): - with self._sync_db: - c = self._sync_db.cursor() - c.execute(*args, **kwargs) - return c.fetchall() |