diff options
| author | Tomás Touceda <chiiph@leap.se> | 2014-03-17 12:14:29 -0300 | 
|---|---|---|
| committer | Tomás Touceda <chiiph@leap.se> | 2014-03-17 12:14:29 -0300 | 
| commit | ff63167fbb4391a063fdbc29903fdad5a48c55bc (patch) | |
| tree | 12fecfdccf58e496806e38e8c3bf0cd449fc2275 /common | |
| parent | e51d656532294fa1bdf2a78ccd8426ccb97dd07c (diff) | |
| parent | 35bf9f390db15e74ad4e37b6fc7fac9d6d7ca658 (diff) | |
Merge remote-tracking branch 'refs/remotes/drebs/feature/5008_parallelize-get_docs-on-couch-backend' into develop
Diffstat (limited to 'common')
| -rw-r--r-- | common/changes/feature_5008_parallelize-get_docs-on-couch-backend | 1 |
| -rw-r--r-- | common/src/leap/soledad/common/couch.py | 78 |
2 files changed, 79 insertions, 0 deletions
| diff --git a/common/changes/feature_5008_parallelize-get_docs-on-couch-backend b/common/changes/feature_5008_parallelize-get_docs-on-couch-backend new file mode 100644 index 00000000..d452c888 --- /dev/null +++ b/common/changes/feature_5008_parallelize-get_docs-on-couch-backend @@ -0,0 +1 @@ +  o Parallelize get_docs() on couch backend to accelerate sync (#5008). diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index d16563d3..a4f30cf3 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -26,6 +26,7 @@ import binascii  import socket  import time  import sys +import threading  from StringIO import StringIO @@ -334,6 +335,47 @@ class CouchDatabase(CommonBackend):      A U1DB implementation that uses CouchDB as its persistence layer.      """ +    # We spawn threads to parallelize the CouchDatabase.get_docs() method +    MAX_GET_DOCS_THREADS = 20 + +    class _GetDocThread(threading.Thread): +        """ +        A thread that gets a document from a database. + +        TODO: switch this for a twisted deferred to thread. This depends on +        replacing python-couchdb for paisley in this module. +        """ + +        def __init__(self, db, doc_id, check_for_conflicts, +                     release_fun): +            """ +            :param db: The database from where to get the document. +            :type db: u1db.Database +            :param doc_id: The doc_id of the document to be retrieved. +            :type doc_id: str +            :param check_for_conflicts: Whether the get_doc() method should +                                        check for existing conflicts. +            :type check_for_conflicts: bool +            :param release_fun: A function that releases a semaphore, to be +                                called after the document is fetched. 
+            :type release_fun: function +            """ +            threading.Thread.__init__(self) +            self._db = db +            self._doc_id = doc_id +            self._check_for_conflicts = check_for_conflicts +            self._release_fun = release_fun +            self._doc = None + +        def run(self): +            """ +            Fetch the document, store it as a property, and call the release +            function. +            """ +            self._doc = self._db._get_doc( +                self._doc_id, self._check_for_conflicts) +            self._release_fun() +      @classmethod      def open_database(cls, url, create):          """ @@ -401,6 +443,9 @@ class CouchDatabase(CommonBackend):              self._set_replica_uid(replica_uid)          if ensure_ddocs:              self.ensure_ddocs_on_db() +        # initialize a thread pool for parallelizing get_docs() +        self._sem_pool = threading.BoundedSemaphore( +            value=self.MAX_GET_DOCS_THREADS)      def ensure_ddocs_on_db(self):          """ @@ -1331,6 +1376,39 @@ class CouchDatabase(CommonBackend):          old_doc.has_conflicts = doc.has_conflicts          return state, self._get_generation() +    def get_docs(self, doc_ids, check_for_conflicts=True, +                 include_deleted=False): +        """ +        Get the JSON content for many documents. + +        :param doc_ids: A list of document identifiers. +        :type doc_ids: list +        :param check_for_conflicts: If set to False, then the conflict check +                                    will be skipped, and 'None' will be +                                    returned instead of True/False. +        :type check_for_conflicts: bool +        :param include_deleted: If set to True, deleted documents will be +                                returned with empty content. Otherwise deleted +                                documents will not be included in the results. 
+        :return: iterable giving the Document object for each document id +                 in matching doc_ids order. +        :rtype: iterable +        """ +        # spawn threads to retrieve docs +        threads = [] +        for doc_id in doc_ids: +            self._sem_pool.acquire() +            t = self._GetDocThread(self, doc_id, check_for_conflicts, +                                   self._sem_pool.release) +            t.start() +            threads.append(t) +        # join threads and yield docs +        for t in threads: +            t.join() +            if t._doc.is_tombstone() and not include_deleted: +                continue +            yield t._doc +  class CouchSyncTarget(CommonSyncTarget):      """ | 
