summaryrefslogtreecommitdiff
path: root/common/src/leap/soledad/common/couch.py
diff options
context:
space:
mode:
Diffstat (limited to 'common/src/leap/soledad/common/couch.py')
-rw-r--r--common/src/leap/soledad/common/couch.py863
1 files changed, 631 insertions, 232 deletions
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index d2414477..8e8613a1 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -24,23 +24,48 @@ import uuid
import logging
import binascii
import socket
+import time
+import sys
+import threading
+
+
+from StringIO import StringIO
+from collections import defaultdict
from couchdb.client import Server
-from couchdb.http import ResourceNotFound, Unauthorized
-from u1db import errors, query_parser, vectorclock
+from couchdb.http import (
+ ResourceConflict,
+ ResourceNotFound,
+ ServerError,
+ Session,
+)
+from u1db import query_parser, vectorclock
+from u1db.errors import (
+ DatabaseDoesNotExist,
+ InvalidGeneration,
+ RevisionConflict,
+ InvalidDocId,
+ ConflictedDoc,
+ DocumentDoesNotExist,
+ DocumentAlreadyDeleted,
+ Unauthorized,
+)
from u1db.backends import CommonBackend, CommonSyncTarget
from u1db.remote import http_app
from u1db.remote.server_state import ServerState
-from leap.soledad.common import USER_DB_PREFIX, ddocs
+from leap.soledad.common import USER_DB_PREFIX, ddocs, errors
from leap.soledad.common.document import SoledadDocument
logger = logging.getLogger(__name__)
+COUCH_TIMEOUT = 120 # timeout for transfers between Soledad server and Couch
+
+
class InvalidURLError(Exception):
"""
Exception raised when Soledad encounters a malformed URL.
@@ -75,9 +100,9 @@ class CouchDocument(SoledadDocument):
SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts)
self._couch_rev = None
self._conflicts = None
- self._modified_conflicts = False
+ self._transactions = None
- def ensure_fetch_conflicts(self, get_conflicts_fun):
+ def _ensure_fetch_conflicts(self, get_conflicts_fun):
"""
Ensure conflict data has been fetched from the server.
@@ -100,6 +125,16 @@ class CouchDocument(SoledadDocument):
"""
return self._conflicts
+ def set_conflicts(self, conflicts):
+ """
+ Set the conflicted versions of the document.
+
+ :param conflicts: The conflicted versions of the document.
+ :type conflicts: list
+ """
+ self._conflicts = conflicts
+ self.has_conflicts = len(self._conflicts) > 0
+
def add_conflict(self, doc):
"""
Add a conflict to this document.
@@ -108,8 +143,7 @@ class CouchDocument(SoledadDocument):
:type doc: CouchDocument
"""
if self._conflicts is None:
- raise Exception("Run self.ensure_fetch_conflicts first!")
- self._modified_conflicts = True
+ raise Exception("Run self._ensure_fetch_conflicts first!")
self._conflicts.append(doc)
self.has_conflicts = len(self._conflicts) > 0
@@ -121,25 +155,13 @@ class CouchDocument(SoledadDocument):
:type conflict_revs: [str]
"""
if self._conflicts is None:
- raise Exception("Run self.ensure_fetch_conflicts first!")
+ raise Exception("Run self._ensure_fetch_conflicts first!")
conflicts_len = len(self._conflicts)
self._conflicts = filter(
lambda doc: doc.rev not in conflict_revs,
self._conflicts)
- if len(self._conflicts) < conflicts_len:
- self._modified_conflicts = True
self.has_conflicts = len(self._conflicts) > 0
- def modified_conflicts(self):
- """
- Return whether this document's conflicts have been modified.
-
- :return: Whether this document's conflicts have been modified.
- :rtype: bool
- """
- return self._conflicts is not None and \
- self._modified_conflicts is True
-
def _get_couch_rev(self):
return self._couch_rev
@@ -148,18 +170,217 @@ class CouchDocument(SoledadDocument):
couch_rev = property(_get_couch_rev, _set_couch_rev)
+ def _get_transactions(self):
+ return self._transactions
+
+ def _set_transactions(self, rev):
+ self._transactions = rev
+
+ transactions = property(_get_transactions, _set_transactions)
+
# monkey-patch the u1db http app to use CouchDocument
http_app.Document = CouchDocument
+def raise_missing_design_doc_error(exc, ddoc_path):
+ """
+ Raise an appropriate exception when catching a ResourceNotFound when
+ accessing a design document.
+
+ :param exc: The exception cought.
+ :type exc: ResourceNotFound
+ :param ddoc_path: A list representing the requested path.
+ :type ddoc_path: list
+
+ :raise MissingDesignDocError: Raised when tried to access a missing design
+ document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access a
+ missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a design
+ document for an yet unknown reason.
+ """
+ path = "".join(ddoc_path)
+ if exc.message[1] == 'missing':
+ raise errors.MissingDesignDocError(path)
+ elif exc.message[1] == 'missing function' or \
+ exc.message[1].startswith('missing lists function'):
+ raise errors.MissingDesignDocListFunctionError(path)
+ elif exc.message[1] == 'missing_named_view':
+ raise errors.MissingDesignDocNamedViewError(path)
+ elif exc.message[1] == 'deleted':
+ raise errors.MissingDesignDocDeletedError(path)
+ # other errors are unknown for now
+ raise errors.DesignDocUnknownError("%s: %s" % (path, str(exc.message)))
+
+
+def raise_server_error(exc, ddoc_path):
+ """
+ Raise an appropriate exception when catching a ServerError when
+ accessing a design document.
+
+ :param exc: The exception cought.
+ :type exc: ResourceNotFound
+ :param ddoc_path: A list representing the requested path.
+ :type ddoc_path: list
+
+ :raise MissingDesignDocListFunctionError: Raised when trying to access a
+ missing list function on a
+ design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a design
+ document for an yet unknown reason.
+ """
+ path = "".join(ddoc_path)
+ if exc.message[1][0] == 'unnamed_error':
+ raise errors.MissingDesignDocListFunctionError(path)
+ # other errors are unknown for now
+ raise errors.DesignDocUnknownError(path)
+
+
+class MultipartWriter(object):
+ """
+ A multipart writer adapted from python-couchdb's one so we can PUT
+ documents using couch's multipart PUT.
+
+ This stripped down version does not allow for nested structures, and
+ contains only the essential things we need to PUT SoledadDocuments to the
+ couch backend.
+ """
+
+ CRLF = '\r\n'
+
+ def __init__(self, fileobj, headers=None, boundary=None):
+ """
+ Initialize the multipart writer.
+ """
+ self.fileobj = fileobj
+ if boundary is None:
+ boundary = self._make_boundary()
+ self._boundary = boundary
+ self._build_headers('related', headers)
+
+ def add(self, mimetype, content, headers={}):
+ """
+ Add a part to the multipart stream.
+ """
+ self.fileobj.write('--')
+ self.fileobj.write(self._boundary)
+ self.fileobj.write(self.CRLF)
+ headers['Content-Type'] = mimetype
+ self._write_headers(headers)
+ if content:
+ # XXX: throw an exception if a boundary appears in the content??
+ self.fileobj.write(content)
+ self.fileobj.write(self.CRLF)
+
+ def close(self):
+ """
+ Close the multipart stream.
+ """
+ self.fileobj.write('--')
+ self.fileobj.write(self._boundary)
+ # be careful not to have anything after '--', otherwise old couch
+ # versions (including bigcouch) will fail.
+ self.fileobj.write('--')
+
+ def _make_boundary(self):
+ """
+ Create a boundary to discern multi parts.
+ """
+ try:
+ from uuid import uuid4
+ return '==' + uuid4().hex + '=='
+ except ImportError:
+ from random import randrange
+ token = randrange(sys.maxint)
+ format = '%%0%dd' % len(repr(sys.maxint - 1))
+ return '===============' + (format % token) + '=='
+
+ def _write_headers(self, headers):
+ """
+ Write a part header in the buffer stream.
+ """
+ if headers:
+ for name in sorted(headers.keys()):
+ value = headers[name]
+ self.fileobj.write(name)
+ self.fileobj.write(': ')
+ self.fileobj.write(value)
+ self.fileobj.write(self.CRLF)
+ self.fileobj.write(self.CRLF)
+
+ def _build_headers(self, subtype, headers):
+ """
+ Build the main headers of the multipart stream.
+
+ This is here so we can send headers separete from content using
+ python-couchdb API.
+ """
+ self.headers = {}
+ self.headers['Content-Type'] = 'multipart/%s; boundary="%s"' % \
+ (subtype, self._boundary)
+ if headers:
+ for name in sorted(headers.keys()):
+ value = headers[name]
+ self.headers[name] = value
+
+
class CouchDatabase(CommonBackend):
"""
A U1DB implementation that uses CouchDB as its persistence layer.
"""
+ # We spawn threads to parallelize the CouchDatabase.get_docs() method
+ MAX_GET_DOCS_THREADS = 20
+
+ update_handler_lock = defaultdict(threading.Lock)
+
+ class _GetDocThread(threading.Thread):
+ """
+ A thread that gets a document from a database.
+
+ TODO: switch this for a twisted deferred to thread. This depends on
+ replacing python-couchdb for paisley in this module.
+ """
+
+ def __init__(self, db, doc_id, check_for_conflicts,
+ release_fun):
+ """
+ :param db: The database from where to get the document.
+ :type db: u1db.Database
+ :param doc_id: The doc_id of the document to be retrieved.
+ :type doc_id: str
+ :param check_for_conflicts: Whether the get_doc() method should
+ check for existing conflicts.
+ :type check_for_conflicts: bool
+ :param release_fun: A function that releases a semaphore, to be
+ called after the document is fetched.
+ :type release_fun: function
+ """
+ threading.Thread.__init__(self)
+ self._db = db
+ self._doc_id = doc_id
+ self._check_for_conflicts = check_for_conflicts
+ self._release_fun = release_fun
+ self._doc = None
+
+ def run(self):
+ """
+ Fetch the document, store it as a property, and call the release
+ function.
+ """
+ self._doc = self._db._get_doc(
+ self._doc_id, self._check_for_conflicts)
+ self._release_fun()
+
@classmethod
- def open_database(cls, url, create):
+ def open_database(cls, url, create, ensure_ddocs=False):
"""
Open a U1DB database using CouchDB as backend.
@@ -167,6 +388,8 @@ class CouchDatabase(CommonBackend):
:type url: str
:param create: should the replica be created if it does not exist?
:type create: bool
+ :param ensure_ddocs: Ensure that the design docs exist on server.
+ :type ensure_ddocs: bool
:return: the database instance
:rtype: CouchDatabase
@@ -182,8 +405,8 @@ class CouchDatabase(CommonBackend):
server[dbname]
except ResourceNotFound:
if not create:
- raise errors.DatabaseDoesNotExist()
- return cls(url, dbname)
+ raise DatabaseDoesNotExist()
+ return cls(url, dbname, ensure_ddocs=ensure_ddocs)
def __init__(self, url, dbname, replica_uid=None, full_commit=True,
session=None, ensure_ddocs=True):
@@ -206,6 +429,8 @@ class CouchDatabase(CommonBackend):
# save params
self._url = url
self._full_commit = full_commit
+ if session is None:
+ session = Session(timeout=COUCH_TIMEOUT)
self._session = session
self._factory = CouchDocument
self._real_replica_uid = None
@@ -223,6 +448,9 @@ class CouchDatabase(CommonBackend):
self._set_replica_uid(replica_uid)
if ensure_ddocs:
self.ensure_ddocs_on_db()
+ # initialize a thread pool for parallelizing get_docs()
+ self._sem_pool = threading.BoundedSemaphore(
+ value=self.MAX_GET_DOCS_THREADS)
def ensure_ddocs_on_db(self):
"""
@@ -318,12 +546,31 @@ class CouchDatabase(CommonBackend):
:return: The current generation.
:rtype: int
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
# query a couch list function
- res = self._database.resource(
- '_design', 'transactions', '_list', 'generation', 'log')
- response = res.get_json()
- return response[2]['generation']
+ ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log']
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json()
+ return response[2]['generation']
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
+ except ServerError as e:
+ raise_server_error(e, ddoc_path)
def _get_generation_info(self):
"""
@@ -331,12 +578,31 @@ class CouchDatabase(CommonBackend):
:return: A tuple containing the current generation and transaction id.
:rtype: (int, str)
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
# query a couch list function
- res = self._database.resource(
- '_design', 'transactions', '_list', 'generation', 'log')
- response = res.get_json()
- return (response[2]['generation'], response[2]['transaction_id'])
+ ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log']
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json()
+ return (response[2]['generation'], response[2]['transaction_id'])
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
+ except ServerError as e:
+ raise_server_error(e, ddoc_path)
def _get_trans_id_for_gen(self, generation):
"""
@@ -349,16 +615,36 @@ class CouchDatabase(CommonBackend):
:rtype: str
:raise InvalidGeneration: Raised when the generation does not exist.
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
if generation == 0:
return ''
# query a couch list function
- res = self._database.resource(
- '_design', 'transactions', '_list', 'trans_id_for_gen', 'log')
- response = res.get_json(gen=generation)
- if response[2] == {}:
- raise errors.InvalidGeneration
- return response[2]['transaction_id']
+ ddoc_path = [
+ '_design', 'transactions', '_list', 'trans_id_for_gen', 'log'
+ ]
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json(gen=generation)
+ if response[2] == {}:
+ raise InvalidGeneration
+ return response[2]['transaction_id']
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
+ except ServerError as e:
+ raise_server_error(e, ddoc_path)
def _get_transaction_log(self):
"""
@@ -366,12 +652,31 @@ class CouchDatabase(CommonBackend):
:return: The complete transaction log.
:rtype: [(str, str)]
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
# query a couch view
- res = self._database.resource(
- '_design', 'transactions', '_view', 'log')
- response = res.get_json()
- return map(lambda row: (row['id'], row['value']), response[2]['rows'])
+ ddoc_path = ['_design', 'transactions', '_view', 'log']
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json()
+ return map(
+ lambda row: (row['id'], row['value']),
+ response[2]['rows'])
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
def _get_doc(self, doc_id, check_for_conflicts=False):
"""
@@ -413,8 +718,15 @@ class CouchDatabase(CommonBackend):
and '_attachments' in result \
and 'u1db_conflicts' in result['_attachments']:
doc.has_conflicts = True
+ doc.set_conflicts(
+ self._build_conflicts(
+ doc.doc_id,
+ json.loads(binascii.a2b_base64(
+ result['_attachments']['u1db_conflicts']['data']))))
# store couch revision
doc.couch_rev = result['_rev']
+ # store transactions
+ doc.transactions = result['u1db_transactions']
return doc
def get_doc(self, doc_id, include_deleted=False):
@@ -465,6 +777,10 @@ class CouchDatabase(CommonBackend):
"""
Put the document in the Couch backend database.
+ Note that C{old_doc} must have been fetched with the parameter
+ C{check_for_conflicts} equal to True, so we can properly update the
+ new document using the conflict information from the old one.
+
:param old_doc: The old document version.
:type old_doc: CouchDocument
:param doc: The document to be put.
@@ -472,43 +788,76 @@ class CouchDatabase(CommonBackend):
:raise RevisionConflict: Raised when trying to update a document but
couch revisions mismatch.
- """
- trans_id = self._allocate_transaction_id()
- # encode content
- content = doc.get_json()
- if content is not None:
- content = binascii.b2a_base64(content)[:-1] # exclude trailing \n
- # encode conflicts
- conflicts = None
- update_conflicts = doc.modified_conflicts()
- if update_conflicts is True:
- if doc.has_conflicts:
- conflicts = binascii.b2a_base64(
- json.dumps(
- map(lambda cdoc: (cdoc.rev, cdoc.content),
- doc.get_conflicts()))
- )[:-1] # exclude \n
- # perform the request
- resource = self._database.resource(
- '_design', 'docs', '_update', 'put', doc.doc_id)
- response = resource.put_json(
- body={
- 'couch_rev': old_doc.couch_rev
- if old_doc is not None else None,
- 'u1db_rev': doc.rev,
- 'content': content,
- 'trans_id': trans_id,
- 'conflicts': conflicts,
- 'update_conflicts': update_conflicts,
- },
- headers={'content-type': 'application/json'})
- # the document might have been updated in between, so we check for the
- # return message
- msg = response[2].read()
- if msg == 'ok':
- return
- elif msg == 'revision conflict':
- raise errors.RevisionConflict()
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
+ """
+ attachments = {} # we save content and conflicts as attachments
+ parts = [] # and we put it using couch's multipart PUT
+ # save content as attachment
+ if doc.is_tombstone() is False:
+ content = doc.get_json()
+ attachments['u1db_content'] = {
+ 'follows': True,
+ 'content_type': 'application/octet-stream',
+ 'length': len(content),
+ }
+ parts.append(content)
+ # save conflicts as attachment
+ if doc.has_conflicts is True:
+ conflicts = json.dumps(
+ map(lambda cdoc: (cdoc.rev, cdoc.content),
+ doc.get_conflicts()))
+ attachments['u1db_conflicts'] = {
+ 'follows': True,
+ 'content_type': 'application/octet-stream',
+ 'length': len(conflicts),
+ }
+ parts.append(conflicts)
+ # store old transactions, if any
+ transactions = old_doc.transactions[:] if old_doc is not None else []
+ # create a new transaction id and timestamp it so the transaction log
+ # is consistent when querying the database.
+ transactions.append(
+ # here we store milliseconds to keep consistent with javascript
+ # Date.prototype.getTime() which was used before inside a couchdb
+ # update handler.
+ (int(time.time() * 1000),
+ self._allocate_transaction_id()))
+ # build the couch document
+ couch_doc = {
+ '_id': doc.doc_id,
+ 'u1db_rev': doc.rev,
+ 'u1db_transactions': transactions,
+ '_attachments': attachments,
+ }
+ # if we are updating a doc we have to add the couch doc revision
+ if old_doc is not None:
+ couch_doc['_rev'] = old_doc.couch_rev
+ # prepare the multipart PUT
+ buf = StringIO()
+ envelope = MultipartWriter(buf)
+ envelope.add('application/json', json.dumps(couch_doc))
+ for part in parts:
+ envelope.add('application/octet-stream', part)
+ envelope.close()
+ # try to save and fail if there's a revision conflict
+ try:
+ self._database.resource.put_json(
+ doc.doc_id, body=buf.getvalue(), headers=envelope.headers)
+ self._renew_couch_session()
+ except ResourceConflict:
+ raise RevisionConflict()
def put_doc(self, doc):
"""
@@ -522,26 +871,26 @@ class CouchDatabase(CommonBackend):
:return: new_doc_rev - The new revision identifier for the document.
The Document object will also be updated.
- :raise errors.InvalidDocId: Raised if the document's id is invalid.
- :raise errors.DocumentTooBig: Raised if the document size is too big.
- :raise errors.ConflictedDoc: Raised if the document has conflicts.
+ :raise InvalidDocId: Raised if the document's id is invalid.
+ :raise DocumentTooBig: Raised if the document size is too big.
+ :raise ConflictedDoc: Raised if the document has conflicts.
"""
if doc.doc_id is None:
- raise errors.InvalidDocId()
+ raise InvalidDocId()
self._check_doc_id(doc.doc_id)
self._check_doc_size(doc)
old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
if old_doc and old_doc.has_conflicts:
- raise errors.ConflictedDoc()
+ raise ConflictedDoc()
if old_doc and doc.rev is None and old_doc.is_tombstone():
new_rev = self._allocate_doc_rev(old_doc.rev)
else:
if old_doc is not None:
if old_doc.rev != doc.rev:
- raise errors.RevisionConflict()
+ raise RevisionConflict()
else:
if doc.rev is not None:
- raise errors.RevisionConflict()
+ raise RevisionConflict()
new_rev = self._allocate_doc_rev(doc.rev)
doc.rev = new_rev
self._put_doc(old_doc, doc)
@@ -563,32 +912,53 @@ class CouchDatabase(CommonBackend):
to the last intervening change and sorted by generation (old
changes first)
:rtype: (int, str, [(str, int, str)])
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
# query a couch list function
- res = self._database.resource(
- '_design', 'transactions', '_list', 'whats_changed', 'log')
- response = res.get_json(old_gen=old_generation)
- results = map(
- lambda row:
- (row['generation'], row['doc_id'], row['transaction_id']),
- response[2]['transactions'])
- results.reverse()
- cur_gen = old_generation
- seen = set()
- changes = []
- newest_trans_id = ''
- for generation, doc_id, trans_id in results:
- if doc_id not in seen:
- changes.append((doc_id, generation, trans_id))
- seen.add(doc_id)
- if changes:
- cur_gen = changes[0][1] # max generation
- newest_trans_id = changes[0][2]
- changes.reverse()
- else:
- cur_gen, newest_trans_id = self._get_generation_info()
+ ddoc_path = [
+ '_design', 'transactions', '_list', 'whats_changed', 'log'
+ ]
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json(old_gen=old_generation)
+ results = map(
+ lambda row:
+ (row['generation'], row['doc_id'], row['transaction_id']),
+ response[2]['transactions'])
+ results.reverse()
+ cur_gen = old_generation
+ seen = set()
+ changes = []
+ newest_trans_id = ''
+ for generation, doc_id, trans_id in results:
+ if doc_id not in seen:
+ changes.append((doc_id, generation, trans_id))
+ seen.add(doc_id)
+ if changes:
+ cur_gen = changes[0][1] # max generation
+ newest_trans_id = changes[0][2]
+ changes.reverse()
+ else:
+ cur_gen, newest_trans_id = self._get_generation_info()
- return cur_gen, newest_trans_id, changes
+ return cur_gen, newest_trans_id, changes
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
+ except ServerError as e:
+ raise_server_error(e, ddoc_path)
def delete_doc(self, doc):
"""
@@ -600,28 +970,47 @@ class CouchDatabase(CommonBackend):
:param doc: The document to mark as deleted.
:type doc: CouchDocument.
- :raise errors.DocumentDoesNotExist: Raised if the document does not
+ :raise DocumentDoesNotExist: Raised if the document does not
exist.
- :raise errors.RevisionConflict: Raised if the revisions do not match.
- :raise errors.DocumentAlreadyDeleted: Raised if the document is
+ :raise RevisionConflict: Raised if the revisions do not match.
+ :raise DocumentAlreadyDeleted: Raised if the document is
already deleted.
- :raise errors.ConflictedDoc: Raised if the doc has conflicts.
+ :raise ConflictedDoc: Raised if the doc has conflicts.
"""
old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
if old_doc is None:
- raise errors.DocumentDoesNotExist
+ raise DocumentDoesNotExist
if old_doc.rev != doc.rev:
- raise errors.RevisionConflict()
+ raise RevisionConflict()
if old_doc.is_tombstone():
- raise errors.DocumentAlreadyDeleted
+ raise DocumentAlreadyDeleted
if old_doc.has_conflicts:
- raise errors.ConflictedDoc()
+ raise ConflictedDoc()
new_rev = self._allocate_doc_rev(doc.rev)
doc.rev = new_rev
doc.make_tombstone()
self._put_doc(old_doc, doc)
return new_rev
+ def _build_conflicts(self, doc_id, attached_conflicts):
+ """
+ Build the conflicted documents list from the conflicts attachment
+ fetched from a couch document.
+
+ :param attached_conflicts: The document's conflicts as fetched from a
+ couch document attachment.
+ :type attached_conflicts: dict
+ """
+ conflicts = []
+ for doc_rev, content in attached_conflicts:
+ doc = self._factory(doc_id, doc_rev)
+ if content is None:
+ doc.make_tombstone()
+ else:
+ doc.content = content
+ conflicts.append(doc)
+ return conflicts
+
def _get_conflicts(self, doc_id, couch_rev=None):
"""
Get the conflicted versions of a document.
@@ -642,16 +1031,8 @@ class CouchDatabase(CommonBackend):
resource = self._database.resource(doc_id, 'u1db_conflicts')
try:
response = resource.get_json(**params)
- conflicts = []
- # build the conflicted versions
- for doc_rev, content in json.loads(response[2].read()):
- doc = self._factory(doc_id, doc_rev)
- if content is None:
- doc.make_tombstone()
- else:
- doc.content = content
- conflicts.append(doc)
- return conflicts
+ return self._build_conflicts(
+ doc_id, json.loads(response[2].read()))
except ResourceNotFound:
return []
@@ -737,17 +1118,35 @@ class CouchDatabase(CommonBackend):
:param other_transaction_id: The transaction id associated with the
generation.
:type other_transaction_id: str
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
# query a couch update function
- res = self._database.resource(
- '_design', 'syncs', '_update', 'put', 'u1db_sync_log')
- res.put_json(
- body={
- 'other_replica_uid': other_replica_uid,
- 'other_generation': other_generation,
- 'other_transaction_id': other_transaction_id,
- },
- headers={'content-type': 'application/json'})
+ ddoc_path = ['_design', 'syncs', '_update', 'put', 'u1db_sync_log']
+ res = self._database.resource(*ddoc_path)
+ try:
+ with CouchDatabase.update_handler_lock[self._get_replica_uid()]:
+ res.put_json(
+ body={
+ 'other_replica_uid': other_replica_uid,
+ 'other_generation': other_generation,
+ 'other_transaction_id': other_transaction_id,
+ },
+ headers={'content-type': 'application/json'})
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
def _add_conflict(self, doc, my_doc_rev, my_content):
"""
@@ -765,7 +1164,7 @@ class CouchDatabase(CommonBackend):
serialized string.
:type my_content: str
"""
- doc.ensure_fetch_conflicts(self._get_conflicts)
+ doc._ensure_fetch_conflicts(self._get_conflicts)
doc.add_conflict(
self._factory(doc_id=doc.doc_id, rev=my_doc_rev,
json=my_content))
@@ -774,7 +1173,7 @@ class CouchDatabase(CommonBackend):
"""
Delete the conflicted revisions from the list of conflicts of C{doc}.
- Note that thie method does not actually update the backed; rather, it
+ Note that this method does not actually update the backend; rather, it
updates the CouchDocument object which will provide the conflict data
when the atomic document update is made.
@@ -783,7 +1182,7 @@ class CouchDatabase(CommonBackend):
:param conflict_revs: A list of the revisions to be deleted.
:param conflict_revs: [str]
"""
- doc.ensure_fetch_conflicts(self._get_conflicts)
+ doc._ensure_fetch_conflicts(self._get_conflicts)
doc.delete_conflicts(conflict_revs)
def _prune_conflicts(self, doc, doc_vcr):
@@ -842,34 +1241,44 @@ class CouchDatabase(CommonBackend):
:param conflicted_doc_revs: A list of revisions that the new content
supersedes.
:type conflicted_doc_revs: [str]
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
new_rev = self._ensure_maximal_rev(cur_doc.rev,
conflicted_doc_revs)
superseded_revs = set(conflicted_doc_revs)
doc.rev = new_rev
+ # this backend stores conflicts as properties of the documents, so we
+ # have to copy these conflicts over to the document being updated.
if cur_doc.rev in superseded_revs:
+ # the newer doc version will supersede the one in the database, so
+ # we copy conflicts before updating the backend.
+ doc.set_conflicts(cur_doc.get_conflicts()) # copy conflicts over.
self._delete_conflicts(doc, superseded_revs)
self._put_doc(cur_doc, doc)
else:
- self._add_conflict(doc, new_rev, doc.get_json())
- self._delete_conflicts(doc, superseded_revs)
- # perform request to resolve document in server
- resource = self._database.resource(
- '_design', 'docs', '_update', 'resolve_doc', doc.doc_id)
- conflicts = None
- if doc.has_conflicts:
- conflicts = binascii.b2a_base64(
- json.dumps(
- map(lambda cdoc: (cdoc.rev, cdoc.content),
- doc.get_conflicts()))
- )[:-1] # exclude \n
- response = resource.put_json(
- body={
- 'couch_rev': cur_doc.couch_rev,
- 'conflicts': conflicts,
- },
- headers={'content-type': 'application/json'})
+ # the newer doc version does not supersede the one in the
+ # database, so we will add a conflict to the database and copy
+ # those over to the document the user has in her hands.
+ self._add_conflict(cur_doc, new_rev, doc.get_json())
+ self._delete_conflicts(cur_doc, superseded_revs)
+ self._put_doc(cur_doc, cur_doc) # just update conflicts
+ # backend has been updated with current conflicts, now copy them
+ # to the current document.
+ doc.set_conflicts(cur_doc.get_conflicts())
def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen,
replica_trans_id=''):
@@ -926,7 +1335,7 @@ class CouchDatabase(CommonBackend):
if cur_doc is not None:
doc.couch_rev = cur_doc.couch_rev
# fetch conflicts because we will eventually manipulate them
- doc.ensure_fetch_conflicts(self._get_conflicts)
+ doc._ensure_fetch_conflicts(self._get_conflicts)
# from now on, it works just like u1db sqlite backend
doc_vcr = vectorclock.VectorClockRev(doc.rev)
if cur_doc is None:
@@ -963,7 +1372,7 @@ class CouchDatabase(CommonBackend):
if save_conflict:
self._force_doc_sync_conflict(doc)
if replica_uid is not None and replica_gen is not None:
- self._do_set_replica_gen_and_trans_id(
+ self._set_replica_gen_and_trans_id(
replica_uid, replica_gen, replica_trans_id)
# update info
old_doc.rev = doc.rev
@@ -974,6 +1383,59 @@ class CouchDatabase(CommonBackend):
old_doc.has_conflicts = doc.has_conflicts
return state, self._get_generation()
+ def get_docs(self, doc_ids, check_for_conflicts=True,
+ include_deleted=False):
+ """
+ Get the JSON content for many documents.
+
+ :param doc_ids: A list of document identifiers.
+ :type doc_ids: list
+ :param check_for_conflicts: If set to False, then the conflict check
+ will be skipped, and 'None' will be
+ returned instead of True/False.
+ :type check_for_conflictsa: bool
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise deleted
+ documents will not be included in the results.
+ :return: iterable giving the Document object for each document id
+ in matching doc_ids order.
+ :rtype: iterable
+ """
+ # Workaround for:
+ #
+ # http://bugs.python.org/issue7980
+ # https://leap.se/code/issues/5449
+ #
+ # python-couchdb uses time.strptime, which is not thread safe. In
+ # order to avoid the problem described on the issues above, we preload
+ # strptime here by evaluating the conversion of an arbitrary date.
+ # This will not be needed when/if we switch from python-couchdb to
+ # paisley.
+ time.strptime('Mar 4 1917', '%b %d %Y')
+ # spawn threads to retrieve docs
+ threads = []
+ for doc_id in doc_ids:
+ self._sem_pool.acquire()
+ t = self._GetDocThread(self, doc_id, check_for_conflicts,
+ self._sem_pool.release)
+ t.start()
+ threads.append(t)
+ # join threads and yield docs
+ for t in threads:
+ t.join()
+ if t._doc.is_tombstone() and not include_deleted:
+ continue
+ yield t._doc
+
+ def _renew_couch_session(self):
+ """
+ Create a new couch connection session.
+
+ This is a workaround for #5448. Will not be needed once bigcouch is
+ merged with couchdb.
+ """
+ self._database.resource.session = Session(timeout=COUCH_TIMEOUT)
+
class CouchSyncTarget(CommonSyncTarget):
"""
@@ -997,14 +1459,6 @@ class CouchSyncTarget(CommonSyncTarget):
source_replica_transaction_id)
-class NotEnoughCouchPermissions(Exception):
- """
- Raised when failing to assert for enough permissions on underlying Couch
- Database.
- """
- pass
-
-
class CouchServerState(ServerState):
"""
Inteface of the WSGI server with the CouchDB backend.
@@ -1024,65 +1478,6 @@ class CouchServerState(ServerState):
self._couch_url = couch_url
self._shared_db_name = shared_db_name
self._tokens_db_name = tokens_db_name
- try:
- self._check_couch_permissions()
- except NotEnoughCouchPermissions:
- logger.error("Not enough permissions on underlying couch "
- "database (%s)." % self._couch_url)
- except (socket.error, socket.gaierror, socket.herror,
- socket.timeout), e:
- logger.error("Socket problem while trying to reach underlying "
- "couch database: (%s, %s)." %
- (self._couch_url, e))
-
- def _check_couch_permissions(self):
- """
- Assert that Soledad Server has enough permissions on the underlying
- couch database.
-
- Soledad Server has to be able to do the following in the couch server:
-
- * Create, read and write from/to 'shared' db.
- * Create, read and write from/to 'user-<anything>' dbs.
- * Read from 'tokens' db.
-
- This function tries to perform the actions above using the "low level"
- couch library to ensure that Soledad Server can do everything it needs
- on the underlying couch database.
-
- :param couch_url: The URL of the couch database.
- :type couch_url: str
-
- @raise NotEnoughCouchPermissions: Raised in case there are not enough
- permissions to read/write/create the needed couch databases.
- :rtype: bool
- """
-
- def _open_couch_db(dbname):
- server = Server(url=self._couch_url)
- try:
- server[dbname]
- except ResourceNotFound:
- server.create(dbname)
- return server[dbname]
-
- def _create_delete_test_doc(db):
- doc_id, _ = db.save({'test': 'document'})
- doc = db[doc_id]
- db.delete(doc)
-
- try:
- # test read/write auth for shared db
- _create_delete_test_doc(
- _open_couch_db(self._shared_db_name))
- # test read/write auth for user-<something> db
- _create_delete_test_doc(
- _open_couch_db('%stest-db' % USER_DB_PREFIX))
- # test read auth for tokens db
- tokensdb = _open_couch_db(self._tokens_db_name)
- tokensdb.info()
- except Unauthorized:
- raise NotEnoughCouchPermissions(self._couch_url)
def open_database(self, dbname):
"""
@@ -1094,25 +1489,29 @@ class CouchServerState(ServerState):
:return: The CouchDatabase object.
:rtype: CouchDatabase
"""
- # TODO: open couch
return CouchDatabase.open_database(
self._couch_url + '/' + dbname,
- create=False)
+ create=False,
+ ensure_ddocs=False)
def ensure_database(self, dbname):
"""
Ensure couch database exists.
+ Usually, this method is used by the server to ensure the existence of
+ a database. In our setup, the Soledad user that accesses the underlying
+ couch server should never have permission to create (or delete)
+ databases. But, in case it ever does, by raising an exception here we
+ have one more guarantee that no modified client will be able to
+ enforce creation of a database when syncing.
+
:param dbname: The name of the database to ensure.
:type dbname: str
:return: The CouchDatabase object and the replica uid.
:rtype: (CouchDatabase, str)
"""
- db = CouchDatabase.open_database(
- self._couch_url + '/' + dbname,
- create=True)
- return db, db._replica_uid
+ raise Unauthorized()
def delete_database(self, dbname):
"""
@@ -1121,7 +1520,7 @@ class CouchServerState(ServerState):
:param dbname: The name of the database to delete.
:type dbname: str
"""
- CouchDatabase.delete_database(self._couch_url + '/' + dbname)
+ raise Unauthorized()
def _set_couch_url(self, url):
"""