diff options
author | Victor Shyba <victor.shyba@gmail.com> | 2015-09-04 19:37:32 -0300 |
---|---|---|
committer | Victor Shyba <victor.shyba@gmail.com> | 2015-09-24 19:39:46 -0300 |
commit | d39b408e92bb2fa6a2dac3835e2f5209d2eee94c (patch) | |
tree | 69ddd418cd5aa5faa64841d64ce75d3645ce005f /common/src/leap/soledad | |
parent | 92b86079d34da5252b826d32afabafde84dab1af (diff) |
[refactor] uses ThreadPool instead of own implementation
Python has a native ThreadPool implementation that fits our needs.
Changing it to use this instead and making some calls simpler.
Diffstat (limited to 'common/src/leap/soledad')
-rw-r--r-- | common/src/leap/soledad/common/couch.py | 99 |
1 files changed, 18 insertions, 81 deletions
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 771db69e..421fbac1 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -26,13 +26,12 @@ import logging import binascii import time import sys -import threading from StringIO import StringIO -from collections import defaultdict from urlparse import urljoin from contextlib import contextmanager +from multiprocessing.pool import ThreadPool from couchdb.client import Server, Database @@ -362,57 +361,15 @@ def couch_server(url): yield server +THREAD_POOL = ThreadPool(20) + + class CouchDatabase(CommonBackend): """ A U1DB implementation that uses CouchDB as its persistence layer. """ - # We spawn threads to parallelize the CouchDatabase.get_docs() method - MAX_GET_DOCS_THREADS = 20 - - update_handler_lock = defaultdict(threading.Lock) - sync_info_lock = defaultdict(threading.Lock) - - class _GetDocThread(threading.Thread): - - """ - A thread that gets a document from a database. - - TODO: switch this for a twisted deferred to thread. This depends on - replacing python-couchdb for paisley in this module. - """ - - def __init__(self, db, doc_id, check_for_conflicts, - release_fun): - """ - :param db: The database from where to get the document. - :type db: CouchDatabase - :param doc_id: The doc_id of the document to be retrieved. - :type doc_id: str - :param check_for_conflicts: Whether the get_doc() method should - check for existing conflicts. - :type check_for_conflicts: bool - :param release_fun: A function that releases a semaphore, to be - called after the document is fetched. - :type release_fun: function - """ - threading.Thread.__init__(self) - self._db = db - self._doc_id = doc_id - self._check_for_conflicts = check_for_conflicts - self._release_fun = release_fun - self._doc = None - - def run(self): - """ - Fetch the document, store it as a property, and call the release - function. - """ - self._doc = self._db._get_doc( - self._doc_id, self._check_for_conflicts) - self._release_fun() - @classmethod def open_database(cls, url, create, replica_uid=None, ensure_ddocs=False): """ @@ -477,9 +434,6 @@ class CouchDatabase(CommonBackend): self._set_replica_uid(replica_uid) if ensure_ddocs: self.ensure_ddocs_on_db() - # initialize a thread pool for parallelizing get_docs() - self._sem_pool = threading.BoundedSemaphore( - value=self.MAX_GET_DOCS_THREADS) def ensure_ddocs_on_db(self): """ @@ -735,6 +689,9 @@ class CouchDatabase(CommonBackend): attachments=True)[2] except ResourceNotFound: return None + return self.__parse_doc_from_couch(result, doc_id, check_for_conflicts) + + def __parse_doc_from_couch(self, result, doc_id, check_for_conflicts=False): # restrict to u1db documents if 'u1db_rev' not in result: return None @@ -799,11 +756,8 @@ class CouchDatabase(CommonBackend): """ generation = self._get_generation() - results = [] - for row in self._database.view('_all_docs'): - doc = self.get_doc(row.id, include_deleted=include_deleted) - if doc is not None: - results.append(doc) + results = list(self.get_docs(self._database, + include_deleted=include_deleted)) return (generation, results) def _put_doc(self, old_doc, doc): @@ -1335,12 +1289,12 @@ class CouchDatabase(CommonBackend): """ Get the JSON content for many documents. - :param doc_ids: A list of document identifiers. + :param doc_ids: A list of document identifiers or None for all. :type doc_ids: list :param check_for_conflicts: If set to False, then the conflict check will be skipped, and 'None' will be returned instead of True/False. - :type check_for_conflictsa: bool + :type check_for_conflicts: bool :param include_deleted: If set to True, deleted documents will be returned with empty content. Otherwise deleted documents will not be included in the results. @@ -1348,31 +1302,14 @@ class CouchDatabase(CommonBackend): in matching doc_ids order. :rtype: iterable """ - # Workaround for: - # - # http://bugs.python.org/issue7980 - # https://leap.se/code/issues/5449 - # - # python-couchdb uses time.strptime, which is not thread safe. In - # order to avoid the problem described on the issues above, we preload - # strptime here by evaluating the conversion of an arbitrary date. - # This will not be needed when/if we switch from python-couchdb to - # paisley. - time.strptime('Mar 8 1917', '%b %d %Y') - # spawn threads to retrieve docs - threads = [] - for doc_id in doc_ids: - self._sem_pool.acquire() - t = self._GetDocThread(self, doc_id, check_for_conflicts, - self._sem_pool.release) - t.start() - threads.append(t) - # join threads and yield docs - for t in threads: - t.join() - if t._doc.is_tombstone() and not include_deleted: + get_one = lambda doc_id: self._get_doc(doc_id, check_for_conflicts) + docs = [THREAD_POOL.apply_async(get_one, [doc_id]) + for doc_id in doc_ids] + for doc in docs: + doc = doc.get() + if not doc or not include_deleted and doc.is_tombstone(): continue - yield t._doc + yield doc def _prune_conflicts(self, doc, doc_vcr): """ |