summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/src/leap/soledad/client/sync.py10
-rw-r--r--client/src/leap/soledad/client/target.py77
2 files changed, 56 insertions, 31 deletions
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index be60d0ab..5d545a77 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -32,6 +32,7 @@ Extend u1db Synchronizer with the ability to:
import json
import logging
+import traceback
from threading import Lock
from u1db import errors
@@ -85,13 +86,11 @@ class SoledadSynchronizer(Synchronizer):
return self._sync(autocreate=autocreate,
defer_decryption=defer_decryption)
except Exception:
- # We release the lock if there was an error.
- # Otherwise, the lock should be released from the function
- # `complete_sync`.
- self.release_syncing_lock()
# re-raising the exceptions to let syqlcipher.sync catch them
# (and re-create the syncer instance if needed)
raise
+ finally:
+ self.release_syncing_lock()
def _sync(self, autocreate=False, defer_decryption=True):
"""
@@ -161,7 +160,6 @@ class SoledadSynchronizer(Synchronizer):
if not changes and target_last_known_gen == target_gen:
if target_trans_id != target_last_known_trans_id:
raise errors.InvalidTransactionId
- self.release_syncing_lock()
return my_gen
# prepare to send all the changed docs
@@ -205,6 +203,7 @@ class SoledadSynchronizer(Synchronizer):
self.complete_sync()
except Exception as e:
logger.error("Soledad sync error: %s" % str(e))
+ logger.error(traceback.format_exc())
sync_target.stop()
finally:
sync_target.close()
@@ -228,7 +227,6 @@ class SoledadSynchronizer(Synchronizer):
# if gapless record current reached generation with target
self._record_sync_info_with_the_target(info["my_gen"])
- self.syncing_lock.release()
@property
def syncing(self):
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 6d8ecfeb..70e4d3a2 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -794,15 +794,21 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self.source_replica_uid = source_replica_uid
self._defer_decryption = False
+ # deferred decryption attributes
self._sync_db = None
+ self._sync_db_write_lock = None
+ self._decryption_callback = None
self._sync_decr_pool = None
self._sync_watcher = None
-
if sync_db and sync_db_write_lock is not None:
self._sync_db = sync_db
- self._decryption_callback = None
self._sync_db_write_lock = sync_db_write_lock
+ def _setup_sync_decr_pool(self):
+ """
+ Set up the SyncDecrypterPool for deferred decryption.
+ """
+ if self._sync_decr_pool is None:
# initialize syncing queue decryption pool
self._sync_decr_pool = SyncDecrypterPool(
self._crypto, self._sync_db,
@@ -810,10 +816,33 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
insert_doc_cb=self._insert_doc_cb)
self._sync_decr_pool.set_source_replica_uid(
self.source_replica_uid)
+
+ def _teardown_sync_decr_pool(self):
+ """
+ Tear down the SyncDecrypterPool.
+ """
+ if self._sync_decr_pool is not None:
+ self._sync_decr_pool.close()
+ self._sync_decr_pool = None
+
+ def _setup_sync_watcher(self):
+ """
+ Set up the sync watcher for deferred decryption.
+ """
+ if self._sync_watcher is None:
self._sync_watcher = TimerTask(
self._decrypt_syncing_received_docs,
delay=self.DECRYPT_TASK_PERIOD)
+ def _teardown_sync_watcher(self):
+ """
+ Tear down the sync watcher.
+ """
+ if self._sync_watcher is not None:
+ self._sync_watcher.stop()
+ self._sync_watcher.shutdown()
+ self._sync_watcher = None
+
def _get_replica_uid(self, url):
"""
Return replica uid from the url, or None.
@@ -824,17 +853,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
replica_uid_match = re.findall("user-([0-9a-fA-F]+)", url)
return replica_uid_match[0] if len(replica_uid_match) > 0 else None
- def close(self):
- """
- Cleanly close pool of workers.
- """
- if self._sync_watcher is not None:
- self._sync_watcher.stop()
- self._sync_watcher.shutdown()
- if self._sync_decr_pool is not None:
- self._sync_decr_pool.close()
- HTTPSyncTarget.close(self)
-
@staticmethod
def connect(url, source_replica_uid=None, crypto=None):
return SoledadSyncTarget(
@@ -1044,9 +1062,18 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
if last_successful_thread is not None:
body, _ = last_successful_thread.response
parsed_body = json.loads(body)
- metadata = parsed_body[0]
- new_generation = metadata['new_generation']
- new_transaction_id = metadata['new_transaction_id']
+ # get current target gen and trans id in case no documents were
+ # transferred
+ if len(parsed_body) == 1:
+ metadata = parsed_body[0]
+ new_generation = metadata['new_generation']
+ new_transaction_id = metadata['new_transaction_id']
+ # get current target gen and trans id from last transferred
+ # document
+ else:
+ doc_data = parsed_body[1]
+ new_generation = doc_data['gen']
+ new_transaction_id = doc_data['trans_id']
return new_generation, new_transaction_id
@@ -1097,16 +1124,15 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:rtype: tuple
"""
self._ensure_callback = ensure_callback
+
if defer_decryption:
- if self._sync_watcher is None:
- logger.warning(
- "Soledad syncer: can't defer decryption, falling back to "
- "normal syncing mode.")
- defer_decryption = False
- else:
- self._sync_exchange_lock.acquire()
- self._defer_decryption = True
+ self._sync_exchange_lock.acquire()
+ self._setup_sync_decr_pool()
+ self._setup_sync_watcher()
+ self._defer_decryption = True
+
self.start()
+
if sync_id is None:
sync_id = str(uuid4())
self.source_replica_uid = source_replica_uid
@@ -1248,8 +1274,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
if defer_decryption:
while self.clear_to_sync() is False:
sleep(self.DECRYPT_TASK_PERIOD)
+ self._teardown_sync_watcher()
+ self._teardown_sync_decr_pool()
self._sync_exchange_lock.release()
- self._sync_watcher.stop()
self.stop()
return cur_target_gen, cur_target_trans_id