summaryrefslogtreecommitdiff
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/changes/feature_5008_parallelize-get_docs-on-couch-backend1
-rw-r--r--common/src/leap/soledad/common/couch.py78
2 files changed, 79 insertions, 0 deletions
diff --git a/common/changes/feature_5008_parallelize-get_docs-on-couch-backend b/common/changes/feature_5008_parallelize-get_docs-on-couch-backend
new file mode 100644
index 00000000..d452c888
--- /dev/null
+++ b/common/changes/feature_5008_parallelize-get_docs-on-couch-backend
@@ -0,0 +1 @@
+ o Parallelize get_docs() on couch backend to accelerate sync (#5008).
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index d16563d3..a4f30cf3 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -26,6 +26,7 @@ import binascii
import socket
import time
import sys
+import threading
from StringIO import StringIO
@@ -334,6 +335,47 @@ class CouchDatabase(CommonBackend):
A U1DB implementation that uses CouchDB as its persistence layer.
"""
+ # We spawn threads to parallelize the CouchDatabase.get_docs() method
+ MAX_GET_DOCS_THREADS = 20
+
+ class _GetDocThread(threading.Thread):
+ """
+ A thread that gets a document from a database.
+
+ TODO: switch this for a twisted deferred to thread. This depends on
+ replacing python-couchdb for paisley in this module.
+ """
+
+ def __init__(self, db, doc_id, check_for_conflicts,
+ release_fun):
+ """
+ :param db: The database from where to get the document.
+ :type db: u1db.Database
+ :param doc_id: The doc_id of the document to be retrieved.
+ :type doc_id: str
+ :param check_for_conflicts: Whether the get_doc() method should
+ check for existing conflicts.
+ :type check_for_conflicts: bool
+ :param release_fun: A function that releases a semaphore, to be
+ called after the document is fetched.
+ :type release_fun: function
+ """
+ threading.Thread.__init__(self)
+ self._db = db
+ self._doc_id = doc_id
+ self._check_for_conflicts = check_for_conflicts
+ self._release_fun = release_fun
+ self._doc = None
+
+ def run(self):
+ """
+ Fetch the document, store it as a property, and call the release
+ function.
+ """
+ self._doc = self._db._get_doc(
+ self._doc_id, self._check_for_conflicts)
+ self._release_fun()
+
@classmethod
def open_database(cls, url, create):
"""
@@ -401,6 +443,9 @@ class CouchDatabase(CommonBackend):
self._set_replica_uid(replica_uid)
if ensure_ddocs:
self.ensure_ddocs_on_db()
+ # initialize a thread pool for parallelizing get_docs()
+ self._sem_pool = threading.BoundedSemaphore(
+ value=self.MAX_GET_DOCS_THREADS)
def ensure_ddocs_on_db(self):
"""
@@ -1331,6 +1376,39 @@ class CouchDatabase(CommonBackend):
old_doc.has_conflicts = doc.has_conflicts
return state, self._get_generation()
+ def get_docs(self, doc_ids, check_for_conflicts=True,
+ include_deleted=False):
+ """
+ Get the JSON content for many documents.
+
+ :param doc_ids: A list of document identifiers.
+ :type doc_ids: list
+ :param check_for_conflicts: If set to False, then the conflict check
+ will be skipped, and 'None' will be
+ returned instead of True/False.
+ :type check_for_conflictsa: bool
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise deleted
+ documents will not be included in the results.
+ :return: iterable giving the Document object for each document id
+ in matching doc_ids order.
+ :rtype: iterable
+ """
+ # spawn threads to retrieve docs
+ threads = []
+ for doc_id in doc_ids:
+ self._sem_pool.acquire()
+ t = self._GetDocThread(self, doc_id, check_for_conflicts,
+ self._sem_pool.release)
+ t.start()
+ threads.append(t)
+ # join threads and yield docs
+ for t in threads:
+ t.join()
+ if t._doc.is_tombstone() and not include_deleted:
+ continue
+ yield t._doc
+
class CouchSyncTarget(CommonSyncTarget):
"""