diff options
-rw-r--r-- | common/changes/feature_5008_parallelize-get_docs-on-couch-backend | 1 | ||||
-rw-r--r-- | common/src/leap/soledad/common/couch.py | 78 | ||||
-rw-r--r-- | scripts/db_access/client_side_db.py | 13 |
3 files changed, 90 insertions, 2 deletions
diff --git a/common/changes/feature_5008_parallelize-get_docs-on-couch-backend b/common/changes/feature_5008_parallelize-get_docs-on-couch-backend new file mode 100644 index 00000000..d452c888 --- /dev/null +++ b/common/changes/feature_5008_parallelize-get_docs-on-couch-backend @@ -0,0 +1 @@ + o Parallelize get_docs() on couch backend to accelerate sync (#5008). diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index d16563d3..a4f30cf3 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -26,6 +26,7 @@ import binascii import socket import time import sys +import threading from StringIO import StringIO @@ -334,6 +335,47 @@ class CouchDatabase(CommonBackend): A U1DB implementation that uses CouchDB as its persistence layer. """ + # We spawn threads to parallelize the CouchDatabase.get_docs() method + MAX_GET_DOCS_THREADS = 20 + + class _GetDocThread(threading.Thread): + """ + A thread that gets a document from a database. + + TODO: switch this for a twisted deferred to thread. This depends on + replacing python-couchdb for paisley in this module. + """ + + def __init__(self, db, doc_id, check_for_conflicts, + release_fun): + """ + :param db: The database from where to get the document. + :type db: u1db.Database + :param doc_id: The doc_id of the document to be retrieved. + :type doc_id: str + :param check_for_conflicts: Whether the get_doc() method should + check for existing conflicts. + :type check_for_conflicts: bool + :param release_fun: A function that releases a semaphore, to be + called after the document is fetched. + :type release_fun: function + """ + threading.Thread.__init__(self) + self._db = db + self._doc_id = doc_id + self._check_for_conflicts = check_for_conflicts + self._release_fun = release_fun + self._doc = None + + def run(self): + """ + Fetch the document, store it as a property, and call the release + function. + """ + self._doc = self._db._get_doc( + self._doc_id, self._check_for_conflicts) + self._release_fun() + @classmethod def open_database(cls, url, create): """ @@ -401,6 +443,9 @@ class CouchDatabase(CommonBackend): self._set_replica_uid(replica_uid) if ensure_ddocs: self.ensure_ddocs_on_db() + # initialize a thread pool for parallelizing get_docs() + self._sem_pool = threading.BoundedSemaphore( + value=self.MAX_GET_DOCS_THREADS) def ensure_ddocs_on_db(self): """ @@ -1331,6 +1376,39 @@ class CouchDatabase(CommonBackend): old_doc.has_conflicts = doc.has_conflicts return state, self._get_generation() + def get_docs(self, doc_ids, check_for_conflicts=True, + include_deleted=False): + """ + Get the JSON content for many documents. + + :param doc_ids: A list of document identifiers. + :type doc_ids: list + :param check_for_conflicts: If set to False, then the conflict check + will be skipped, and 'None' will be + returned instead of True/False. + :type check_for_conflictsa: bool + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise deleted + documents will not be included in the results. + :return: iterable giving the Document object for each document id + in matching doc_ids order. + :rtype: iterable + """ + # spawn threads to retrieve docs + threads = [] + for doc_id in doc_ids: + self._sem_pool.acquire() + t = self._GetDocThread(self, doc_id, check_for_conflicts, + self._sem_pool.release) + t.start() + threads.append(t) + # join threads and yield docs + for t in threads: + t.join() + if t._doc.is_tombstone() and not include_deleted: + continue + yield t._doc + class CouchSyncTarget(CommonSyncTarget): """ diff --git a/scripts/db_access/client_side_db.py b/scripts/db_access/client_side_db.py index 15980f5d..2bf4ab5e 100644 --- a/scripts/db_access/client_side_db.py +++ b/scripts/db_access/client_side_db.py @@ -13,17 +13,24 @@ import requests import json import srp._pysrp as srp import binascii +import logging from leap.common.config import get_path_prefix from leap.soledad.client import Soledad +# create a logger +logger = logging.getLogger(__name__) +LOG_FORMAT = '%(asctime)s %(message)s' +logging.basicConfig(format=LOG_FORMAT, level=logging.INFO) + + safe_unhexlify = lambda x: binascii.unhexlify(x) if ( len(x) % 2 == 0) else binascii.unhexlify('0' + x) def fail(reason): - print 'Fail: ' + reason + logger.error('Fail: ' + reason) exit(2) @@ -94,6 +101,8 @@ def get_soledad_instance(username, provider, passphrase, basedir): # setup soledad info uuid, server_url, cert_file, token = \ get_soledad_info(username, provider, passphrase, basedir) + logger.info('UUID is %s' % uuid) + logger.info('Server URL is %s' % server_url) secrets_path = os.path.join( basedir, '%s.secret' % uuid) local_db_path = os.path.join( @@ -138,7 +147,7 @@ if __name__ == '__main__': basedir = args.basedir if basedir is None: basedir = tempfile.mkdtemp() - print 'Using %s as base directory.' % basedir + logger.info('Using %s as base directory.' % basedir) # get the soledad instance s = get_soledad_instance( |