diff options
| -rw-r--r-- | common/changes/feature_5388_avoid-concurrent-syncs-update-handler-conflict | 2 | ||||
| -rw-r--r-- | common/src/leap/soledad/common/couch.py | 20 | ||||
| -rw-r--r-- | scripts/profiling/sync/sync-many.py | 125 | 
3 files changed, 139 insertions, 8 deletions
| diff --git a/common/changes/feature_5388_avoid-concurrent-syncs-update-handler-conflict b/common/changes/feature_5388_avoid-concurrent-syncs-update-handler-conflict new file mode 100644 index 00000000..d0c820db --- /dev/null +++ b/common/changes/feature_5388_avoid-concurrent-syncs-update-handler-conflict @@ -0,0 +1,2 @@ +  o Avoid concurrent syncs problem by adding a lock for PUTting to the sync +    log update handler (#5388). diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index c0ebc425..ebe8477f 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -30,6 +30,7 @@ import threading  from StringIO import StringIO +from collections import defaultdict  from couchdb.client import Server @@ -338,6 +339,8 @@ class CouchDatabase(CommonBackend):      # We spawn threads to parallelize the CouchDatabase.get_docs() method      MAX_GET_DOCS_THREADS = 20 +    update_handler_lock = defaultdict(threading.Lock) +      class _GetDocThread(threading.Thread):          """          A thread that gets a document from a database. @@ -1133,13 +1136,14 @@ class CouchDatabase(CommonBackend):          ddoc_path = ['_design', 'syncs', '_update', 'put', 'u1db_sync_log']          res = self._database.resource(*ddoc_path)          try: -            res.put_json( -                body={ -                    'other_replica_uid': other_replica_uid, -                    'other_generation': other_generation, -                    'other_transaction_id': other_transaction_id, -                }, -                headers={'content-type': 'application/json'}) +            with CouchDatabase.update_handler_lock[self._get_replica_uid()]: +                res.put_json( +                    body={ +                        'other_replica_uid': other_replica_uid, +                        'other_generation': other_generation, +                        'other_transaction_id': other_transaction_id, +                    }, +                    headers={'content-type': 'application/json'})          except ResourceNotFound as e:              raise_missing_design_doc_error(e, ddoc_path) @@ -1367,7 +1371,7 @@ class CouchDatabase(CommonBackend):              if save_conflict:                  self._force_doc_sync_conflict(doc)          if replica_uid is not None and replica_gen is not None: -            self._do_set_replica_gen_and_trans_id( +            self._set_replica_gen_and_trans_id(                  replica_uid, replica_gen, replica_trans_id)          # update info          old_doc.rev = doc.rev diff --git a/scripts/profiling/sync/sync-many.py b/scripts/profiling/sync/sync-many.py new file mode 100644 index 00000000..3666df2c --- /dev/null +++ b/scripts/profiling/sync/sync-many.py @@ -0,0 +1,125 @@ +#!/usr/bin/python + +# The purpose of this script is to stress a soledad server by: +# +#   - Instantiating multiple clients. +#   - Creating many documents in each client. +#   - Syncing all at the same time with th server multiple times, until +#     they've all reached an agreement on the state of the databases and +#     there's nothing else to be synced. + + +import threading +import tempfile +import argparse +import logging +import re +import getpass +import time +import shutil + + +from client_side_db import get_soledad_instance + + +from leap.soledad.client import BootstrapSequenceError + + +NUMBER_OF_REPLICAS = 5 +DOCUMENTS_PER_REPLICA = 10 + + +# create a logger +logger = logging.getLogger(__name__) +LOG_FORMAT = '%(asctime)s %(message)s' +logging.basicConfig(format=LOG_FORMAT, level=logging.INFO) + + +class WorkerThread(threading.Thread): + +    def __init__(self, thread_id, soledad, all_set): +        threading.Thread.__init__(self) +        self._id = thread_id +        self._soledad = soledad +        self._all_set = all_set +        self._done_creating = threading.Event() + +    def run(self): +        # create many documents +        logger.info('[replica %d] creating documents...' % self._id) +        for i in xrange(DOCUMENTS_PER_REPLICA): +            self._soledad.create_doc({'a_doc': i}) +        # wait for others +        self._done_creating.set() +        logger.info('[replica %d] done creating documents.' % self._id) +        self._all_set.wait() +        # sync +        successes = 0 +        while True: +            logger.info('[replica %d] syncing.' % self._id) +            if self._id == 1: +                time.sleep(5) +            old_gen = self._soledad.sync() +            logger.info('[replica %d] synced.' % self._id) +            new_gen = self._soledad._db._get_generation() +            logger.info('[replica %d] old gen %d - new gen %d.' % +                (self._id, old_gen, new_gen)) +            if old_gen == new_gen: +                successes += 1 +                logger.info('[replica %d] sync not needed.' % self._id) +                if successes == 3: +                    break + + +def stress_test(username, provider, passphrase, basedir): +    threads = [] +    all_set = threading.Event() +    for i in xrange(NUMBER_OF_REPLICAS): +        logging.info('[main] starting thread %d.' % i) +        s = get_soledad_instance( +            username, +            provider, +            passphrase, +            tempfile.mkdtemp(dir=basedir)) +        t = WorkerThread(i, s, all_set) +        t.start() +        threads.append(t) +    map(lambda t: t._done_creating.wait(), threads) +    all_set.set() +    map(lambda t: t.join(), threads) +    logger.info('Removing dir %s' % basedir) +    shutil.rmtree(basedir) + + +# main program + +if __name__ == '__main__': + +    class ValidateUserHandle(argparse.Action): +        def __call__(self, parser, namespace, values, option_string=None): +            m = re.compile('^([^@]+)@([^@]+\.[^@]+)$') +            res = m.match(values) +            if res == None: +                parser.error('User handle should have the form user@provider.') +            setattr(namespace, 'username', res.groups()[0]) +            setattr(namespace, 'provider', res.groups()[1]) + +    # parse command line +    parser = argparse.ArgumentParser() +    parser.add_argument( +        'user@provider', action=ValidateUserHandle, help='the user handle') +    parser.add_argument( +        '-b', dest='basedir', required=False, default=None, help='the user handle') +    args = parser.parse_args() + +    # get the password +    passphrase = getpass.getpass( +        'Password for %s@%s: ' % (args.username, args.provider)) + +    # get the basedir +    basedir = args.basedir +    if basedir is None: +        basedir = tempfile.mkdtemp() +    logger.info('[main] using %s as base directory.' % basedir) + +    stress_test(args.username, args.provider, passphrase, basedir) | 
