summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--common/changes/feature_5388_avoid-concurrent-syncs-update-handler-conflict2
-rw-r--r--common/src/leap/soledad/common/couch.py20
-rw-r--r--scripts/profiling/sync/sync-many.py125
3 files changed, 139 insertions, 8 deletions
diff --git a/common/changes/feature_5388_avoid-concurrent-syncs-update-handler-conflict b/common/changes/feature_5388_avoid-concurrent-syncs-update-handler-conflict
new file mode 100644
index 00000000..d0c820db
--- /dev/null
+++ b/common/changes/feature_5388_avoid-concurrent-syncs-update-handler-conflict
@@ -0,0 +1,2 @@
+ o Avoid concurrent syncs problem by adding a lock for PUTting to the sync
+ log update handler (#5388).
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index c0ebc425..ebe8477f 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -30,6 +30,7 @@ import threading
from StringIO import StringIO
+from collections import defaultdict
from couchdb.client import Server
@@ -338,6 +339,8 @@ class CouchDatabase(CommonBackend):
# We spawn threads to parallelize the CouchDatabase.get_docs() method
MAX_GET_DOCS_THREADS = 20
+ update_handler_lock = defaultdict(threading.Lock)
+
class _GetDocThread(threading.Thread):
"""
A thread that gets a document from a database.
@@ -1133,13 +1136,14 @@ class CouchDatabase(CommonBackend):
ddoc_path = ['_design', 'syncs', '_update', 'put', 'u1db_sync_log']
res = self._database.resource(*ddoc_path)
try:
- res.put_json(
- body={
- 'other_replica_uid': other_replica_uid,
- 'other_generation': other_generation,
- 'other_transaction_id': other_transaction_id,
- },
- headers={'content-type': 'application/json'})
+ with CouchDatabase.update_handler_lock[self._get_replica_uid()]:
+ res.put_json(
+ body={
+ 'other_replica_uid': other_replica_uid,
+ 'other_generation': other_generation,
+ 'other_transaction_id': other_transaction_id,
+ },
+ headers={'content-type': 'application/json'})
except ResourceNotFound as e:
raise_missing_design_doc_error(e, ddoc_path)
@@ -1367,7 +1371,7 @@ class CouchDatabase(CommonBackend):
if save_conflict:
self._force_doc_sync_conflict(doc)
if replica_uid is not None and replica_gen is not None:
- self._do_set_replica_gen_and_trans_id(
+ self._set_replica_gen_and_trans_id(
replica_uid, replica_gen, replica_trans_id)
# update info
old_doc.rev = doc.rev
diff --git a/scripts/profiling/sync/sync-many.py b/scripts/profiling/sync/sync-many.py
new file mode 100644
index 00000000..3666df2c
--- /dev/null
+++ b/scripts/profiling/sync/sync-many.py
@@ -0,0 +1,125 @@
+#!/usr/bin/python
+
+# The purpose of this script is to stress a soledad server by:
+#
+# - Instantiating multiple clients.
+# - Creating many documents in each client.
+# - Syncing all at the same time with th server multiple times, until
+# they've all reached an agreement on the state of the databases and
+# there's nothing else to be synced.
+
+
+import threading
+import tempfile
+import argparse
+import logging
+import re
+import getpass
+import time
+import shutil
+
+
+from client_side_db import get_soledad_instance
+
+
+from leap.soledad.client import BootstrapSequenceError
+
+
+NUMBER_OF_REPLICAS = 5
+DOCUMENTS_PER_REPLICA = 10
+
+
+# create a logger
+logger = logging.getLogger(__name__)
+LOG_FORMAT = '%(asctime)s %(message)s'
+logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
+
+
+class WorkerThread(threading.Thread):
+
+ def __init__(self, thread_id, soledad, all_set):
+ threading.Thread.__init__(self)
+ self._id = thread_id
+ self._soledad = soledad
+ self._all_set = all_set
+ self._done_creating = threading.Event()
+
+ def run(self):
+ # create many documents
+ logger.info('[replica %d] creating documents...' % self._id)
+ for i in xrange(DOCUMENTS_PER_REPLICA):
+ self._soledad.create_doc({'a_doc': i})
+ # wait for others
+ self._done_creating.set()
+ logger.info('[replica %d] done creating documents.' % self._id)
+ self._all_set.wait()
+ # sync
+ successes = 0
+ while True:
+ logger.info('[replica %d] syncing.' % self._id)
+ if self._id == 1:
+ time.sleep(5)
+ old_gen = self._soledad.sync()
+ logger.info('[replica %d] synced.' % self._id)
+ new_gen = self._soledad._db._get_generation()
+ logger.info('[replica %d] old gen %d - new gen %d.' %
+ (self._id, old_gen, new_gen))
+ if old_gen == new_gen:
+ successes += 1
+ logger.info('[replica %d] sync not needed.' % self._id)
+ if successes == 3:
+ break
+
+
+def stress_test(username, provider, passphrase, basedir):
+ threads = []
+ all_set = threading.Event()
+ for i in xrange(NUMBER_OF_REPLICAS):
+ logging.info('[main] starting thread %d.' % i)
+ s = get_soledad_instance(
+ username,
+ provider,
+ passphrase,
+ tempfile.mkdtemp(dir=basedir))
+ t = WorkerThread(i, s, all_set)
+ t.start()
+ threads.append(t)
+ map(lambda t: t._done_creating.wait(), threads)
+ all_set.set()
+ map(lambda t: t.join(), threads)
+ logger.info('Removing dir %s' % basedir)
+ shutil.rmtree(basedir)
+
+
+# main program
+
+if __name__ == '__main__':
+
+ class ValidateUserHandle(argparse.Action):
+ def __call__(self, parser, namespace, values, option_string=None):
+ m = re.compile('^([^@]+)@([^@]+\.[^@]+)$')
+ res = m.match(values)
+ if res == None:
+ parser.error('User handle should have the form user@provider.')
+ setattr(namespace, 'username', res.groups()[0])
+ setattr(namespace, 'provider', res.groups()[1])
+
+ # parse command line
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ 'user@provider', action=ValidateUserHandle, help='the user handle')
+ parser.add_argument(
+ '-b', dest='basedir', required=False, default=None, help='the user handle')
+ args = parser.parse_args()
+
+ # get the password
+ passphrase = getpass.getpass(
+ 'Password for %s@%s: ' % (args.username, args.provider))
+
+ # get the basedir
+ basedir = args.basedir
+ if basedir is None:
+ basedir = tempfile.mkdtemp()
+ logger.info('[main] using %s as base directory.' % basedir)
+
+ stress_test(args.username, args.provider, passphrase, basedir)