From 8d504fa812da93df3a26c4b4b761a74685d40f25 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 24 Dec 2013 08:17:37 -0200 Subject: Avoid concurrent sync attempts from the same replica in the client (#4451). --- client/changes/feature_4451_avoid_concurrent_syncs | 2 + client/src/leap/soledad/client/__init__.py | 23 +++++++++--- .../tests/test_couch_operations_atomicity.py | 43 ++++++++++++++++++++++ 3 files changed, 63 insertions(+), 5 deletions(-) create mode 100644 client/changes/feature_4451_avoid_concurrent_syncs diff --git a/client/changes/feature_4451_avoid_concurrent_syncs b/client/changes/feature_4451_avoid_concurrent_syncs new file mode 100644 index 00000000..04a2c4df --- /dev/null +++ b/client/changes/feature_4451_avoid_concurrent_syncs @@ -0,0 +1,2 @@ + o Avoid concurrent syncs for the same account, but allow for distinct + accounts (4451). diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py index a0b3f45a..d35d3a2a 100644 --- a/client/src/leap/soledad/client/__init__.py +++ b/client/src/leap/soledad/client/__init__.py @@ -34,6 +34,8 @@ import urlparse import hmac from hashlib import sha256 +from threading import Lock +from collections import defaultdict try: import cchardet as chardet @@ -245,6 +247,12 @@ class Soledad(object): Prefix for default values for path. """ + syncing_lock = defaultdict(Lock) + """ + A dictionary that hold locks which avoid multiple sync attempts from the + same database replica. + """ + def __init__(self, uuid, passphrase, secrets_path, local_db_path, server_url, cert_file, auth_token=None, secret_id=None): """ @@ -1063,6 +1071,9 @@ class Soledad(object): """ Synchronize the local encrypted replica with a remote replica. + This method blocks until a syncing lock is acquired, so there are no + attempts of concurrent syncs from the same client replica. + :param url: the url of the target replica to sync with :type url: str @@ -1071,11 +1082,13 @@ class Soledad(object): :rtype: str """ if self._db: - local_gen = self._db.sync( - urlparse.urljoin(self.server_url, 'user-%s' % self._uuid), - creds=self._creds, autocreate=True) - signal(SOLEDAD_DONE_DATA_SYNC, self._uuid) - return local_gen + # acquire lock before attempt to sync + with Soledad.syncing_lock[self._db._get_replica_uid()]: + local_gen = self._db.sync( + urlparse.urljoin(self.server_url, 'user-%s' % self._uuid), + creds=self._creds, autocreate=True) + signal(SOLEDAD_DONE_DATA_SYNC, self._uuid) + return local_gen def need_sync(self, url): """ diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py index a0c473b1..8b001859 100644 --- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py +++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py @@ -337,3 +337,46 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): self.assertEqual( 1, len(filter(lambda t: t[0] == doc_id, transaction_log))) + + def test_concurrent_syncs_do_not_fail(self): + """ + Assert that concurrent attempts to sync end up being executed + sequentially and do not fail. + """ + threads = [] + docs = [] + pool = threading.BoundedSemaphore(value=1) + self.startServer() + sol = self._soledad_instance( + auth_token='auth-token', + server_url=self.getURL()) + + def _run_method(self): + # create a lot of documents + doc = self._params['sol'].create_doc({}) + # do the sync! + sol.sync() + pool.acquire() + docs.append(doc.doc_id) + pool.release() + + # launch threads to create documents in parallel + for i in range(0, REPEAT_TIMES): + thread = self._WorkerThread( + {'sol': sol, 'syncs': i}, + _run_method) + thread.start() + threads.append(thread) + + # wait for threads to finish + for thread in threads: + thread.join() + + transaction_log = self.db._get_transaction_log() + self.assertEqual(REPEAT_TIMES, len(transaction_log)) + # assert all documents are in the remote log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 1, + len(filter(lambda t: t[0] == doc_id, transaction_log))) -- cgit v1.2.3