-rw-r--r--  common/src/leap/soledad/common/couch.py  |  99
1 file changed, 18 insertions, 81 deletions
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index 771db69e..421fbac1 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -26,13 +26,12 @@
 import logging
 import binascii
 import time
 import sys
-import threading
 
 from StringIO import StringIO
-from collections import defaultdict
 from urlparse import urljoin
 from contextlib import contextmanager
+from multiprocessing.pool import ThreadPool
 
 
 from couchdb.client import Server, Database
@@ -362,57 +361,15 @@ def couch_server(url):
     yield server
 
 
+THREAD_POOL = ThreadPool(20)
+
+
 class CouchDatabase(CommonBackend):
 
     """
     A U1DB implementation that uses CouchDB as its persistence layer.
     """
 
-    # We spawn threads to parallelize the CouchDatabase.get_docs() method
-    MAX_GET_DOCS_THREADS = 20
-
-    update_handler_lock = defaultdict(threading.Lock)
-    sync_info_lock = defaultdict(threading.Lock)
-
-    class _GetDocThread(threading.Thread):
-
-        """
-        A thread that gets a document from a database.
-
-        TODO: switch this for a twisted deferred to thread. This depends on
-        replacing python-couchdb for paisley in this module.
-        """
-
-        def __init__(self, db, doc_id, check_for_conflicts,
-                     release_fun):
-            """
-            :param db: The database from where to get the document.
-            :type db: CouchDatabase
-            :param doc_id: The doc_id of the document to be retrieved.
-            :type doc_id: str
-            :param check_for_conflicts: Whether the get_doc() method should
-                                        check for existing conflicts.
-            :type check_for_conflicts: bool
-            :param release_fun: A function that releases a semaphore, to be
-                                called after the document is fetched.
-            :type release_fun: function
-            """
-            threading.Thread.__init__(self)
-            self._db = db
-            self._doc_id = doc_id
-            self._check_for_conflicts = check_for_conflicts
-            self._release_fun = release_fun
-            self._doc = None
-
-        def run(self):
-            """
-            Fetch the document, store it as a property, and call the release
-            function.
-            """
-            self._doc = self._db._get_doc(
-                self._doc_id, self._check_for_conflicts)
-            self._release_fun()
-
     @classmethod
     def open_database(cls, url, create, replica_uid=None, ensure_ddocs=False):
         """
@@ -477,9 +434,6 @@ class CouchDatabase(CommonBackend):
             self._set_replica_uid(replica_uid)
         if ensure_ddocs:
             self.ensure_ddocs_on_db()
-        # initialize a thread pool for parallelizing get_docs()
-        self._sem_pool = threading.BoundedSemaphore(
-            value=self.MAX_GET_DOCS_THREADS)
 
     def ensure_ddocs_on_db(self):
         """
@@ -735,6 +689,9 @@ class CouchDatabase(CommonBackend):
                     attachments=True)[2]
         except ResourceNotFound:
             return None
+        return self.__parse_doc_from_couch(result, doc_id, check_for_conflicts)
+
+    def __parse_doc_from_couch(self, result, doc_id, check_for_conflicts=False):
         # restrict to u1db documents
         if 'u1db_rev' not in result:
             return None
@@ -799,11 +756,8 @@ class CouchDatabase(CommonBackend):
 
         """
         generation = self._get_generation()
-        results = []
-        for row in self._database.view('_all_docs'):
-            doc = self.get_doc(row.id, include_deleted=include_deleted)
-            if doc is not None:
-                results.append(doc)
+        results = list(self.get_docs(self._database,
+                                     include_deleted=include_deleted))
         return (generation, results)
 
     def _put_doc(self, old_doc, doc):
@@ -1335,12 +1289,12 @@ class CouchDatabase(CommonBackend):
         """
        Get the JSON content for many documents.
 
-        :param doc_ids: A list of document identifiers.
+        :param doc_ids: A list of document identifiers or None for all.
         :type doc_ids: list
        :param check_for_conflicts: If set to False, then the conflict check
                                     will be skipped, and 'None' will be
                                     returned instead of True/False.
-        :type check_for_conflictsa: bool
+        :type check_for_conflicts: bool
         :param include_deleted: If set to True, deleted documents will be
                                 returned with empty content. Otherwise deleted
                                 documents will not be included in the results.
@@ -1348,31 +1302,14 @@
                  in matching doc_ids order.
         :rtype: iterable
         """
-        # Workaround for:
-        #
-        #   http://bugs.python.org/issue7980
-        #   https://leap.se/code/issues/5449
-        #
-        # python-couchdb uses time.strptime, which is not thread safe. In
-        # order to avoid the problem described on the issues above, we preload
-        # strptime here by evaluating the conversion of an arbitrary date.
-        # This will not be needed when/if we switch from python-couchdb to
-        # paisley.
-        time.strptime('Mar 8 1917', '%b %d %Y')
-        # spawn threads to retrieve docs
-        threads = []
-        for doc_id in doc_ids:
-            self._sem_pool.acquire()
-            t = self._GetDocThread(self, doc_id, check_for_conflicts,
-                                   self._sem_pool.release)
-            t.start()
-            threads.append(t)
-        # join threads and yield docs
-        for t in threads:
-            t.join()
-            if t._doc.is_tombstone() and not include_deleted:
+        get_one = lambda doc_id: self._get_doc(doc_id, check_for_conflicts)
+        docs = [THREAD_POOL.apply_async(get_one, [doc_id])
+                for doc_id in doc_ids]
+        for doc in docs:
+            doc = doc.get()
+            if not doc or not include_deleted and doc.is_tombstone():
                 continue
-            yield t._doc
+            yield doc
 
     def _prune_conflicts(self, doc, doc_vcr):
         """
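For context, this patch drops the hand-rolled _GetDocThread/BoundedSemaphore machinery and instead has get_docs() submit one blocking fetch per document to a module-level multiprocessing.pool.ThreadPool via apply_async(), then drain the AsyncResult objects in submission order, which is what keeps the yielded documents in matching doc_ids order. Below is a minimal standalone sketch of that pattern; fetch_doc and doc_ids are illustrative placeholders, not part of the patch:

    from multiprocessing.pool import ThreadPool

    # One shared pool of worker threads; the patch uses a module-level
    # THREAD_POOL = ThreadPool(20) in couch.py for the same purpose.
    pool = ThreadPool(4)

    def fetch_doc(doc_id):
        # Placeholder for a blocking, I/O-bound lookup such as
        # CouchDatabase._get_doc(doc_id, check_for_conflicts).
        return {'doc_id': doc_id}

    doc_ids = ['doc-1', 'doc-2', 'doc-3']

    # Submit every fetch first so they run concurrently on the pool...
    results = [pool.apply_async(fetch_doc, [doc_id]) for doc_id in doc_ids]

    # ...then collect them in the original submission order.
    for result in results:
        print(result.get())  # get() blocks until this particular fetch finishes

One consequence of this design: because THREAD_POOL is created at import time with a fixed size of 20, the old per-instance MAX_GET_DOCS_THREADS/BoundedSemaphore throttle disappears and the concurrency limit becomes global to the process rather than per CouchDatabase instance.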
