summaryrefslogtreecommitdiff
path: root/common/src/leap/soledad/common/couch.py
diff options
context:
space:
mode:
authorRuben Pollan <meskio@sindominio.net>2015-09-27 21:49:00 +0200
committerRuben Pollan <meskio@sindominio.net>2015-09-27 21:49:00 +0200
commit1566ac3885d566d0b3d70a44eaf0bd901b2d655e (patch)
treea2c7995d2ab1f2a95fdc85539b82b4746424937f /common/src/leap/soledad/common/couch.py
parent4be6f05d91891122e83f74d21c83c5f8fcd3a618 (diff)
parent5b20b469f22ab99533135beefd49129db49064e5 (diff)
Merge branch 'feature/sync_state_in_memory' into develop
- Releases: 0.8.0
Diffstat (limited to 'common/src/leap/soledad/common/couch.py')
-rw-r--r--common/src/leap/soledad/common/couch.py297
1 files changed, 94 insertions, 203 deletions
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index 1c762036..38041c09 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -26,13 +26,12 @@ import logging
import binascii
import time
import sys
-import threading
from StringIO import StringIO
-from collections import defaultdict
from urlparse import urljoin
from contextlib import contextmanager
+from multiprocessing.pool import ThreadPool
from couchdb.client import Server, Database
@@ -103,6 +102,7 @@ class CouchDocument(SoledadDocument):
SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts)
self.couch_rev = None
self.transactions = None
+ self._conflicts = None
def get_conflicts(self):
"""
@@ -111,7 +111,7 @@ class CouchDocument(SoledadDocument):
:return: The conflicted versions of the document.
:rtype: [CouchDocument]
"""
- return self._conflicts
+ return self._conflicts or []
def set_conflicts(self, conflicts):
"""
@@ -357,61 +357,19 @@ def couch_server(url):
:type url: str
"""
session = Session(timeout=COUCH_TIMEOUT)
- server = Server(url=url, session=session)
+ server = Server(url=url, full_commit=False, session=session)
yield server
+THREAD_POOL = ThreadPool(20)
+
+
class CouchDatabase(CommonBackend):
"""
A U1DB implementation that uses CouchDB as its persistence layer.
"""
- # We spawn threads to parallelize the CouchDatabase.get_docs() method
- MAX_GET_DOCS_THREADS = 20
-
- update_handler_lock = defaultdict(threading.Lock)
- sync_info_lock = defaultdict(threading.Lock)
-
- class _GetDocThread(threading.Thread):
-
- """
- A thread that gets a document from a database.
-
- TODO: switch this for a twisted deferred to thread. This depends on
- replacing python-couchdb for paisley in this module.
- """
-
- def __init__(self, db, doc_id, check_for_conflicts,
- release_fun):
- """
- :param db: The database from where to get the document.
- :type db: CouchDatabase
- :param doc_id: The doc_id of the document to be retrieved.
- :type doc_id: str
- :param check_for_conflicts: Whether the get_doc() method should
- check for existing conflicts.
- :type check_for_conflicts: bool
- :param release_fun: A function that releases a semaphore, to be
- called after the document is fetched.
- :type release_fun: function
- """
- threading.Thread.__init__(self)
- self._db = db
- self._doc_id = doc_id
- self._check_for_conflicts = check_for_conflicts
- self._release_fun = release_fun
- self._doc = None
-
- def run(self):
- """
- Fetch the document, store it as a property, and call the release
- function.
- """
- self._doc = self._db._get_doc(
- self._doc_id, self._check_for_conflicts)
- self._release_fun()
-
@classmethod
def open_database(cls, url, create, replica_uid=None, ensure_ddocs=False):
"""
@@ -476,9 +434,23 @@ class CouchDatabase(CommonBackend):
self._set_replica_uid(replica_uid)
if ensure_ddocs:
self.ensure_ddocs_on_db()
- # initialize a thread pool for parallelizing get_docs()
- self._sem_pool = threading.BoundedSemaphore(
- value=self.MAX_GET_DOCS_THREADS)
+ self._cache = None
+
+ @property
+ def cache(self):
+ if self._cache is not None:
+ return self._cache
+ else:
+ return {}
+
+ def init_caching(self, cache):
+ """
+ Start using cache by setting internal _cache attribute.
+
+ :param cache: the cache instance, anything that behaves like a dict
+ :type cache: dict
+ """
+ self._cache = cache
def ensure_ddocs_on_db(self):
"""
@@ -557,10 +529,14 @@ class CouchDatabase(CommonBackend):
:rtype: str
"""
if self._real_replica_uid is not None:
+ self.cache[self._url] = {'replica_uid': self._real_replica_uid}
return self._real_replica_uid
+ if self._url in self.cache:
+ return self.cache[self._url]['replica_uid']
try:
# grab replica_uid from server
doc = self._database['u1db_config']
+ self.cache[self._url] = doc
self._real_replica_uid = doc['replica_uid']
return self._real_replica_uid
except ResourceNotFound:
@@ -595,10 +571,13 @@ class CouchDatabase(CommonBackend):
unknown reason.
"""
# query a couch list function
+ if self.replica_uid + '_gen' in self.cache:
+ return self.cache[self.replica_uid + '_gen']['generation']
ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log']
res = self._database.resource(*ddoc_path)
try:
response = res.get_json()
+ self.cache[self.replica_uid + '_gen'] = response[2]
return response[2]['generation']
except ResourceNotFound as e:
raise_missing_design_doc_error(e, ddoc_path)
@@ -626,11 +605,15 @@ class CouchDatabase(CommonBackend):
design document for an yet
unknown reason.
"""
+ if self.replica_uid + '_gen' in self.cache:
+ response = self.cache[self.replica_uid + '_gen']
+ return (response['generation'], response['transaction_id'])
# query a couch list function
ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log']
res = self._database.resource(*ddoc_path)
try:
response = res.get_json()
+ self.cache[self.replica_uid + '_gen'] = response[2]
return (response[2]['generation'], response[2]['transaction_id'])
except ResourceNotFound as e:
raise_missing_design_doc_error(e, ddoc_path)
@@ -734,6 +717,10 @@ class CouchDatabase(CommonBackend):
attachments=True)[2]
except ResourceNotFound:
return None
+ return self.__parse_doc_from_couch(result, doc_id, check_for_conflicts)
+
+ def __parse_doc_from_couch(self, result, doc_id,
+ check_for_conflicts=False):
# restrict to u1db documents
if 'u1db_rev' not in result:
return None
@@ -798,11 +785,8 @@ class CouchDatabase(CommonBackend):
"""
generation = self._get_generation()
- results = []
- for row in self._database.view('_all_docs'):
- doc = self.get_doc(row.id, include_deleted=include_deleted)
- if doc is not None:
- results.append(doc)
+ results = list(self.get_docs(self._database,
+ include_deleted=include_deleted))
return (generation, results)
def _put_doc(self, old_doc, doc):
@@ -890,6 +874,10 @@ class CouchDatabase(CommonBackend):
doc.doc_id, body=buf.getvalue(), headers=envelope.headers)
except ResourceConflict:
raise RevisionConflict()
+ if self.replica_uid + '_gen' in self.cache:
+ gen_info = self.cache[self.replica_uid + '_gen']
+ gen_info['generation'] += 1
+ gen_info['transaction_id'] = transactions[-1][1]
def put_doc(self, doc):
"""
@@ -1092,14 +1080,22 @@ class CouchDatabase(CommonBackend):
synchronized with the replica, this is (0, '').
:rtype: (int, str)
"""
- # query a couch view
- result = self._database.view('syncs/log')
- if len(result[other_replica_uid].rows) == 0:
- return (0, '')
- return (
- result[other_replica_uid].rows[0]['value']['known_generation'],
- result[other_replica_uid].rows[0]['value']['known_transaction_id']
- )
+ if other_replica_uid in self.cache:
+ return self.cache[other_replica_uid]
+
+ doc_id = 'u1db_sync_%s' % other_replica_uid
+ try:
+ doc = self._database[doc_id]
+ except ResourceNotFound:
+ doc = {
+ '_id': doc_id,
+ 'generation': 0,
+ 'transaction_id': '',
+ }
+ self._database.save(doc)
+ result = doc['generation'], doc['transaction_id']
+ self.cache[other_replica_uid] = result
+ return result
def _set_replica_gen_and_trans_id(self, other_replica_uid,
other_generation, other_transaction_id,
@@ -1159,42 +1155,17 @@ class CouchDatabase(CommonBackend):
:type doc_idx: int
:param sync_id: The id of the current sync session.
:type sync_id: str
-
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
"""
- # query a couch update function
- ddoc_path = ['_design', 'syncs', '_update', 'put', 'u1db_sync_log']
- res = self._database.resource(*ddoc_path)
+ self.cache[other_replica_uid] = (other_generation,
+ other_transaction_id)
+ doc_id = 'u1db_sync_%s' % other_replica_uid
try:
- with CouchDatabase.update_handler_lock[self._get_replica_uid()]:
- body = {
- 'other_replica_uid': other_replica_uid,
- 'other_generation': other_generation,
- 'other_transaction_id': other_transaction_id,
- }
- if number_of_docs is not None:
- body['number_of_docs'] = number_of_docs
- if doc_idx is not None:
- body['doc_idx'] = doc_idx
- if sync_id is not None:
- body['sync_id'] = sync_id
- res.put_json(
- body=body,
- headers={'content-type': 'application/json'})
- except ResourceNotFound as e:
- raise_missing_design_doc_error(e, ddoc_path)
+ doc = self._database[doc_id]
+ except ResourceNotFound:
+ doc = {'_id': doc_id}
+ doc['generation'] = other_generation
+ doc['transaction_id'] = other_transaction_id
+ self._database.save(doc)
def _force_doc_sync_conflict(self, doc):
"""
@@ -1203,10 +1174,11 @@ class CouchDatabase(CommonBackend):
:param doc: The document to be put.
:type doc: CouchDocument
"""
- my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
- doc.prune_conflicts(
- vectorclock.VectorClockRev(doc.rev), self._replica_uid)
- doc.add_conflict(my_doc)
+ my_doc = self._get_doc(doc.doc_id)
+ self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev))
+ doc.add_conflict(self._factory(doc.doc_id, my_doc.rev,
+ my_doc.get_json()))
+ doc.has_conflicts = True
self._put_doc(my_doc, doc)
def resolve_doc(self, doc, conflicted_doc_revs):
@@ -1320,42 +1292,27 @@ class CouchDatabase(CommonBackend):
"""
if not isinstance(doc, CouchDocument):
doc = self._factory(doc.doc_id, doc.rev, doc.get_json())
- self._save_source_info(replica_uid, replica_gen,
- replica_trans_id, number_of_docs,
- doc_idx, sync_id)
my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
- if my_doc is not None:
- my_doc.set_conflicts(
- self.get_doc_conflicts(my_doc.doc_id, my_doc.couch_rev))
- state, save_doc = _process_incoming_doc(
- my_doc, doc, save_conflict, self.replica_uid)
- if save_doc:
- self._put_doc(my_doc, save_doc)
- doc.update(save_doc)
- return state, self._get_generation()
-
- def _save_source_info(self, replica_uid, replica_gen, replica_trans_id,
- number_of_docs, doc_idx, sync_id):
- """
- Validate and save source information.
- """
- self._validate_source(replica_uid, replica_gen, replica_trans_id)
- self._set_replica_gen_and_trans_id(
- replica_uid, replica_gen, replica_trans_id,
- number_of_docs=number_of_docs, doc_idx=doc_idx,
- sync_id=sync_id)
+ if my_doc:
+ doc.set_conflicts(my_doc.get_conflicts())
+ return CommonBackend._put_doc_if_newer(self, doc, save_conflict,
+ replica_uid, replica_gen,
+ replica_trans_id)
+
+ def _put_and_update_indexes(self, cur_doc, doc):
+ self._put_doc(cur_doc, doc)
def get_docs(self, doc_ids, check_for_conflicts=True,
include_deleted=False):
"""
Get the JSON content for many documents.
- :param doc_ids: A list of document identifiers.
+ :param doc_ids: A list of document identifiers or None for all.
:type doc_ids: list
:param check_for_conflicts: If set to False, then the conflict check
will be skipped, and 'None' will be
returned instead of True/False.
- :type check_for_conflictsa: bool
+ :type check_for_conflicts: bool
:param include_deleted: If set to True, deleted documents will be
returned with empty content. Otherwise deleted
documents will not be included in the results.
@@ -1363,31 +1320,14 @@ class CouchDatabase(CommonBackend):
in matching doc_ids order.
:rtype: iterable
"""
- # Workaround for:
- #
- # http://bugs.python.org/issue7980
- # https://leap.se/code/issues/5449
- #
- # python-couchdb uses time.strptime, which is not thread safe. In
- # order to avoid the problem described on the issues above, we preload
- # strptime here by evaluating the conversion of an arbitrary date.
- # This will not be needed when/if we switch from python-couchdb to
- # paisley.
- time.strptime('Mar 8 1917', '%b %d %Y')
- # spawn threads to retrieve docs
- threads = []
- for doc_id in doc_ids:
- self._sem_pool.acquire()
- t = self._GetDocThread(self, doc_id, check_for_conflicts,
- self._sem_pool.release)
- t.start()
- threads.append(t)
- # join threads and yield docs
- for t in threads:
- t.join()
- if t._doc.is_tombstone() and not include_deleted:
+ get_one = lambda doc_id: self._get_doc(doc_id, check_for_conflicts)
+ docs = [THREAD_POOL.apply_async(get_one, [doc_id])
+ for doc_id in doc_ids]
+ for doc in docs:
+ doc = doc.get()
+ if not doc or not include_deleted and doc.is_tombstone():
continue
- yield t._doc
+ yield doc
def _prune_conflicts(self, doc, doc_vcr):
"""
@@ -1459,10 +1399,11 @@ class CouchServerState(ServerState):
:return: The CouchDatabase object.
:rtype: CouchDatabase
"""
- return CouchDatabase(
+ db = CouchDatabase(
self.couch_url,
dbname,
ensure_ddocs=False)
+ return db
def ensure_database(self, dbname):
"""
@@ -1494,53 +1435,3 @@ class CouchServerState(ServerState):
delete databases.
"""
raise Unauthorized()
-
-
-def _process_incoming_doc(my_doc, other_doc, save_conflict, replica_uid):
- """
- Check document, save and return state.
- """
- # at this point, `doc` has arrived from the other syncing party, and
- # we will decide what to do with it.
- # First, we prepare the arriving doc to update couch database.
- new_doc = CouchDocument(
- other_doc.doc_id, other_doc.rev, other_doc.get_json())
- if my_doc is None:
- return 'inserted', new_doc
- new_doc.couch_rev = my_doc.couch_rev
- new_doc.set_conflicts(my_doc.get_conflicts())
- # fetch conflicts because we will eventually manipulate them
- # from now on, it works just like u1db sqlite backend
- doc_vcr = vectorclock.VectorClockRev(new_doc.rev)
- cur_vcr = vectorclock.VectorClockRev(my_doc.rev)
- if doc_vcr.is_newer(cur_vcr):
- rev = new_doc.rev
- new_doc.prune_conflicts(doc_vcr, replica_uid)
- if new_doc.rev != rev:
- # conflicts have been autoresolved
- return 'superseded', new_doc
- else:
- return'inserted', new_doc
- elif new_doc.rev == my_doc.rev:
- # magical convergence
- return 'converged', None
- elif cur_vcr.is_newer(doc_vcr):
- # Don't add this to seen_ids, because we have something newer,
- # so we should send it back, and we should not generate a
- # conflict
- other_doc.update(new_doc)
- return 'superseded', None
- elif my_doc.same_content_as(new_doc):
- # the documents have been edited to the same thing at both ends
- doc_vcr.maximize(cur_vcr)
- doc_vcr.increment(replica_uid)
- new_doc.rev = doc_vcr.as_str()
- return 'superseded', new_doc
- else:
- if save_conflict:
- new_doc.prune_conflicts(
- vectorclock.VectorClockRev(new_doc.rev), replica_uid)
- new_doc.add_conflict(my_doc)
- return 'conflicted', new_doc
- other_doc.update(new_doc)
- return 'conflicted', None