summaryrefslogtreecommitdiff
path: root/common
diff options
context:
space:
mode:
authorVictor Shyba <victor.shyba@gmail.com>2015-09-04 19:37:32 -0300
committerVictor Shyba <victor.shyba@gmail.com>2015-09-24 19:39:46 -0300
commitd39b408e92bb2fa6a2dac3835e2f5209d2eee94c (patch)
tree69ddd418cd5aa5faa64841d64ce75d3645ce005f /common
parent92b86079d34da5252b826d32afabafde84dab1af (diff)
[refactor] uses ThreadPool instead of own implementation
Python has a native ThreadPool implementation that fits our needs. Changing it to use this instead and making some calls simpler.
Diffstat (limited to 'common')
-rw-r--r--common/src/leap/soledad/common/couch.py99
1 files changed, 18 insertions, 81 deletions
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index 771db69e..421fbac1 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -26,13 +26,12 @@ import logging
import binascii
import time
import sys
-import threading
from StringIO import StringIO
-from collections import defaultdict
from urlparse import urljoin
from contextlib import contextmanager
+from multiprocessing.pool import ThreadPool
from couchdb.client import Server, Database
@@ -362,57 +361,15 @@ def couch_server(url):
yield server
+THREAD_POOL = ThreadPool(20)
+
+
class CouchDatabase(CommonBackend):
"""
A U1DB implementation that uses CouchDB as its persistence layer.
"""
- # We spawn threads to parallelize the CouchDatabase.get_docs() method
- MAX_GET_DOCS_THREADS = 20
-
- update_handler_lock = defaultdict(threading.Lock)
- sync_info_lock = defaultdict(threading.Lock)
-
- class _GetDocThread(threading.Thread):
-
- """
- A thread that gets a document from a database.
-
- TODO: switch this for a twisted deferred to thread. This depends on
- replacing python-couchdb for paisley in this module.
- """
-
- def __init__(self, db, doc_id, check_for_conflicts,
- release_fun):
- """
- :param db: The database from where to get the document.
- :type db: CouchDatabase
- :param doc_id: The doc_id of the document to be retrieved.
- :type doc_id: str
- :param check_for_conflicts: Whether the get_doc() method should
- check for existing conflicts.
- :type check_for_conflicts: bool
- :param release_fun: A function that releases a semaphore, to be
- called after the document is fetched.
- :type release_fun: function
- """
- threading.Thread.__init__(self)
- self._db = db
- self._doc_id = doc_id
- self._check_for_conflicts = check_for_conflicts
- self._release_fun = release_fun
- self._doc = None
-
- def run(self):
- """
- Fetch the document, store it as a property, and call the release
- function.
- """
- self._doc = self._db._get_doc(
- self._doc_id, self._check_for_conflicts)
- self._release_fun()
-
@classmethod
def open_database(cls, url, create, replica_uid=None, ensure_ddocs=False):
"""
@@ -477,9 +434,6 @@ class CouchDatabase(CommonBackend):
self._set_replica_uid(replica_uid)
if ensure_ddocs:
self.ensure_ddocs_on_db()
- # initialize a thread pool for parallelizing get_docs()
- self._sem_pool = threading.BoundedSemaphore(
- value=self.MAX_GET_DOCS_THREADS)
def ensure_ddocs_on_db(self):
"""
@@ -735,6 +689,9 @@ class CouchDatabase(CommonBackend):
attachments=True)[2]
except ResourceNotFound:
return None
+ return self.__parse_doc_from_couch(result, doc_id, check_for_conflicts)
+
+ def __parse_doc_from_couch(self, result, doc_id, check_for_conflicts=False):
# restrict to u1db documents
if 'u1db_rev' not in result:
return None
@@ -799,11 +756,8 @@ class CouchDatabase(CommonBackend):
"""
generation = self._get_generation()
- results = []
- for row in self._database.view('_all_docs'):
- doc = self.get_doc(row.id, include_deleted=include_deleted)
- if doc is not None:
- results.append(doc)
+ results = list(self.get_docs(self._database,
+ include_deleted=include_deleted))
return (generation, results)
def _put_doc(self, old_doc, doc):
@@ -1335,12 +1289,12 @@ class CouchDatabase(CommonBackend):
"""
Get the JSON content for many documents.
- :param doc_ids: A list of document identifiers.
+ :param doc_ids: A list of document identifiers or None for all.
:type doc_ids: list
:param check_for_conflicts: If set to False, then the conflict check
will be skipped, and 'None' will be
returned instead of True/False.
- :type check_for_conflictsa: bool
+ :type check_for_conflicts: bool
:param include_deleted: If set to True, deleted documents will be
returned with empty content. Otherwise deleted
documents will not be included in the results.
@@ -1348,31 +1302,14 @@ class CouchDatabase(CommonBackend):
in matching doc_ids order.
:rtype: iterable
"""
- # Workaround for:
- #
- # http://bugs.python.org/issue7980
- # https://leap.se/code/issues/5449
- #
- # python-couchdb uses time.strptime, which is not thread safe. In
- # order to avoid the problem described on the issues above, we preload
- # strptime here by evaluating the conversion of an arbitrary date.
- # This will not be needed when/if we switch from python-couchdb to
- # paisley.
- time.strptime('Mar 8 1917', '%b %d %Y')
- # spawn threads to retrieve docs
- threads = []
- for doc_id in doc_ids:
- self._sem_pool.acquire()
- t = self._GetDocThread(self, doc_id, check_for_conflicts,
- self._sem_pool.release)
- t.start()
- threads.append(t)
- # join threads and yield docs
- for t in threads:
- t.join()
- if t._doc.is_tombstone() and not include_deleted:
+ get_one = lambda doc_id: self._get_doc(doc_id, check_for_conflicts)
+ docs = [THREAD_POOL.apply_async(get_one, [doc_id])
+ for doc_id in doc_ids]
+ for doc in docs:
+ doc = doc.get()
+ if not doc or not include_deleted and doc.is_tombstone():
continue
- yield t._doc
+ yield doc
def _prune_conflicts(self, doc, doc_vcr):
"""