-rw-r--r--  common/changes/feature_5388_avoid-concurrent-syncs-update-handler-conflict |   2
-rw-r--r--  common/src/leap/soledad/common/couch.py                                    |  20
-rw-r--r--  scripts/profiling/sync/sync-many.py                                        | 125
3 files changed, 139 insertions, 8 deletions
diff --git a/common/changes/feature_5388_avoid-concurrent-syncs-update-handler-conflict b/common/changes/feature_5388_avoid-concurrent-syncs-update-handler-conflict
new file mode 100644
index 00000000..d0c820db
--- /dev/null
+++ b/common/changes/feature_5388_avoid-concurrent-syncs-update-handler-conflict
@@ -0,0 +1,2 @@
+  o Avoid concurrent syncs problem by adding a lock for PUTting to the sync
+    log update handler (#5388).
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index c0ebc425..ebe8477f 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -30,6 +30,7 @@ import threading
 
 
 from StringIO import StringIO
+from collections import defaultdict
 
 
 from couchdb.client import Server
@@ -338,6 +339,8 @@ class CouchDatabase(CommonBackend):
     # We spawn threads to parallelize the CouchDatabase.get_docs() method
     MAX_GET_DOCS_THREADS = 20
 
+    update_handler_lock = defaultdict(threading.Lock)
+
     class _GetDocThread(threading.Thread):
         """
         A thread that gets a document from a database.
@@ -1133,13 +1136,14 @@ class CouchDatabase(CommonBackend):
         ddoc_path = ['_design', 'syncs', '_update', 'put', 'u1db_sync_log']
         res = self._database.resource(*ddoc_path)
         try:
-            res.put_json(
-                body={
-                    'other_replica_uid': other_replica_uid,
-                    'other_generation': other_generation,
-                    'other_transaction_id': other_transaction_id,
-                },
-                headers={'content-type': 'application/json'})
+            with CouchDatabase.update_handler_lock[self._get_replica_uid()]:
+                res.put_json(
+                    body={
+                        'other_replica_uid': other_replica_uid,
+                        'other_generation': other_generation,
+                        'other_transaction_id': other_transaction_id,
+                    },
+                    headers={'content-type': 'application/json'})
         except ResourceNotFound as e:
             raise_missing_design_doc_error(e, ddoc_path)
 
@@ -1367,7 +1371,7 @@ class CouchDatabase(CommonBackend):
         if save_conflict:
             self._force_doc_sync_conflict(doc)
         if replica_uid is not None and replica_gen is not None:
-            self._do_set_replica_gen_and_trans_id(
+            self._set_replica_gen_and_trans_id(
                 replica_uid, replica_gen, replica_trans_id)
         # update info
         old_doc.rev = doc.rev
diff --git a/scripts/profiling/sync/sync-many.py b/scripts/profiling/sync/sync-many.py
new file mode 100644
index 00000000..3666df2c
--- /dev/null
+++ b/scripts/profiling/sync/sync-many.py
@@ -0,0 +1,125 @@
+#!/usr/bin/python
+
+# The purpose of this script is to stress a soledad server by:
+#
+#   - Instantiating multiple clients.
+#   - Creating many documents in each client.
+#   - Syncing all at the same time with the server multiple times, until
+#     they've all reached an agreement on the state of the databases and
+#     there's nothing else to be synced.
+
+
+import threading
+import tempfile
+import argparse
+import logging
+import re
+import getpass
+import time
+import shutil
+
+
+from client_side_db import get_soledad_instance
+
+
+from leap.soledad.client import BootstrapSequenceError
+
+
+NUMBER_OF_REPLICAS = 5
+DOCUMENTS_PER_REPLICA = 10
+
+
+# create a logger
+logger = logging.getLogger(__name__)
+LOG_FORMAT = '%(asctime)s %(message)s'
+logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
+
+
+class WorkerThread(threading.Thread):
+
+    def __init__(self, thread_id, soledad, all_set):
+        threading.Thread.__init__(self)
+        self._id = thread_id
+        self._soledad = soledad
+        self._all_set = all_set
+        self._done_creating = threading.Event()
+
+    def run(self):
+        # create many documents
+        logger.info('[replica %d] creating documents...' % self._id)
+        for i in xrange(DOCUMENTS_PER_REPLICA):
+            self._soledad.create_doc({'a_doc': i})
+        # wait for others
+        self._done_creating.set()
+        logger.info('[replica %d] done creating documents.' % self._id)
+        self._all_set.wait()
+        # sync
+        successes = 0
+        while True:
+            logger.info('[replica %d] syncing.' % self._id)
+            if self._id == 1:
+                time.sleep(5)
+            old_gen = self._soledad.sync()
+            logger.info('[replica %d] synced.' % self._id)
+            new_gen = self._soledad._db._get_generation()
+            logger.info('[replica %d] old gen %d - new gen %d.' %
+                        (self._id, old_gen, new_gen))
+            if old_gen == new_gen:
+                successes += 1
+                logger.info('[replica %d] sync not needed.' % self._id)
+                if successes == 3:
+                    break
+
+
+def stress_test(username, provider, passphrase, basedir):
+    threads = []
+    all_set = threading.Event()
+    for i in xrange(NUMBER_OF_REPLICAS):
+        logging.info('[main] starting thread %d.' % i)
+        s = get_soledad_instance(
+            username,
+            provider,
+            passphrase,
+            tempfile.mkdtemp(dir=basedir))
+        t = WorkerThread(i, s, all_set)
+        t.start()
+        threads.append(t)
+    map(lambda t: t._done_creating.wait(), threads)
+    all_set.set()
+    map(lambda t: t.join(), threads)
+    logger.info('Removing dir %s' % basedir)
+    shutil.rmtree(basedir)
+
+
+# main program
+
+if __name__ == '__main__':
+
+    class ValidateUserHandle(argparse.Action):
+        def __call__(self, parser, namespace, values, option_string=None):
+            m = re.compile('^([^@]+)@([^@]+\.[^@]+)$')
+            res = m.match(values)
+            if res == None:
+                parser.error('User handle should have the form user@provider.')
+            setattr(namespace, 'username', res.groups()[0])
+            setattr(namespace, 'provider', res.groups()[1])
+
+    # parse command line
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        'user@provider', action=ValidateUserHandle, help='the user handle')
+    parser.add_argument(
+        '-b', dest='basedir', required=False, default=None, help='the base directory')
+    args = parser.parse_args()
+
+    # get the password
+    passphrase = getpass.getpass(
+        'Password for %s@%s: ' % (args.username, args.provider))
+
+    # get the basedir
+    basedir = args.basedir
+    if basedir is None:
+        basedir = tempfile.mkdtemp()
+    logger.info('[main] using %s as base directory.' % basedir)
+
+    stress_test(args.username, args.provider, passphrase, basedir)
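
The heart of the couch.py change is one lock per replica uid, created on demand by a class-level defaultdict(threading.Lock) and held around the PUT to the u1db_sync_log update handler, so two concurrent syncs against the same database can no longer race and leave the sync log document in conflict. The snippet below is a minimal, self-contained sketch of that locking pattern only; the function put_sync_log and the in-memory sync_log dict are illustrative stand-ins for the CouchDB update-handler PUT and are not part of the Soledad API.

import threading
from collections import defaultdict

# One lock per key, created lazily on first access, as in the couch.py change.
update_handler_lock = defaultdict(threading.Lock)

# Stand-in for the per-replica sync log kept by the CouchDB update handler.
sync_log = {}


def put_sync_log(replica_uid, generation):
    # Only one thread at a time may touch the sync log entry for a given
    # replica uid; entries for different replica uids proceed in parallel.
    with update_handler_lock[replica_uid]:
        current = sync_log.get(replica_uid, 0)
        # A read-modify-write that could interleave badly without the lock.
        sync_log[replica_uid] = max(current, generation)


threads = [
    threading.Thread(target=put_sync_log, args=('replica-1', gen))
    for gen in range(10)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sync_log)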
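As for the new stress script: assuming the companion client_side_db.py module (which provides get_soledad_instance) sits alongside it in scripts/profiling/sync/ and a Soledad provider is reachable, a run would look something like `python sync-many.py user@example.org -b /tmp/sync-stress` (the handle and path here are only examples). The script prompts for the account passphrase, spawns NUMBER_OF_REPLICAS worker threads that each create DOCUMENTS_PER_REPLICA documents, and only then lets them all sync at once; both constants are adjusted by editing the values at the top of the file. Note that stress_test removes basedir with shutil.rmtree when it finishes, so the -b directory should be a disposable one.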