summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad
diff options
context:
space:
mode:
authorVictor Shyba <victor.shyba@gmail.com>2016-02-16 12:36:43 -0300
committerKali Kaneko <kali@leap.se>2016-06-06 19:58:50 -0400
commitebcf2a098fb8e9b1211e31b4955aa67cfebc5854 (patch)
tree340fbb3dcb929ed0318ad35a05f38d2176aeef8c /client/src/leap/soledad
parent498e9e1353700b61950ef87c007e6c0a84fbe201 (diff)
[bug] delete all docs on start and ensure isolation
Docs created from one failed sync would be there for the next one, possibly causing a lot of hard to find errors. This commit adds a sync_id field to track each sync documents isolated and cleans up the pool on start instead of constructor.
Diffstat (limited to 'client/src/leap/soledad')
-rw-r--r--client/src/leap/soledad/client/encdecpool.py96
-rw-r--r--client/src/leap/soledad/client/http_target/fetch.py7
2 files changed, 35 insertions, 68 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 218ebfa9..7d646c51 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -24,6 +24,7 @@ during synchronization.
import json
import logging
+from uuid import uuid4
from twisted.internet.task import LoopingCall
from twisted.internet import threads
@@ -65,13 +66,9 @@ class SyncEncryptDecryptPool(object):
self._started = False
def start(self):
- if self.running:
- return
self._started = True
def stop(self):
- if not self.running:
- return
self._started = False
# maybe cancel the next delayed call
if self._delayed_call \
@@ -312,7 +309,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
TABLE_NAME = "docs_received"
FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \
- "trans_id, encrypted, idx"
+ "trans_id, encrypted, idx, sync_id"
"""
Period of recurrence of the periodic decrypting task, in seconds.
@@ -343,42 +340,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self._processed_docs = 0
self._last_inserted_idx = 0
- # initialize db and make sure any database operation happens after
- # db initialization
- self._deferred_init = self._init_db()
- self._wait_init_db('_runOperation', '_runQuery')
self._loop = LoopingCall(self._decrypt_and_recurse)
- self._decrypted_docs_indexes = set()
-
- def _wait_init_db(self, *methods):
- """
- Methods that need to wait for db initialization.
-
- :param methods: methods that need to wait for initialization
- :type methods: tuple(str)
- """
- self._waiting = []
- self._stored = {}
-
- def _restore(_):
- for method in self._stored:
- setattr(self, method, self._stored[method])
- for d in self._waiting:
- d.callback(None)
-
- def _makeWrapper(method):
- def wrapper(*args, **kw):
- d = defer.Deferred()
- d.addCallback(lambda _: self._stored[method](*args, **kw))
- self._waiting.append(d)
- return d
- return wrapper
-
- for method in methods:
- self._stored[method] = getattr(self, method)
- setattr(self, method, _makeWrapper(method))
-
- self._deferred_init.addCallback(_restore)
def start(self, docs_to_process):
"""
@@ -391,9 +353,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type docs_to_process: int
"""
SyncEncryptDecryptPool.start(self)
+ self._decrypted_docs_indexes = set()
+ self._sync_id = uuid4().hex
self._docs_to_process = docs_to_process
self._deferred = defer.Deferred()
- self._loop.start(self.DECRYPT_LOOP_PERIOD)
+ d = self._init_db()
+ d.addCallback(lambda _: self._loop.start(self.DECRYPT_LOOP_PERIOD))
+ return d
def stop(self):
if self._loop.running:
@@ -401,6 +367,17 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self._finish()
SyncEncryptDecryptPool.stop(self)
+ def _init_db(self):
+ """
+ Empty the received docs table of the sync database.
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ query = "DELETE FROM %s WHERE sync_id <> ?" % (self.TABLE_NAME,)
+ return self._runOperation(query, (self._sync_id,))
+
def _errback(self, failure):
log.err(failure)
self._deferred.errback(failure)
@@ -474,10 +451,11 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
if not isinstance(content, str):
content = json.dumps(content)
- query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?, ?)" \
% self.TABLE_NAME
d = self._runOperation(
- query, (doc_id, doc_rev, content, gen, trans_id, 0, idx))
+ query, (doc_id, doc_rev, content, gen, trans_id, 0,
+ idx, self._sync_id))
d.addCallback(lambda _: self._decrypted_docs_indexes.add(idx))
return d
@@ -516,8 +494,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
return self.insert_received_doc(
doc_id, rev, content, gen, trans_id, idx)
- def _get_docs(self, encrypted=None, order_by='idx', order='ASC',
- sequence=None):
+ def _get_docs(self, encrypted=None, sequence=None):
"""
Get documents from the received docs table in the sync db.
@@ -525,9 +502,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
field equal to given parameter.
:type encrypted: bool or None
:param order_by: The name of the field to order results.
- :type order_by: str
- :param order: Whether the order should be ASC or DESC.
- :type order: str
:return: A deferred that will fire with the results of the database
query.
@@ -535,15 +509,18 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \
"idx FROM %s" % self.TABLE_NAME
+ parameters = []
if encrypted or sequence:
- query += " WHERE"
+ query += " WHERE sync_id = ? and"
+ parameters += [self._sync_id]
if encrypted:
- query += " encrypted = %d" % int(encrypted)
+ query += " encrypted = ?"
+ parameters += [int(encrypted)]
if sequence:
- query += " idx in (" + ', '.join(sequence) + ")"
-
- query += " ORDER BY %s %s" % (order_by, order)
- return self._runQuery(query)
+ query += " idx in (" + ', '.join('?' * len(sequence)) + ")"
+ parameters += [int(i) for i in sequence]
+ query += " ORDER BY idx ASC"
+ return self._runQuery(query, parameters)
@defer.inlineCallbacks
def _get_insertable_docs(self):
@@ -625,17 +602,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self._last_inserted_idx = idx
self._processed_docs += 1
- def _init_db(self):
- """
- Empty the received docs table of the sync database.
-
- :return: A deferred that will fire when the operation in the database
- has finished.
- :rtype: twisted.internet.defer.Deferred
- """
- query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
- return self._runOperation(query)
-
@defer.inlineCallbacks
def _decrypt_and_recurse(self):
"""
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py
index 9f7a4193..9801c3d9 100644
--- a/client/src/leap/soledad/client/http_target/fetch.py
+++ b/client/src/leap/soledad/client/http_target/fetch.py
@@ -81,9 +81,6 @@ class HTTPDocFetcher(object):
new_generation = ngen
new_transaction_id = ntrans
- if defer_decryption:
- self._sync_decr_pool.start(number_of_changes)
-
# ---------------------------------------------------------------------
# maybe receive the rest of the documents
# ---------------------------------------------------------------------
@@ -151,6 +148,10 @@ class HTTPDocFetcher(object):
new_generation, new_transaction_id, number_of_changes, doc_id, \
rev, content, gen, trans_id = \
self._parse_received_doc_response(response)
+
+ if self._sync_decr_pool and not self._sync_decr_pool.running:
+ self._sync_decr_pool.start(number_of_changes)
+
if doc_id is not None:
# decrypt incoming document and insert into local database
# -------------------------------------------------------------