3 files changed, 63 insertions(+), 5 deletions(-)
diff --git a/client/changes/feature_4451_avoid_concurrent_syncs b/client/changes/feature_4451_avoid_concurrent_syncs
new file mode 100644
index 00000000..04a2c4df
--- /dev/null
+++ b/client/changes/feature_4451_avoid_concurrent_syncs
@@ -0,0 +1,2 @@
+  o Avoid concurrent syncs for the same account, but allow for distinct
+    accounts (4451).
diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py
index a0b3f45a..d35d3a2a 100644
--- a/client/src/leap/soledad/client/__init__.py
+++ b/client/src/leap/soledad/client/__init__.py
@@ -34,6 +34,8 @@ import urlparse
 import hmac
 from hashlib import sha256
+from threading import Lock
+from collections import defaultdict
 
 try:
     import cchardet as chardet
@@ -245,6 +247,12 @@ class Soledad(object):
     Prefix for default values for path.
     """
 
+    syncing_lock = defaultdict(Lock)
+    """
+    A dictionary that holds locks to avoid multiple sync attempts from the
+    same database replica.
+    """
+
     def __init__(self, uuid, passphrase, secrets_path, local_db_path,
                  server_url, cert_file, auth_token=None, secret_id=None):
         """
@@ -1063,6 +1071,9 @@ class Soledad(object):
         """
         Synchronize the local encrypted replica with a remote replica.
 
+        This method blocks until a syncing lock is acquired, so there are no
+        concurrent sync attempts from the same client replica.
+
         :param url: the url of the target replica to sync with
         :type url: str
 
@@ -1071,11 +1082,13 @@
         :rtype: str
         """
         if self._db:
-            local_gen = self._db.sync(
-                urlparse.urljoin(self.server_url, 'user-%s' % self._uuid),
-                creds=self._creds, autocreate=True)
-            signal(SOLEDAD_DONE_DATA_SYNC, self._uuid)
-            return local_gen
+            # acquire the lock before attempting to sync
+            with Soledad.syncing_lock[self._db._get_replica_uid()]:
+                local_gen = self._db.sync(
+                    urlparse.urljoin(self.server_url, 'user-%s' % self._uuid),
+                    creds=self._creds, autocreate=True)
+                signal(SOLEDAD_DONE_DATA_SYNC, self._uuid)
+                return local_gen
 
     def need_sync(self, url):
         """
diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
index a0c473b1..8b001859 100644
--- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
+++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
@@ -337,3 +337,46 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
             self.assertEqual(
                 1,
                 len(filter(lambda t: t[0] == doc_id, transaction_log)))
+
+    def test_concurrent_syncs_do_not_fail(self):
+        """
+        Assert that concurrent attempts to sync end up being executed
+        sequentially and do not fail.
+        """
+        threads = []
+        docs = []
+        pool = threading.BoundedSemaphore(value=1)
+        self.startServer()
+        sol = self._soledad_instance(
+            auth_token='auth-token',
+            server_url=self.getURL())
+
+        def _run_method(self):
+            # each thread creates one document
+            doc = self._params['sol'].create_doc({})
+            # do the sync!
+            sol.sync()
+            pool.acquire()
+            docs.append(doc.doc_id)
+            pool.release()
+
+        # launch threads to create documents in parallel
+        for i in range(0, REPEAT_TIMES):
+            thread = self._WorkerThread(
+                {'sol': sol, 'syncs': i},
+                _run_method)
+            thread.start()
+            threads.append(thread)
+
+        # wait for threads to finish
+        for thread in threads:
+            thread.join()
+
+        transaction_log = self.db._get_transaction_log()
+        self.assertEqual(REPEAT_TIMES, len(transaction_log))
+        # assert all documents are in the remote log
+        self.assertEqual(REPEAT_TIMES, len(docs))
+        for doc_id in docs:
+            self.assertEqual(
+                1,
+                len(filter(lambda t: t[0] == doc_id, transaction_log)))
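The change boils down to a class-level defaultdict of locks keyed by replica uid: syncs of the same replica serialize on a shared lock, while distinct replicas each get their own lock and can sync in parallel. Below is a minimal standalone sketch of that pattern; the FakeReplica class, the uid values and the sleep stand-in are illustrative assumptions, not code from the patch.

import threading
import time
from collections import defaultdict


class FakeReplica(object):

    # One Lock per replica uid, created on first access. Syncs for the
    # same uid contend on the same lock; different uids get different
    # locks and may run concurrently.
    syncing_lock = defaultdict(threading.Lock)

    def __init__(self, replica_uid):
        self._replica_uid = replica_uid

    def sync(self):
        # blocks until no other sync for this replica uid is running
        with FakeReplica.syncing_lock[self._replica_uid]:
            time.sleep(0.1)  # stand-in for the actual network round trip


# the two 'uid-a' replicas serialize; 'uid-b' syncs alongside them
replicas = [FakeReplica('uid-a'), FakeReplica('uid-a'), FakeReplica('uid-b')]
threads = [threading.Thread(target=r.sync) for r in replicas]
for t in threads:
    t.start()
for t in threads:
    t.join()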

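One design note on that pattern: indexing the defaultdict from several threads relies on CPython's GIL to make the first insertion for a given key effectively atomic. A stricter, interpreter-agnostic variant (an illustrative alternative, not something this patch does) guards the lock registry with its own lock:

import threading


class LockRegistry(object):
    """Hands out exactly one lock per key."""

    def __init__(self):
        self._registry_lock = threading.Lock()
        self._locks = {}

    def get(self, key):
        # serialize lock creation so two racing threads can never
        # install two different Lock objects for the same key
        with self._registry_lock:
            if key not in self._locks:
                self._locks[key] = threading.Lock()
            return self._locks[key]


syncing_locks = LockRegistry()

def sync(replica_uid):
    # same usage as in the patch: hold the per-replica lock for the
    # duration of the sync
    with syncing_locks.get(replica_uid):
        pass  # the actual sync work would go here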