summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--common/changes/feature_5008_parallelize-get_docs-on-couch-backend1
-rw-r--r--common/src/leap/soledad/common/couch.py78
-rw-r--r--scripts/db_access/client_side_db.py13
3 files changed, 90 insertions, 2 deletions
diff --git a/common/changes/feature_5008_parallelize-get_docs-on-couch-backend b/common/changes/feature_5008_parallelize-get_docs-on-couch-backend
new file mode 100644
index 00000000..d452c888
--- /dev/null
+++ b/common/changes/feature_5008_parallelize-get_docs-on-couch-backend
@@ -0,0 +1 @@
+ o Parallelize get_docs() on couch backend to accelerate sync (#5008).
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index d16563d3..a4f30cf3 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -26,6 +26,7 @@ import binascii
import socket
import time
import sys
+import threading
from StringIO import StringIO
@@ -334,6 +335,47 @@ class CouchDatabase(CommonBackend):
A U1DB implementation that uses CouchDB as its persistence layer.
"""
+ # We spawn threads to parallelize the CouchDatabase.get_docs() method
+ MAX_GET_DOCS_THREADS = 20
+
+ class _GetDocThread(threading.Thread):
+ """
+ A thread that gets a document from a database.
+
+ TODO: switch this for a twisted deferred to thread. This depends on
+ replacing python-couchdb for paisley in this module.
+ """
+
+ def __init__(self, db, doc_id, check_for_conflicts,
+ release_fun):
+ """
+ :param db: The database from where to get the document.
+ :type db: u1db.Database
+ :param doc_id: The doc_id of the document to be retrieved.
+ :type doc_id: str
+ :param check_for_conflicts: Whether the get_doc() method should
+ check for existing conflicts.
+ :type check_for_conflicts: bool
+ :param release_fun: A function that releases a semaphore, to be
+ called after the document is fetched.
+ :type release_fun: function
+ """
+ threading.Thread.__init__(self)
+ self._db = db
+ self._doc_id = doc_id
+ self._check_for_conflicts = check_for_conflicts
+ self._release_fun = release_fun
+ self._doc = None
+
+ def run(self):
+ """
+ Fetch the document, store it as a property, and call the release
+ function.
+ """
+ self._doc = self._db._get_doc(
+ self._doc_id, self._check_for_conflicts)
+ self._release_fun()
+
@classmethod
def open_database(cls, url, create):
"""
@@ -401,6 +443,9 @@ class CouchDatabase(CommonBackend):
self._set_replica_uid(replica_uid)
if ensure_ddocs:
self.ensure_ddocs_on_db()
+ # initialize a thread pool for parallelizing get_docs()
+ self._sem_pool = threading.BoundedSemaphore(
+ value=self.MAX_GET_DOCS_THREADS)
def ensure_ddocs_on_db(self):
"""
@@ -1331,6 +1376,39 @@ class CouchDatabase(CommonBackend):
old_doc.has_conflicts = doc.has_conflicts
return state, self._get_generation()
+ def get_docs(self, doc_ids, check_for_conflicts=True,
+ include_deleted=False):
+ """
+ Get the JSON content for many documents.
+
+ :param doc_ids: A list of document identifiers.
+ :type doc_ids: list
+ :param check_for_conflicts: If set to False, then the conflict check
+ will be skipped, and 'None' will be
+ returned instead of True/False.
+    :type check_for_conflicts: bool
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise deleted
+ documents will not be included in the results.
+ :return: iterable giving the Document object for each document id
+ in matching doc_ids order.
+ :rtype: iterable
+ """
+ # spawn threads to retrieve docs
+ threads = []
+ for doc_id in doc_ids:
+ self._sem_pool.acquire()
+ t = self._GetDocThread(self, doc_id, check_for_conflicts,
+ self._sem_pool.release)
+ t.start()
+ threads.append(t)
+ # join threads and yield docs
+ for t in threads:
+ t.join()
+ if t._doc.is_tombstone() and not include_deleted:
+ continue
+ yield t._doc
+
class CouchSyncTarget(CommonSyncTarget):
"""
diff --git a/scripts/db_access/client_side_db.py b/scripts/db_access/client_side_db.py
index 15980f5d..2bf4ab5e 100644
--- a/scripts/db_access/client_side_db.py
+++ b/scripts/db_access/client_side_db.py
@@ -13,17 +13,24 @@ import requests
import json
import srp._pysrp as srp
import binascii
+import logging
from leap.common.config import get_path_prefix
from leap.soledad.client import Soledad
+# create a logger
+logger = logging.getLogger(__name__)
+LOG_FORMAT = '%(asctime)s %(message)s'
+logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
+
+
safe_unhexlify = lambda x: binascii.unhexlify(x) if (
len(x) % 2 == 0) else binascii.unhexlify('0' + x)
def fail(reason):
- print 'Fail: ' + reason
+ logger.error('Fail: ' + reason)
exit(2)
@@ -94,6 +101,8 @@ def get_soledad_instance(username, provider, passphrase, basedir):
# setup soledad info
uuid, server_url, cert_file, token = \
get_soledad_info(username, provider, passphrase, basedir)
+ logger.info('UUID is %s' % uuid)
+ logger.info('Server URL is %s' % server_url)
secrets_path = os.path.join(
basedir, '%s.secret' % uuid)
local_db_path = os.path.join(
@@ -138,7 +147,7 @@ if __name__ == '__main__':
basedir = args.basedir
if basedir is None:
basedir = tempfile.mkdtemp()
- print 'Using %s as base directory.' % basedir
+ logger.info('Using %s as base directory.' % basedir)
# get the soledad instance
s = get_soledad_instance(