summaryrefslogtreecommitdiff
path: root/common/src/leap/soledad/common
diff options
context:
space:
mode:
Diffstat (limited to 'common/src/leap/soledad/common')
-rw-r--r--common/src/leap/soledad/common/couch.py863
-rw-r--r--common/src/leap/soledad/common/ddocs/docs/updates/put.js64
-rw-r--r--common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js39
-rw-r--r--common/src/leap/soledad/common/errors.py132
-rw-r--r--common/src/leap/soledad/common/tests/couchdb.ini.template2
-rw-r--r--common/src/leap/soledad/common/tests/test_couch.py214
-rw-r--r--common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py50
-rw-r--r--common/src/leap/soledad/common/tests/test_server.py127
8 files changed, 1120 insertions, 371 deletions
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index d2414477..8e8613a1 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -24,23 +24,48 @@ import uuid
import logging
import binascii
import socket
+import time
+import sys
+import threading
+
+
+from StringIO import StringIO
+from collections import defaultdict
from couchdb.client import Server
-from couchdb.http import ResourceNotFound, Unauthorized
-from u1db import errors, query_parser, vectorclock
+from couchdb.http import (
+ ResourceConflict,
+ ResourceNotFound,
+ ServerError,
+ Session,
+)
+from u1db import query_parser, vectorclock
+from u1db.errors import (
+ DatabaseDoesNotExist,
+ InvalidGeneration,
+ RevisionConflict,
+ InvalidDocId,
+ ConflictedDoc,
+ DocumentDoesNotExist,
+ DocumentAlreadyDeleted,
+ Unauthorized,
+)
from u1db.backends import CommonBackend, CommonSyncTarget
from u1db.remote import http_app
from u1db.remote.server_state import ServerState
-from leap.soledad.common import USER_DB_PREFIX, ddocs
+from leap.soledad.common import USER_DB_PREFIX, ddocs, errors
from leap.soledad.common.document import SoledadDocument
logger = logging.getLogger(__name__)
+COUCH_TIMEOUT = 120 # timeout for transfers between Soledad server and Couch
+
+
class InvalidURLError(Exception):
"""
Exception raised when Soledad encounters a malformed URL.
@@ -75,9 +100,9 @@ class CouchDocument(SoledadDocument):
SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts)
self._couch_rev = None
self._conflicts = None
- self._modified_conflicts = False
+ self._transactions = None
- def ensure_fetch_conflicts(self, get_conflicts_fun):
+ def _ensure_fetch_conflicts(self, get_conflicts_fun):
"""
Ensure conflict data has been fetched from the server.
@@ -100,6 +125,16 @@ class CouchDocument(SoledadDocument):
"""
return self._conflicts
+ def set_conflicts(self, conflicts):
+ """
+ Set the conflicted versions of the document.
+
+ :param conflicts: The conflicted versions of the document.
+ :type conflicts: list
+ """
+ self._conflicts = conflicts
+ self.has_conflicts = len(self._conflicts) > 0
+
def add_conflict(self, doc):
"""
Add a conflict to this document.
@@ -108,8 +143,7 @@ class CouchDocument(SoledadDocument):
:type doc: CouchDocument
"""
if self._conflicts is None:
- raise Exception("Run self.ensure_fetch_conflicts first!")
- self._modified_conflicts = True
+ raise Exception("Run self._ensure_fetch_conflicts first!")
self._conflicts.append(doc)
self.has_conflicts = len(self._conflicts) > 0
@@ -121,25 +155,13 @@ class CouchDocument(SoledadDocument):
:type conflict_revs: [str]
"""
if self._conflicts is None:
- raise Exception("Run self.ensure_fetch_conflicts first!")
+ raise Exception("Run self._ensure_fetch_conflicts first!")
conflicts_len = len(self._conflicts)
self._conflicts = filter(
lambda doc: doc.rev not in conflict_revs,
self._conflicts)
- if len(self._conflicts) < conflicts_len:
- self._modified_conflicts = True
self.has_conflicts = len(self._conflicts) > 0
- def modified_conflicts(self):
- """
- Return whether this document's conflicts have been modified.
-
- :return: Whether this document's conflicts have been modified.
- :rtype: bool
- """
- return self._conflicts is not None and \
- self._modified_conflicts is True
-
def _get_couch_rev(self):
return self._couch_rev
@@ -148,18 +170,217 @@ class CouchDocument(SoledadDocument):
couch_rev = property(_get_couch_rev, _set_couch_rev)
+ def _get_transactions(self):
+ return self._transactions
+
+ def _set_transactions(self, rev):
+ self._transactions = rev
+
+ transactions = property(_get_transactions, _set_transactions)
+
# monkey-patch the u1db http app to use CouchDocument
http_app.Document = CouchDocument
+def raise_missing_design_doc_error(exc, ddoc_path):
+ """
+ Raise an appropriate exception when catching a ResourceNotFound when
+ accessing a design document.
+
+ :param exc: The exception cought.
+ :type exc: ResourceNotFound
+ :param ddoc_path: A list representing the requested path.
+ :type ddoc_path: list
+
+ :raise MissingDesignDocError: Raised when tried to access a missing design
+ document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access a
+ missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a design
+ document for an yet unknown reason.
+ """
+ path = "".join(ddoc_path)
+ if exc.message[1] == 'missing':
+ raise errors.MissingDesignDocError(path)
+ elif exc.message[1] == 'missing function' or \
+ exc.message[1].startswith('missing lists function'):
+ raise errors.MissingDesignDocListFunctionError(path)
+ elif exc.message[1] == 'missing_named_view':
+ raise errors.MissingDesignDocNamedViewError(path)
+ elif exc.message[1] == 'deleted':
+ raise errors.MissingDesignDocDeletedError(path)
+ # other errors are unknown for now
+ raise errors.DesignDocUnknownError("%s: %s" % (path, str(exc.message)))
+
+
+def raise_server_error(exc, ddoc_path):
+ """
+ Raise an appropriate exception when catching a ServerError when
+ accessing a design document.
+
+ :param exc: The exception cought.
+ :type exc: ResourceNotFound
+ :param ddoc_path: A list representing the requested path.
+ :type ddoc_path: list
+
+ :raise MissingDesignDocListFunctionError: Raised when trying to access a
+ missing list function on a
+ design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a design
+ document for an yet unknown reason.
+ """
+ path = "".join(ddoc_path)
+ if exc.message[1][0] == 'unnamed_error':
+ raise errors.MissingDesignDocListFunctionError(path)
+ # other errors are unknown for now
+ raise errors.DesignDocUnknownError(path)
+
+
+class MultipartWriter(object):
+ """
+ A multipart writer adapted from python-couchdb's one so we can PUT
+ documents using couch's multipart PUT.
+
+ This stripped down version does not allow for nested structures, and
+ contains only the essential things we need to PUT SoledadDocuments to the
+ couch backend.
+ """
+
+ CRLF = '\r\n'
+
+ def __init__(self, fileobj, headers=None, boundary=None):
+ """
+ Initialize the multipart writer.
+ """
+ self.fileobj = fileobj
+ if boundary is None:
+ boundary = self._make_boundary()
+ self._boundary = boundary
+ self._build_headers('related', headers)
+
+ def add(self, mimetype, content, headers={}):
+ """
+ Add a part to the multipart stream.
+ """
+ self.fileobj.write('--')
+ self.fileobj.write(self._boundary)
+ self.fileobj.write(self.CRLF)
+ headers['Content-Type'] = mimetype
+ self._write_headers(headers)
+ if content:
+ # XXX: throw an exception if a boundary appears in the content??
+ self.fileobj.write(content)
+ self.fileobj.write(self.CRLF)
+
+ def close(self):
+ """
+ Close the multipart stream.
+ """
+ self.fileobj.write('--')
+ self.fileobj.write(self._boundary)
+ # be careful not to have anything after '--', otherwise old couch
+ # versions (including bigcouch) will fail.
+ self.fileobj.write('--')
+
+ def _make_boundary(self):
+ """
+ Create a boundary to discern multi parts.
+ """
+ try:
+ from uuid import uuid4
+ return '==' + uuid4().hex + '=='
+ except ImportError:
+ from random import randrange
+ token = randrange(sys.maxint)
+ format = '%%0%dd' % len(repr(sys.maxint - 1))
+ return '===============' + (format % token) + '=='
+
+ def _write_headers(self, headers):
+ """
+ Write a part header in the buffer stream.
+ """
+ if headers:
+ for name in sorted(headers.keys()):
+ value = headers[name]
+ self.fileobj.write(name)
+ self.fileobj.write(': ')
+ self.fileobj.write(value)
+ self.fileobj.write(self.CRLF)
+ self.fileobj.write(self.CRLF)
+
+ def _build_headers(self, subtype, headers):
+ """
+ Build the main headers of the multipart stream.
+
+ This is here so we can send headers separete from content using
+ python-couchdb API.
+ """
+ self.headers = {}
+ self.headers['Content-Type'] = 'multipart/%s; boundary="%s"' % \
+ (subtype, self._boundary)
+ if headers:
+ for name in sorted(headers.keys()):
+ value = headers[name]
+ self.headers[name] = value
+
+
class CouchDatabase(CommonBackend):
"""
A U1DB implementation that uses CouchDB as its persistence layer.
"""
+ # We spawn threads to parallelize the CouchDatabase.get_docs() method
+ MAX_GET_DOCS_THREADS = 20
+
+ update_handler_lock = defaultdict(threading.Lock)
+
+ class _GetDocThread(threading.Thread):
+ """
+ A thread that gets a document from a database.
+
+ TODO: switch this for a twisted deferred to thread. This depends on
+ replacing python-couchdb for paisley in this module.
+ """
+
+ def __init__(self, db, doc_id, check_for_conflicts,
+ release_fun):
+ """
+ :param db: The database from where to get the document.
+ :type db: u1db.Database
+ :param doc_id: The doc_id of the document to be retrieved.
+ :type doc_id: str
+ :param check_for_conflicts: Whether the get_doc() method should
+ check for existing conflicts.
+ :type check_for_conflicts: bool
+ :param release_fun: A function that releases a semaphore, to be
+ called after the document is fetched.
+ :type release_fun: function
+ """
+ threading.Thread.__init__(self)
+ self._db = db
+ self._doc_id = doc_id
+ self._check_for_conflicts = check_for_conflicts
+ self._release_fun = release_fun
+ self._doc = None
+
+ def run(self):
+ """
+ Fetch the document, store it as a property, and call the release
+ function.
+ """
+ self._doc = self._db._get_doc(
+ self._doc_id, self._check_for_conflicts)
+ self._release_fun()
+
@classmethod
- def open_database(cls, url, create):
+ def open_database(cls, url, create, ensure_ddocs=False):
"""
Open a U1DB database using CouchDB as backend.
@@ -167,6 +388,8 @@ class CouchDatabase(CommonBackend):
:type url: str
:param create: should the replica be created if it does not exist?
:type create: bool
+ :param ensure_ddocs: Ensure that the design docs exist on server.
+ :type ensure_ddocs: bool
:return: the database instance
:rtype: CouchDatabase
@@ -182,8 +405,8 @@ class CouchDatabase(CommonBackend):
server[dbname]
except ResourceNotFound:
if not create:
- raise errors.DatabaseDoesNotExist()
- return cls(url, dbname)
+ raise DatabaseDoesNotExist()
+ return cls(url, dbname, ensure_ddocs=ensure_ddocs)
def __init__(self, url, dbname, replica_uid=None, full_commit=True,
session=None, ensure_ddocs=True):
@@ -206,6 +429,8 @@ class CouchDatabase(CommonBackend):
# save params
self._url = url
self._full_commit = full_commit
+ if session is None:
+ session = Session(timeout=COUCH_TIMEOUT)
self._session = session
self._factory = CouchDocument
self._real_replica_uid = None
@@ -223,6 +448,9 @@ class CouchDatabase(CommonBackend):
self._set_replica_uid(replica_uid)
if ensure_ddocs:
self.ensure_ddocs_on_db()
+ # initialize a thread pool for parallelizing get_docs()
+ self._sem_pool = threading.BoundedSemaphore(
+ value=self.MAX_GET_DOCS_THREADS)
def ensure_ddocs_on_db(self):
"""
@@ -318,12 +546,31 @@ class CouchDatabase(CommonBackend):
:return: The current generation.
:rtype: int
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
# query a couch list function
- res = self._database.resource(
- '_design', 'transactions', '_list', 'generation', 'log')
- response = res.get_json()
- return response[2]['generation']
+ ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log']
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json()
+ return response[2]['generation']
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
+ except ServerError as e:
+ raise_server_error(e, ddoc_path)
def _get_generation_info(self):
"""
@@ -331,12 +578,31 @@ class CouchDatabase(CommonBackend):
:return: A tuple containing the current generation and transaction id.
:rtype: (int, str)
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
# query a couch list function
- res = self._database.resource(
- '_design', 'transactions', '_list', 'generation', 'log')
- response = res.get_json()
- return (response[2]['generation'], response[2]['transaction_id'])
+ ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log']
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json()
+ return (response[2]['generation'], response[2]['transaction_id'])
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
+ except ServerError as e:
+ raise_server_error(e, ddoc_path)
def _get_trans_id_for_gen(self, generation):
"""
@@ -349,16 +615,36 @@ class CouchDatabase(CommonBackend):
:rtype: str
:raise InvalidGeneration: Raised when the generation does not exist.
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
if generation == 0:
return ''
# query a couch list function
- res = self._database.resource(
- '_design', 'transactions', '_list', 'trans_id_for_gen', 'log')
- response = res.get_json(gen=generation)
- if response[2] == {}:
- raise errors.InvalidGeneration
- return response[2]['transaction_id']
+ ddoc_path = [
+ '_design', 'transactions', '_list', 'trans_id_for_gen', 'log'
+ ]
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json(gen=generation)
+ if response[2] == {}:
+ raise InvalidGeneration
+ return response[2]['transaction_id']
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
+ except ServerError as e:
+ raise_server_error(e, ddoc_path)
def _get_transaction_log(self):
"""
@@ -366,12 +652,31 @@ class CouchDatabase(CommonBackend):
:return: The complete transaction log.
:rtype: [(str, str)]
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
# query a couch view
- res = self._database.resource(
- '_design', 'transactions', '_view', 'log')
- response = res.get_json()
- return map(lambda row: (row['id'], row['value']), response[2]['rows'])
+ ddoc_path = ['_design', 'transactions', '_view', 'log']
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json()
+ return map(
+ lambda row: (row['id'], row['value']),
+ response[2]['rows'])
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
def _get_doc(self, doc_id, check_for_conflicts=False):
"""
@@ -413,8 +718,15 @@ class CouchDatabase(CommonBackend):
and '_attachments' in result \
and 'u1db_conflicts' in result['_attachments']:
doc.has_conflicts = True
+ doc.set_conflicts(
+ self._build_conflicts(
+ doc.doc_id,
+ json.loads(binascii.a2b_base64(
+ result['_attachments']['u1db_conflicts']['data']))))
# store couch revision
doc.couch_rev = result['_rev']
+ # store transactions
+ doc.transactions = result['u1db_transactions']
return doc
def get_doc(self, doc_id, include_deleted=False):
@@ -465,6 +777,10 @@ class CouchDatabase(CommonBackend):
"""
Put the document in the Couch backend database.
+ Note that C{old_doc} must have been fetched with the parameter
+ C{check_for_conflicts} equal to True, so we can properly update the
+ new document using the conflict information from the old one.
+
:param old_doc: The old document version.
:type old_doc: CouchDocument
:param doc: The document to be put.
@@ -472,43 +788,76 @@ class CouchDatabase(CommonBackend):
:raise RevisionConflict: Raised when trying to update a document but
couch revisions mismatch.
- """
- trans_id = self._allocate_transaction_id()
- # encode content
- content = doc.get_json()
- if content is not None:
- content = binascii.b2a_base64(content)[:-1] # exclude trailing \n
- # encode conflicts
- conflicts = None
- update_conflicts = doc.modified_conflicts()
- if update_conflicts is True:
- if doc.has_conflicts:
- conflicts = binascii.b2a_base64(
- json.dumps(
- map(lambda cdoc: (cdoc.rev, cdoc.content),
- doc.get_conflicts()))
- )[:-1] # exclude \n
- # perform the request
- resource = self._database.resource(
- '_design', 'docs', '_update', 'put', doc.doc_id)
- response = resource.put_json(
- body={
- 'couch_rev': old_doc.couch_rev
- if old_doc is not None else None,
- 'u1db_rev': doc.rev,
- 'content': content,
- 'trans_id': trans_id,
- 'conflicts': conflicts,
- 'update_conflicts': update_conflicts,
- },
- headers={'content-type': 'application/json'})
- # the document might have been updated in between, so we check for the
- # return message
- msg = response[2].read()
- if msg == 'ok':
- return
- elif msg == 'revision conflict':
- raise errors.RevisionConflict()
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
+ """
+ attachments = {} # we save content and conflicts as attachments
+ parts = [] # and we put it using couch's multipart PUT
+ # save content as attachment
+ if doc.is_tombstone() is False:
+ content = doc.get_json()
+ attachments['u1db_content'] = {
+ 'follows': True,
+ 'content_type': 'application/octet-stream',
+ 'length': len(content),
+ }
+ parts.append(content)
+ # save conflicts as attachment
+ if doc.has_conflicts is True:
+ conflicts = json.dumps(
+ map(lambda cdoc: (cdoc.rev, cdoc.content),
+ doc.get_conflicts()))
+ attachments['u1db_conflicts'] = {
+ 'follows': True,
+ 'content_type': 'application/octet-stream',
+ 'length': len(conflicts),
+ }
+ parts.append(conflicts)
+ # store old transactions, if any
+ transactions = old_doc.transactions[:] if old_doc is not None else []
+ # create a new transaction id and timestamp it so the transaction log
+ # is consistent when querying the database.
+ transactions.append(
+ # here we store milliseconds to keep consistent with javascript
+ # Date.prototype.getTime() which was used before inside a couchdb
+ # update handler.
+ (int(time.time() * 1000),
+ self._allocate_transaction_id()))
+ # build the couch document
+ couch_doc = {
+ '_id': doc.doc_id,
+ 'u1db_rev': doc.rev,
+ 'u1db_transactions': transactions,
+ '_attachments': attachments,
+ }
+ # if we are updating a doc we have to add the couch doc revision
+ if old_doc is not None:
+ couch_doc['_rev'] = old_doc.couch_rev
+ # prepare the multipart PUT
+ buf = StringIO()
+ envelope = MultipartWriter(buf)
+ envelope.add('application/json', json.dumps(couch_doc))
+ for part in parts:
+ envelope.add('application/octet-stream', part)
+ envelope.close()
+ # try to save and fail if there's a revision conflict
+ try:
+ self._database.resource.put_json(
+ doc.doc_id, body=buf.getvalue(), headers=envelope.headers)
+ self._renew_couch_session()
+ except ResourceConflict:
+ raise RevisionConflict()
def put_doc(self, doc):
"""
@@ -522,26 +871,26 @@ class CouchDatabase(CommonBackend):
:return: new_doc_rev - The new revision identifier for the document.
The Document object will also be updated.
- :raise errors.InvalidDocId: Raised if the document's id is invalid.
- :raise errors.DocumentTooBig: Raised if the document size is too big.
- :raise errors.ConflictedDoc: Raised if the document has conflicts.
+ :raise InvalidDocId: Raised if the document's id is invalid.
+ :raise DocumentTooBig: Raised if the document size is too big.
+ :raise ConflictedDoc: Raised if the document has conflicts.
"""
if doc.doc_id is None:
- raise errors.InvalidDocId()
+ raise InvalidDocId()
self._check_doc_id(doc.doc_id)
self._check_doc_size(doc)
old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
if old_doc and old_doc.has_conflicts:
- raise errors.ConflictedDoc()
+ raise ConflictedDoc()
if old_doc and doc.rev is None and old_doc.is_tombstone():
new_rev = self._allocate_doc_rev(old_doc.rev)
else:
if old_doc is not None:
if old_doc.rev != doc.rev:
- raise errors.RevisionConflict()
+ raise RevisionConflict()
else:
if doc.rev is not None:
- raise errors.RevisionConflict()
+ raise RevisionConflict()
new_rev = self._allocate_doc_rev(doc.rev)
doc.rev = new_rev
self._put_doc(old_doc, doc)
@@ -563,32 +912,53 @@ class CouchDatabase(CommonBackend):
to the last intervening change and sorted by generation (old
changes first)
:rtype: (int, str, [(str, int, str)])
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
# query a couch list function
- res = self._database.resource(
- '_design', 'transactions', '_list', 'whats_changed', 'log')
- response = res.get_json(old_gen=old_generation)
- results = map(
- lambda row:
- (row['generation'], row['doc_id'], row['transaction_id']),
- response[2]['transactions'])
- results.reverse()
- cur_gen = old_generation
- seen = set()
- changes = []
- newest_trans_id = ''
- for generation, doc_id, trans_id in results:
- if doc_id not in seen:
- changes.append((doc_id, generation, trans_id))
- seen.add(doc_id)
- if changes:
- cur_gen = changes[0][1] # max generation
- newest_trans_id = changes[0][2]
- changes.reverse()
- else:
- cur_gen, newest_trans_id = self._get_generation_info()
+ ddoc_path = [
+ '_design', 'transactions', '_list', 'whats_changed', 'log'
+ ]
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json(old_gen=old_generation)
+ results = map(
+ lambda row:
+ (row['generation'], row['doc_id'], row['transaction_id']),
+ response[2]['transactions'])
+ results.reverse()
+ cur_gen = old_generation
+ seen = set()
+ changes = []
+ newest_trans_id = ''
+ for generation, doc_id, trans_id in results:
+ if doc_id not in seen:
+ changes.append((doc_id, generation, trans_id))
+ seen.add(doc_id)
+ if changes:
+ cur_gen = changes[0][1] # max generation
+ newest_trans_id = changes[0][2]
+ changes.reverse()
+ else:
+ cur_gen, newest_trans_id = self._get_generation_info()
- return cur_gen, newest_trans_id, changes
+ return cur_gen, newest_trans_id, changes
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
+ except ServerError as e:
+ raise_server_error(e, ddoc_path)
def delete_doc(self, doc):
"""
@@ -600,28 +970,47 @@ class CouchDatabase(CommonBackend):
:param doc: The document to mark as deleted.
:type doc: CouchDocument.
- :raise errors.DocumentDoesNotExist: Raised if the document does not
+ :raise DocumentDoesNotExist: Raised if the document does not
exist.
- :raise errors.RevisionConflict: Raised if the revisions do not match.
- :raise errors.DocumentAlreadyDeleted: Raised if the document is
+ :raise RevisionConflict: Raised if the revisions do not match.
+ :raise DocumentAlreadyDeleted: Raised if the document is
already deleted.
- :raise errors.ConflictedDoc: Raised if the doc has conflicts.
+ :raise ConflictedDoc: Raised if the doc has conflicts.
"""
old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
if old_doc is None:
- raise errors.DocumentDoesNotExist
+ raise DocumentDoesNotExist
if old_doc.rev != doc.rev:
- raise errors.RevisionConflict()
+ raise RevisionConflict()
if old_doc.is_tombstone():
- raise errors.DocumentAlreadyDeleted
+ raise DocumentAlreadyDeleted
if old_doc.has_conflicts:
- raise errors.ConflictedDoc()
+ raise ConflictedDoc()
new_rev = self._allocate_doc_rev(doc.rev)
doc.rev = new_rev
doc.make_tombstone()
self._put_doc(old_doc, doc)
return new_rev
+ def _build_conflicts(self, doc_id, attached_conflicts):
+ """
+ Build the conflicted documents list from the conflicts attachment
+ fetched from a couch document.
+
+ :param attached_conflicts: The document's conflicts as fetched from a
+ couch document attachment.
+ :type attached_conflicts: dict
+ """
+ conflicts = []
+ for doc_rev, content in attached_conflicts:
+ doc = self._factory(doc_id, doc_rev)
+ if content is None:
+ doc.make_tombstone()
+ else:
+ doc.content = content
+ conflicts.append(doc)
+ return conflicts
+
def _get_conflicts(self, doc_id, couch_rev=None):
"""
Get the conflicted versions of a document.
@@ -642,16 +1031,8 @@ class CouchDatabase(CommonBackend):
resource = self._database.resource(doc_id, 'u1db_conflicts')
try:
response = resource.get_json(**params)
- conflicts = []
- # build the conflicted versions
- for doc_rev, content in json.loads(response[2].read()):
- doc = self._factory(doc_id, doc_rev)
- if content is None:
- doc.make_tombstone()
- else:
- doc.content = content
- conflicts.append(doc)
- return conflicts
+ return self._build_conflicts(
+ doc_id, json.loads(response[2].read()))
except ResourceNotFound:
return []
@@ -737,17 +1118,35 @@ class CouchDatabase(CommonBackend):
:param other_transaction_id: The transaction id associated with the
generation.
:type other_transaction_id: str
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
# query a couch update function
- res = self._database.resource(
- '_design', 'syncs', '_update', 'put', 'u1db_sync_log')
- res.put_json(
- body={
- 'other_replica_uid': other_replica_uid,
- 'other_generation': other_generation,
- 'other_transaction_id': other_transaction_id,
- },
- headers={'content-type': 'application/json'})
+ ddoc_path = ['_design', 'syncs', '_update', 'put', 'u1db_sync_log']
+ res = self._database.resource(*ddoc_path)
+ try:
+ with CouchDatabase.update_handler_lock[self._get_replica_uid()]:
+ res.put_json(
+ body={
+ 'other_replica_uid': other_replica_uid,
+ 'other_generation': other_generation,
+ 'other_transaction_id': other_transaction_id,
+ },
+ headers={'content-type': 'application/json'})
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
def _add_conflict(self, doc, my_doc_rev, my_content):
"""
@@ -765,7 +1164,7 @@ class CouchDatabase(CommonBackend):
serialized string.
:type my_content: str
"""
- doc.ensure_fetch_conflicts(self._get_conflicts)
+ doc._ensure_fetch_conflicts(self._get_conflicts)
doc.add_conflict(
self._factory(doc_id=doc.doc_id, rev=my_doc_rev,
json=my_content))
@@ -774,7 +1173,7 @@ class CouchDatabase(CommonBackend):
"""
Delete the conflicted revisions from the list of conflicts of C{doc}.
- Note that thie method does not actually update the backed; rather, it
+ Note that this method does not actually update the backend; rather, it
updates the CouchDocument object which will provide the conflict data
when the atomic document update is made.
@@ -783,7 +1182,7 @@ class CouchDatabase(CommonBackend):
:param conflict_revs: A list of the revisions to be deleted.
:param conflict_revs: [str]
"""
- doc.ensure_fetch_conflicts(self._get_conflicts)
+ doc._ensure_fetch_conflicts(self._get_conflicts)
doc.delete_conflicts(conflict_revs)
def _prune_conflicts(self, doc, doc_vcr):
@@ -842,34 +1241,44 @@ class CouchDatabase(CommonBackend):
:param conflicted_doc_revs: A list of revisions that the new content
supersedes.
:type conflicted_doc_revs: [str]
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
"""
cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
new_rev = self._ensure_maximal_rev(cur_doc.rev,
conflicted_doc_revs)
superseded_revs = set(conflicted_doc_revs)
doc.rev = new_rev
+ # this backend stores conflicts as properties of the documents, so we
+ # have to copy these conflicts over to the document being updated.
if cur_doc.rev in superseded_revs:
+ # the newer doc version will supersede the one in the database, so
+ # we copy conflicts before updating the backend.
+ doc.set_conflicts(cur_doc.get_conflicts()) # copy conflicts over.
self._delete_conflicts(doc, superseded_revs)
self._put_doc(cur_doc, doc)
else:
- self._add_conflict(doc, new_rev, doc.get_json())
- self._delete_conflicts(doc, superseded_revs)
- # perform request to resolve document in server
- resource = self._database.resource(
- '_design', 'docs', '_update', 'resolve_doc', doc.doc_id)
- conflicts = None
- if doc.has_conflicts:
- conflicts = binascii.b2a_base64(
- json.dumps(
- map(lambda cdoc: (cdoc.rev, cdoc.content),
- doc.get_conflicts()))
- )[:-1] # exclude \n
- response = resource.put_json(
- body={
- 'couch_rev': cur_doc.couch_rev,
- 'conflicts': conflicts,
- },
- headers={'content-type': 'application/json'})
+ # the newer doc version does not supersede the one in the
+ # database, so we will add a conflict to the database and copy
+ # those over to the document the user has in her hands.
+ self._add_conflict(cur_doc, new_rev, doc.get_json())
+ self._delete_conflicts(cur_doc, superseded_revs)
+ self._put_doc(cur_doc, cur_doc) # just update conflicts
+ # backend has been updated with current conflicts, now copy them
+ # to the current document.
+ doc.set_conflicts(cur_doc.get_conflicts())
def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen,
replica_trans_id=''):
@@ -926,7 +1335,7 @@ class CouchDatabase(CommonBackend):
if cur_doc is not None:
doc.couch_rev = cur_doc.couch_rev
# fetch conflicts because we will eventually manipulate them
- doc.ensure_fetch_conflicts(self._get_conflicts)
+ doc._ensure_fetch_conflicts(self._get_conflicts)
# from now on, it works just like u1db sqlite backend
doc_vcr = vectorclock.VectorClockRev(doc.rev)
if cur_doc is None:
@@ -963,7 +1372,7 @@ class CouchDatabase(CommonBackend):
if save_conflict:
self._force_doc_sync_conflict(doc)
if replica_uid is not None and replica_gen is not None:
- self._do_set_replica_gen_and_trans_id(
+ self._set_replica_gen_and_trans_id(
replica_uid, replica_gen, replica_trans_id)
# update info
old_doc.rev = doc.rev
@@ -974,6 +1383,59 @@ class CouchDatabase(CommonBackend):
old_doc.has_conflicts = doc.has_conflicts
return state, self._get_generation()
+ def get_docs(self, doc_ids, check_for_conflicts=True,
+ include_deleted=False):
+ """
+ Get the JSON content for many documents.
+
+ :param doc_ids: A list of document identifiers.
+ :type doc_ids: list
+ :param check_for_conflicts: If set to False, then the conflict check
+ will be skipped, and 'None' will be
+ returned instead of True/False.
+ :type check_for_conflictsa: bool
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise deleted
+ documents will not be included in the results.
+ :return: iterable giving the Document object for each document id
+ in matching doc_ids order.
+ :rtype: iterable
+ """
+ # Workaround for:
+ #
+ # http://bugs.python.org/issue7980
+ # https://leap.se/code/issues/5449
+ #
+ # python-couchdb uses time.strptime, which is not thread safe. In
+ # order to avoid the problem described on the issues above, we preload
+ # strptime here by evaluating the conversion of an arbitrary date.
+ # This will not be needed when/if we switch from python-couchdb to
+ # paisley.
+ time.strptime('Mar 4 1917', '%b %d %Y')
+ # spawn threads to retrieve docs
+ threads = []
+ for doc_id in doc_ids:
+ self._sem_pool.acquire()
+ t = self._GetDocThread(self, doc_id, check_for_conflicts,
+ self._sem_pool.release)
+ t.start()
+ threads.append(t)
+ # join threads and yield docs
+ for t in threads:
+ t.join()
+ if t._doc.is_tombstone() and not include_deleted:
+ continue
+ yield t._doc
+
+ def _renew_couch_session(self):
+ """
+ Create a new couch connection session.
+
+ This is a workaround for #5448. Will not be needed once bigcouch is
+ merged with couchdb.
+ """
+ self._database.resource.session = Session(timeout=COUCH_TIMEOUT)
+
class CouchSyncTarget(CommonSyncTarget):
"""
@@ -997,14 +1459,6 @@ class CouchSyncTarget(CommonSyncTarget):
source_replica_transaction_id)
-class NotEnoughCouchPermissions(Exception):
- """
- Raised when failing to assert for enough permissions on underlying Couch
- Database.
- """
- pass
-
-
class CouchServerState(ServerState):
"""
Inteface of the WSGI server with the CouchDB backend.
@@ -1024,65 +1478,6 @@ class CouchServerState(ServerState):
self._couch_url = couch_url
self._shared_db_name = shared_db_name
self._tokens_db_name = tokens_db_name
- try:
- self._check_couch_permissions()
- except NotEnoughCouchPermissions:
- logger.error("Not enough permissions on underlying couch "
- "database (%s)." % self._couch_url)
- except (socket.error, socket.gaierror, socket.herror,
- socket.timeout), e:
- logger.error("Socket problem while trying to reach underlying "
- "couch database: (%s, %s)." %
- (self._couch_url, e))
-
- def _check_couch_permissions(self):
- """
- Assert that Soledad Server has enough permissions on the underlying
- couch database.
-
- Soledad Server has to be able to do the following in the couch server:
-
- * Create, read and write from/to 'shared' db.
- * Create, read and write from/to 'user-<anything>' dbs.
- * Read from 'tokens' db.
-
- This function tries to perform the actions above using the "low level"
- couch library to ensure that Soledad Server can do everything it needs
- on the underlying couch database.
-
- :param couch_url: The URL of the couch database.
- :type couch_url: str
-
- @raise NotEnoughCouchPermissions: Raised in case there are not enough
- permissions to read/write/create the needed couch databases.
- :rtype: bool
- """
-
- def _open_couch_db(dbname):
- server = Server(url=self._couch_url)
- try:
- server[dbname]
- except ResourceNotFound:
- server.create(dbname)
- return server[dbname]
-
- def _create_delete_test_doc(db):
- doc_id, _ = db.save({'test': 'document'})
- doc = db[doc_id]
- db.delete(doc)
-
- try:
- # test read/write auth for shared db
- _create_delete_test_doc(
- _open_couch_db(self._shared_db_name))
- # test read/write auth for user-<something> db
- _create_delete_test_doc(
- _open_couch_db('%stest-db' % USER_DB_PREFIX))
- # test read auth for tokens db
- tokensdb = _open_couch_db(self._tokens_db_name)
- tokensdb.info()
- except Unauthorized:
- raise NotEnoughCouchPermissions(self._couch_url)
def open_database(self, dbname):
"""
@@ -1094,25 +1489,29 @@ class CouchServerState(ServerState):
:return: The CouchDatabase object.
:rtype: CouchDatabase
"""
- # TODO: open couch
return CouchDatabase.open_database(
self._couch_url + '/' + dbname,
- create=False)
+ create=False,
+ ensure_ddocs=False)
def ensure_database(self, dbname):
"""
Ensure couch database exists.
+ Usually, this method is used by the server to ensure the existence of
+ a database. In our setup, the Soledad user that accesses the underlying
+ couch server should never have permission to create (or delete)
+ databases. But, in case it ever does, by raising an exception here we
+ have one more guarantee that no modified client will be able to
+ enforce creation of a database when syncing.
+
:param dbname: The name of the database to ensure.
:type dbname: str
:return: The CouchDatabase object and the replica uid.
:rtype: (CouchDatabase, str)
"""
- db = CouchDatabase.open_database(
- self._couch_url + '/' + dbname,
- create=True)
- return db, db._replica_uid
+ raise Unauthorized()
def delete_database(self, dbname):
"""
@@ -1121,7 +1520,7 @@ class CouchServerState(ServerState):
:param dbname: The name of the database to delete.
:type dbname: str
"""
- CouchDatabase.delete_database(self._couch_url + '/' + dbname)
+ raise Unauthorized()
def _set_couch_url(self, url):
"""
diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/put.js b/common/src/leap/soledad/common/ddocs/docs/updates/put.js
deleted file mode 100644
index 5a4647de..00000000
--- a/common/src/leap/soledad/common/ddocs/docs/updates/put.js
+++ /dev/null
@@ -1,64 +0,0 @@
-function(doc, req){
- /* we expect to receive the following in `req.body`:
- * {
- * 'couch_rev': '<couch_rev>',
- * 'u1db_rev': '<u1db_rev>',
- * 'content': '<base64 encoded content>',
- * 'trans_id': '<reansaction_id>'
- * 'conflicts': '<base64 encoded conflicts>',
- * 'update_conflicts': <boolean>
- * }
- */
- var body = JSON.parse(req.body);
-
- // create a new document document
- if (!doc) {
- doc = {}
- doc['_id'] = req['id'];
- }
- // or fail if couch revisions do not match
- else if (doc['_rev'] != body['couch_rev']) {
- // of fail if revisions do not match
- return [null, 'revision conflict']
- }
-
- // store u1db rev
- doc.u1db_rev = body['u1db_rev'];
-
- // save content as attachment
- if (body['content'] != null) {
- // save u1db content as attachment
- if (!doc._attachments)
- doc._attachments = {};
- doc._attachments.u1db_content = {
- content_type: "application/octet-stream",
- data: body['content'] // should be base64 encoded
- };
- }
- // or delete the attachment if document is tombstone
- else if (doc._attachments &&
- doc._attachments.u1db_content)
- delete doc._attachments.u1db_content;
-
- // store the transaction id
- if (!doc.u1db_transactions)
- doc.u1db_transactions = [];
- var d = new Date();
- doc.u1db_transactions.push([d.getTime(), body['trans_id']]);
-
- // save conflicts as attachment if they were sent
- if (body['update_conflicts'])
- if (body['conflicts'] != null) {
- if (!doc._attachments)
- doc._attachments = {};
- doc._attachments.u1db_conflicts = {
- content_type: "application/octet-stream",
- data: body['conflicts'] // should be base64 encoded
- }
- } else {
- if(doc._attachments && doc._attachments.u1db_conflicts)
- delete doc._attachments.u1db_conflicts
- }
-
- return [doc, 'ok'];
-}
diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js b/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js
deleted file mode 100644
index 7ba66cf8..00000000
--- a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js
+++ /dev/null
@@ -1,39 +0,0 @@
-function(doc, req){
- /* we expect to receive the following in `req.body`:
- * {
- * 'couch_rev': '<couch_rev>',
- * 'conflicts': '<base64 encoded conflicts>',
- * }
- */
- var body = JSON.parse(req.body);
-
- // fail if no document was given
- if (!doc) {
- return [null, 'document does not exist']
- }
-
- // fail if couch revisions do not match
- if (body['couch_rev'] != null
- && doc['_rev'] != body['couch_rev']) {
- return [null, 'revision conflict']
- }
-
- // fail if conflicts were not sent
- if (body['conflicts'] == null)
- return [null, 'missing conflicts']
-
- // save conflicts as attachment if they were sent
- if (body['conflicts'] != null) {
- if (!doc._attachments)
- doc._attachments = {};
- doc._attachments.u1db_conflicts = {
- content_type: "application/octet-stream",
- data: body['conflicts'] // should be base64 encoded
- }
- }
- // or delete attachment if there are no conflicts
- else if (doc._attachments && doc._attachments.u1db_conflicts)
- delete doc._attachments.u1db_conflicts;
-
- return [doc, 'ok'];
-}
diff --git a/common/src/leap/soledad/common/errors.py b/common/src/leap/soledad/common/errors.py
index 7c2d7296..3a7eadd2 100644
--- a/common/src/leap/soledad/common/errors.py
+++ b/common/src/leap/soledad/common/errors.py
@@ -25,11 +25,49 @@ from u1db import errors
from u1db.remote import http_errors
+def register_exception(cls):
+ """
+ A small decorator that registers exceptions in u1db maps.
+ """
+ # update u1db "wire description to status" and "wire description to
+ # exception" maps.
+ http_errors.wire_description_to_status.update({
+ cls.wire_description: cls.status})
+ errors.wire_description_to_exc.update({
+ cls.wire_description: cls})
+ # do not modify the exception
+ return cls
+
+
+class SoledadError(errors.U1DBError):
+ """
+ Base Soledad HTTP errors.
+ """
+ pass
+
+
#
-# LockResource: a lock based on a document in the shared database.
+# Authorization errors
#
-class InvalidTokenError(errors.U1DBError):
+@register_exception
+class InvalidAuthTokenError(errors.Unauthorized):
+ """
+ Exception raised when failing to get authorization for some action because
+ the provided token either does not exist in the tokens database, has a
+ distinct structure from the expected one, or is associated with a user
+ with a distinct uuid than the one provided by the client.
+ """
+
+ wire_descrition = "invalid auth token"
+ status = 401
+
+#
+# LockResource errors
+#
+
+@register_exception
+class InvalidTokenError(SoledadError):
"""
Exception raised when trying to unlock shared database with invalid token.
"""
@@ -38,7 +76,8 @@ class InvalidTokenError(errors.U1DBError):
status = 401
-class NotLockedError(errors.U1DBError):
+@register_exception
+class NotLockedError(SoledadError):
"""
Exception raised when trying to unlock shared database when it is not
locked.
@@ -48,7 +87,8 @@ class NotLockedError(errors.U1DBError):
status = 404
-class AlreadyLockedError(errors.U1DBError):
+@register_exception
+class AlreadyLockedError(SoledadError):
"""
Exception raised when trying to lock shared database but it is already
locked.
@@ -57,13 +97,83 @@ class AlreadyLockedError(errors.U1DBError):
wire_description = "lock is locked"
status = 403
-# update u1db "wire description to status" and "wire description to exception"
-# maps.
-for e in [InvalidTokenError, NotLockedError, AlreadyLockedError]:
- http_errors.wire_description_to_status.update({
- e.wire_description: e.status})
- errors.wire_description_to_exc.update({
- e.wire_description: e})
+
+@register_exception
+class LockTimedOutError(SoledadError):
+ """
+ Exception raised when timing out while trying to lock the shared database.
+ """
+
+ wire_description = "lock timed out"
+ status = 408
+
+
+@register_exception
+class CouldNotObtainLockError(SoledadError):
+ """
+ Exception raised when timing out while trying to lock the shared database.
+ """
+
+ wire_description = "error obtaining lock"
+ status = 500
+
+
+#
+# CouchDatabase errors
+#
+
+@register_exception
+class MissingDesignDocError(SoledadError):
+ """
+ Raised when trying to access a missing couch design document.
+ """
+
+ wire_description = "missing design document"
+ status = 500
+
+
+@register_exception
+class MissingDesignDocNamedViewError(SoledadError):
+ """
+ Raised when trying to access a missing named view on a couch design
+ document.
+ """
+
+ wire_description = "missing design document named function"
+ status = 500
+
+
+@register_exception
+class MissingDesignDocListFunctionError(SoledadError):
+ """
+ Raised when trying to access a missing list function on a couch design
+ document.
+ """
+
+ wire_description = "missing design document list function"
+ status = 500
+
+
+@register_exception
+class MissingDesignDocDeletedError(SoledadError):
+ """
+ Raised when trying to access a deleted couch design document.
+ """
+
+ wire_description = "design document was deleted"
+ status = 500
+
+
+@register_exception
+class DesignDocUnknownError(SoledadError):
+ """
+ Raised when trying to access a couch design document and getting an
+ unknown error.
+ """
+
+ wire_description = "missing design document unknown error"
+ status = 500
+
# u1db error statuses also have to be updated
http_errors.ERROR_STATUSES = set(
diff --git a/common/src/leap/soledad/common/tests/couchdb.ini.template b/common/src/leap/soledad/common/tests/couchdb.ini.template
index 217ae201..1fc2205b 100644
--- a/common/src/leap/soledad/common/tests/couchdb.ini.template
+++ b/common/src/leap/soledad/common/tests/couchdb.ini.template
@@ -6,7 +6,7 @@
database_dir = %(tempdir)s/lib
view_index_dir = %(tempdir)s/lib
max_document_size = 4294967296 ; 4 GB
-os_process_timeout = 5000 ; 5 seconds. for view and external servers.
+os_process_timeout = 120000 ; 120 seconds. for view and external servers.
max_dbs_open = 100
delayed_commits = true ; set this to false to ensure an fsync before 201 Created is returned
uri_file = %(tempdir)s/lib/couch.uri
diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py
index 72346333..86bb4b93 100644
--- a/common/src/leap/soledad/common/tests/test_couch.py
+++ b/common/src/leap/soledad/common/tests/test_couch.py
@@ -24,16 +24,17 @@ import re
import copy
import shutil
from base64 import b64decode
+from mock import Mock
from couchdb.client import Server
-from u1db import errors
+from u1db import errors as u1db_errors
from leap.common.files import mkdir_p
from leap.soledad.common.tests import u1db_tests as tests
from leap.soledad.common.tests.u1db_tests import test_backends
from leap.soledad.common.tests.u1db_tests import test_sync
-from leap.soledad.common import couch
+from leap.soledad.common import couch, errors
import simplejson as json
@@ -80,9 +81,10 @@ class CouchDBWrapper(object):
mkdir_p(os.path.join(self.tempdir, 'lib'))
mkdir_p(os.path.join(self.tempdir, 'log'))
args = ['couchdb', '-n', '-a', confPath]
- #null = open('/dev/null', 'w')
+ null = open('/dev/null', 'w')
+
self.process = subprocess.Popen(
- args, env=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ args, env=None, stdout=null.fileno(), stderr=null.fileno(),
close_fds=True)
# find port
logPath = os.path.join(self.tempdir, 'log', 'couch.log')
@@ -125,21 +127,21 @@ class CouchDBTestCase(unittest.TestCase):
TestCase base class for tests against a real CouchDB server.
"""
- def setUp(self):
+ @classmethod
+ def setUpClass(cls):
"""
Make sure we have a CouchDB instance for a test.
"""
- self.wrapper = CouchDBWrapper()
- self.wrapper.start()
+ cls.wrapper = CouchDBWrapper()
+ cls.wrapper.start()
#self.db = self.wrapper.db
- unittest.TestCase.setUp(self)
- def tearDown(self):
+ @classmethod
+ def tearDownClass(cls):
"""
Stop CouchDB instance for test.
"""
- self.wrapper.stop()
- unittest.TestCase.tearDown(self)
+ cls.wrapper.stop()
#-----------------------------------------------------------------------------
@@ -356,7 +358,7 @@ class IndexedCouchDatabase(couch.CouchDatabase):
def __init__(self, url, dbname, replica_uid=None, full_commit=True,
session=None, ensure_ddocs=True):
old_class.__init__(self, url, dbname, replica_uid, full_commit,
- session, ensure_ddocs=True)
+ session, ensure_ddocs=ensure_ddocs)
self._indexes = {}
def _put_doc(self, old_doc, doc):
@@ -372,7 +374,7 @@ class IndexedCouchDatabase(couch.CouchDatabase):
if self._indexes[index_name]._definition == list(
index_expressions):
return
- raise errors.IndexNameTakenError
+ raise u1db_errors.IndexNameTakenError
index = InMemoryIndex(index_name, list(index_expressions))
_, all_docs = self.get_all_docs()
for doc in all_docs:
@@ -392,7 +394,7 @@ class IndexedCouchDatabase(couch.CouchDatabase):
try:
index = self._indexes[index_name]
except KeyError:
- raise errors.IndexDoesNotExist
+ raise u1db_errors.IndexDoesNotExist
doc_ids = index.lookup(key_values)
result = []
for doc_id in doc_ids:
@@ -405,7 +407,7 @@ class IndexedCouchDatabase(couch.CouchDatabase):
try:
index = self._indexes[index_name]
except KeyError:
- raise errors.IndexDoesNotExist
+ raise u1db_errors.IndexDoesNotExist
if isinstance(start_value, basestring):
start_value = (start_value,)
if isinstance(end_value, basestring):
@@ -420,7 +422,7 @@ class IndexedCouchDatabase(couch.CouchDatabase):
try:
index = self._indexes[index_name]
except KeyError:
- raise errors.IndexDoesNotExist
+ raise u1db_errors.IndexDoesNotExist
keys = index.keys()
# XXX inefficiency warning
return list(set([tuple(key.split('\x01')) for key in keys]))
@@ -461,4 +463,184 @@ class CouchDatabaseSyncTests(test_sync.DatabaseSyncTests, CouchDBTestCase):
test_sync.DatabaseSyncTests.tearDown(self)
+class CouchDatabaseExceptionsTests(CouchDBTestCase):
+
+ def setUp(self):
+ CouchDBTestCase.setUp(self)
+ self.db = couch.CouchDatabase(
+ 'http://127.0.0.1:%d' % self.wrapper.port, 'test',
+ ensure_ddocs=False) # note that we don't enforce ddocs here
+
+ def tearDown(self):
+ self.db.delete_database()
+
+ def test_missing_design_doc_raises(self):
+ """
+ Test that all methods that access design documents will raise if the
+ design docs are not present.
+ """
+ # _get_generation()
+ self.assertRaises(
+ errors.MissingDesignDocError,
+ self.db._get_generation)
+ # _get_generation_info()
+ self.assertRaises(
+ errors.MissingDesignDocError,
+ self.db._get_generation_info)
+ # _get_trans_id_for_gen()
+ self.assertRaises(
+ errors.MissingDesignDocError,
+ self.db._get_trans_id_for_gen, 1)
+ # _get_transaction_log()
+ self.assertRaises(
+ errors.MissingDesignDocError,
+ self.db._get_transaction_log)
+ # whats_changed()
+ self.assertRaises(
+ errors.MissingDesignDocError,
+ self.db.whats_changed)
+ # _do_set_replica_gen_and_trans_id()
+ self.assertRaises(
+ errors.MissingDesignDocError,
+ self.db._do_set_replica_gen_and_trans_id, 1, 2, 3)
+
+ def test_missing_design_doc_functions_raises(self):
+ """
+ Test that all methods that access design documents list functions
+ will raise if the functions are not present.
+ """
+ self.db = couch.CouchDatabase(
+ 'http://127.0.0.1:%d' % self.wrapper.port, 'test',
+ ensure_ddocs=True)
+ # erase views from _design/transactions
+ transactions = self.db._database['_design/transactions']
+ transactions['lists'] = {}
+ self.db._database.save(transactions)
+ # _get_generation()
+ self.assertRaises(
+ errors.MissingDesignDocListFunctionError,
+ self.db._get_generation)
+ # _get_generation_info()
+ self.assertRaises(
+ errors.MissingDesignDocListFunctionError,
+ self.db._get_generation_info)
+ # _get_trans_id_for_gen()
+ self.assertRaises(
+ errors.MissingDesignDocListFunctionError,
+ self.db._get_trans_id_for_gen, 1)
+ # whats_changed()
+ self.assertRaises(
+ errors.MissingDesignDocListFunctionError,
+ self.db.whats_changed)
+
+ def test_absent_design_doc_functions_raises(self):
+ """
+ Test that all methods that access design documents list functions
+ will raise if the functions are not present.
+ """
+ self.db = couch.CouchDatabase(
+ 'http://127.0.0.1:%d' % self.wrapper.port, 'test',
+ ensure_ddocs=True)
+ # erase views from _design/transactions
+ transactions = self.db._database['_design/transactions']
+ del transactions['lists']
+ self.db._database.save(transactions)
+ # _get_generation()
+ self.assertRaises(
+ errors.MissingDesignDocListFunctionError,
+ self.db._get_generation)
+ # _get_generation_info()
+ self.assertRaises(
+ errors.MissingDesignDocListFunctionError,
+ self.db._get_generation_info)
+ # _get_trans_id_for_gen()
+ self.assertRaises(
+ errors.MissingDesignDocListFunctionError,
+ self.db._get_trans_id_for_gen, 1)
+ # whats_changed()
+ self.assertRaises(
+ errors.MissingDesignDocListFunctionError,
+ self.db.whats_changed)
+
+ def test_missing_design_doc_named_views_raises(self):
+ """
+ Test that all methods that access design documents' named views will
+ raise if the views are not present.
+ """
+ self.db = couch.CouchDatabase(
+ 'http://127.0.0.1:%d' % self.wrapper.port, 'test',
+ ensure_ddocs=True)
+ # erase views from _design/docs
+ docs = self.db._database['_design/docs']
+ del docs['views']
+ self.db._database.save(docs)
+ # erase views from _design/syncs
+ syncs = self.db._database['_design/syncs']
+ del syncs['views']
+ self.db._database.save(syncs)
+ # erase views from _design/transactions
+ transactions = self.db._database['_design/transactions']
+ del transactions['views']
+ self.db._database.save(transactions)
+ # _get_generation()
+ self.assertRaises(
+ errors.MissingDesignDocNamedViewError,
+ self.db._get_generation)
+ # _get_generation_info()
+ self.assertRaises(
+ errors.MissingDesignDocNamedViewError,
+ self.db._get_generation_info)
+ # _get_trans_id_for_gen()
+ self.assertRaises(
+ errors.MissingDesignDocNamedViewError,
+ self.db._get_trans_id_for_gen, 1)
+ # _get_transaction_log()
+ self.assertRaises(
+ errors.MissingDesignDocNamedViewError,
+ self.db._get_transaction_log)
+ # whats_changed()
+ self.assertRaises(
+ errors.MissingDesignDocNamedViewError,
+ self.db.whats_changed)
+
+ def test_deleted_design_doc_raises(self):
+ """
+ Test that all methods that access design documents will raise if the
+ design docs are not present.
+ """
+ self.db = couch.CouchDatabase(
+ 'http://127.0.0.1:%d' % self.wrapper.port, 'test',
+ ensure_ddocs=True)
+ # delete _design/docs
+ del self.db._database['_design/docs']
+ # delete _design/syncs
+ del self.db._database['_design/syncs']
+ # delete _design/transactions
+ del self.db._database['_design/transactions']
+ # _get_generation()
+ self.assertRaises(
+ errors.MissingDesignDocDeletedError,
+ self.db._get_generation)
+ # _get_generation_info()
+ self.assertRaises(
+ errors.MissingDesignDocDeletedError,
+ self.db._get_generation_info)
+ # _get_trans_id_for_gen()
+ self.assertRaises(
+ errors.MissingDesignDocDeletedError,
+ self.db._get_trans_id_for_gen, 1)
+ # _get_transaction_log()
+ self.assertRaises(
+ errors.MissingDesignDocDeletedError,
+ self.db._get_transaction_log)
+ # whats_changed()
+ self.assertRaises(
+ errors.MissingDesignDocDeletedError,
+ self.db.whats_changed)
+ # _do_set_replica_gen_and_trans_id()
+ self.assertRaises(
+ errors.MissingDesignDocDeletedError,
+ self.db._do_set_replica_gen_and_trans_id, 1, 2, 3)
+
+
load_tests = tests.load_with_scenarios
diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
index a0c473b1..3c457cc5 100644
--- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
+++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
@@ -33,11 +33,17 @@ from leap.soledad.common.tests.test_target import (
make_leap_document_for_test,
token_leap_sync_target,
)
+from leap.soledad.common.tests.test_server import _couch_ensure_database
REPEAT_TIMES = 20
+# monkey path CouchServerState so it can ensure databases.
+
+CouchServerState.ensure_database = _couch_ensure_database
+
+
class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
@staticmethod
@@ -100,6 +106,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
def tearDown(self):
+ self.db.delete_database()
CouchDBTestCase.tearDown(self)
TestCaseWithServer.tearDown(self)
@@ -337,3 +344,46 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
self.assertEqual(
1,
len(filter(lambda t: t[0] == doc_id, transaction_log)))
+
+ def test_concurrent_syncs_do_not_fail(self):
+ """
+ Assert that concurrent attempts to sync end up being executed
+ sequentially and do not fail.
+ """
+ threads = []
+ docs = []
+ pool = threading.BoundedSemaphore(value=1)
+ self.startServer()
+ sol = self._soledad_instance(
+ auth_token='auth-token',
+ server_url=self.getURL())
+
+ def _run_method(self):
+ # create a lot of documents
+ doc = self._params['sol'].create_doc({})
+ # do the sync!
+ sol.sync()
+ pool.acquire()
+ docs.append(doc.doc_id)
+ pool.release()
+
+ # launch threads to create documents in parallel
+ for i in range(0, REPEAT_TIMES):
+ thread = self._WorkerThread(
+ {'sol': sol, 'syncs': i},
+ _run_method)
+ thread.start()
+ threads.append(thread)
+
+ # wait for threads to finish
+ for thread in threads:
+ thread.join()
+
+ transaction_log = self.db._get_transaction_log()
+ self.assertEqual(REPEAT_TIMES, len(transaction_log))
+ # assert all documents are in the remote log
+ self.assertEqual(REPEAT_TIMES, len(docs))
+ for doc_id in docs:
+ self.assertEqual(
+ 1,
+ len(filter(lambda t: t[0] == doc_id, transaction_log)))
diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py
index 83df192b..f8d2a64f 100644
--- a/common/src/leap/soledad/common/tests/test_server.py
+++ b/common/src/leap/soledad/common/tests/test_server.py
@@ -25,6 +25,7 @@ import tempfile
import simplejson as json
import mock
import time
+import binascii
from leap.common.testing.basetest import BaseLeapTest
@@ -50,6 +51,17 @@ from leap.soledad.server import SoledadApp, LockResource
from leap.soledad.server.auth import URLToAuthorization
+# monkey path CouchServerState so it can ensure databases.
+
+def _couch_ensure_database(self, dbname):
+ db = CouchDatabase.open_database(
+ self._couch_url + '/' + dbname,
+ create=True)
+ return db, db._replica_uid
+
+CouchServerState.ensure_database = _couch_ensure_database
+
+
class ServerAuthorizationTestCase(BaseLeapTest):
"""
Tests related to Soledad server authorization.
@@ -339,15 +351,16 @@ class EncryptedSyncTestCase(
_, doclist = sol1.get_all_docs()
self.assertEqual([], doclist)
doc1 = sol1.create_doc(json.loads(simple_doc))
- # sync with server
- sol1._server_url = self.getURL()
- sol1.sync()
- # assert doc was sent to couch db
+ # ensure remote db exists before syncing
db = CouchDatabase(
self._couch_url,
# the name of the user database is "user-<uuid>".
'user-user-uuid',
)
+ # sync with server
+ sol1._server_url = self.getURL()
+ sol1.sync()
+ # assert doc was sent to couch db
_, doclist = db.get_all_docs()
self.assertEqual(1, len(doclist))
couchdoc = doclist[0]
@@ -376,6 +389,7 @@ class EncryptedSyncTestCase(
doc2 = doclist[0]
# assert incoming doc is equal to the first sent doc
self.assertEqual(doc1, doc2)
+ db.delete_database()
def test_encrypted_sym_sync_with_unicode_passphrase(self):
"""
@@ -393,15 +407,16 @@ class EncryptedSyncTestCase(
_, doclist = sol1.get_all_docs()
self.assertEqual([], doclist)
doc1 = sol1.create_doc(json.loads(simple_doc))
- # sync with server
- sol1._server_url = self.getURL()
- sol1.sync()
- # assert doc was sent to couch db
+ # ensure remote db exists before syncing
db = CouchDatabase(
self._couch_url,
# the name of the user database is "user-<uuid>".
'user-user-uuid',
)
+ # sync with server
+ sol1._server_url = self.getURL()
+ sol1.sync()
+ # assert doc was sent to couch db
_, doclist = db.get_all_docs()
self.assertEqual(1, len(doclist))
couchdoc = doclist[0]
@@ -434,7 +449,94 @@ class EncryptedSyncTestCase(
doc2 = doclist[0]
# assert incoming doc is equal to the first sent doc
self.assertEqual(doc1, doc2)
+ db.delete_database()
+ def test_sync_very_large_files(self):
+ """
+ Test if Soledad can sync very large files.
+ """
+ # define the size of the "very large file"
+ length = 100*(10**6) # 100 MB
+ self.startServer()
+ # instantiate soledad and create a document
+ sol1 = self._soledad_instance(
+ # token is verified in test_target.make_token_soledad_app
+ auth_token='auth-token'
+ )
+ _, doclist = sol1.get_all_docs()
+ self.assertEqual([], doclist)
+ content = binascii.hexlify(os.urandom(length/2)) # len() == length
+ doc1 = sol1.create_doc({'data': content})
+ # ensure remote db exists before syncing
+ db = CouchDatabase(
+ self._couch_url,
+ # the name of the user database is "user-<uuid>".
+ 'user-user-uuid',
+ )
+ # sync with server
+ sol1._server_url = self.getURL()
+ sol1.sync()
+ # instantiate soledad with empty db, but with same secrets path
+ sol2 = self._soledad_instance(prefix='x', auth_token='auth-token')
+ _, doclist = sol2.get_all_docs()
+ self.assertEqual([], doclist)
+ sol2._secrets_path = sol1.secrets_path
+ sol2._load_secrets()
+ sol2._set_secret_id(sol1._secret_id)
+ # sync the new instance
+ sol2._server_url = self.getURL()
+ sol2.sync()
+ _, doclist = sol2.get_all_docs()
+ self.assertEqual(1, len(doclist))
+ doc2 = doclist[0]
+ # assert incoming doc is equal to the first sent doc
+ self.assertEqual(doc1, doc2)
+ # delete remote database
+ db.delete_database()
+
+
+ def test_sync_many_small_files(self):
+ """
+ Test if Soledad can sync many smallfiles.
+ """
+ number_of_docs = 100
+ self.startServer()
+ # instantiate soledad and create a document
+ sol1 = self._soledad_instance(
+ # token is verified in test_target.make_token_soledad_app
+ auth_token='auth-token'
+ )
+ _, doclist = sol1.get_all_docs()
+ self.assertEqual([], doclist)
+ # create many small files
+ for i in range(0, number_of_docs):
+ sol1.create_doc(json.loads(simple_doc))
+ # ensure remote db exists before syncing
+ db = CouchDatabase(
+ self._couch_url,
+ # the name of the user database is "user-<uuid>".
+ 'user-user-uuid',
+ )
+ # sync with server
+ sol1._server_url = self.getURL()
+ sol1.sync()
+ # instantiate soledad with empty db, but with same secrets path
+ sol2 = self._soledad_instance(prefix='x', auth_token='auth-token')
+ _, doclist = sol2.get_all_docs()
+ self.assertEqual([], doclist)
+ sol2._secrets_path = sol1.secrets_path
+ sol2._load_secrets()
+ sol2._set_secret_id(sol1._secret_id)
+ # sync the new instance
+ sol2._server_url = self.getURL()
+ sol2.sync()
+ _, doclist = sol2.get_all_docs()
+ self.assertEqual(number_of_docs, len(doclist))
+ # assert incoming docs are equal to sent docs
+ for doc in doclist:
+ self.assertEqual(sol1.get_doc(doc.doc_id), doc)
+ # delete remote database
+ db.delete_database()
class LockResourceTestCase(
CouchDBTestCase, TestCaseWithServer):
@@ -455,12 +557,21 @@ class LockResourceTestCase(
CouchDBTestCase.setUp(self)
self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
self._couch_url = 'http://localhost:' + str(self.wrapper.port)
+ # create the databases
+ CouchDatabase(self._couch_url, 'shared')
+ CouchDatabase(self._couch_url, 'tokens')
self._state = CouchServerState(
self._couch_url, 'shared', 'tokens')
def tearDown(self):
CouchDBTestCase.tearDown(self)
TestCaseWithServer.tearDown(self)
+ # delete remote database
+ db = CouchDatabase(
+ self._couch_url,
+ 'shared',
+ )
+ db.delete_database()
def test__try_obtain_filesystem_lock(self):
responder = mock.Mock()