diff options
Diffstat (limited to 'common')
9 files changed, 1120 insertions, 373 deletions
diff --git a/common/changes/bug_4475_remodel-couch-backend b/common/changes/bug_4475_remodel-couch-backend deleted file mode 100644 index 13a1b121..00000000 --- a/common/changes/bug_4475_remodel-couch-backend +++ /dev/null @@ -1,2 +0,0 @@ - o Remodel couch backend to fix concurrency and scalability. Closes #4475, - #4682, #4683 and #4680. diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index d2414477..8e8613a1 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -24,23 +24,48 @@ import uuid import logging import binascii import socket +import time +import sys +import threading + + +from StringIO import StringIO +from collections import defaultdict from couchdb.client import Server -from couchdb.http import ResourceNotFound, Unauthorized -from u1db import errors, query_parser, vectorclock +from couchdb.http import ( + ResourceConflict, + ResourceNotFound, + ServerError, + Session, +) +from u1db import query_parser, vectorclock +from u1db.errors import ( + DatabaseDoesNotExist, + InvalidGeneration, + RevisionConflict, + InvalidDocId, + ConflictedDoc, + DocumentDoesNotExist, + DocumentAlreadyDeleted, + Unauthorized, +) from u1db.backends import CommonBackend, CommonSyncTarget from u1db.remote import http_app from u1db.remote.server_state import ServerState -from leap.soledad.common import USER_DB_PREFIX, ddocs +from leap.soledad.common import USER_DB_PREFIX, ddocs, errors from leap.soledad.common.document import SoledadDocument logger = logging.getLogger(__name__) +COUCH_TIMEOUT = 120 # timeout for transfers between Soledad server and Couch + + class InvalidURLError(Exception): """ Exception raised when Soledad encounters a malformed URL. @@ -75,9 +100,9 @@ class CouchDocument(SoledadDocument): SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts) self._couch_rev = None self._conflicts = None - self._modified_conflicts = False + self._transactions = None - def ensure_fetch_conflicts(self, get_conflicts_fun): + def _ensure_fetch_conflicts(self, get_conflicts_fun): """ Ensure conflict data has been fetched from the server. @@ -100,6 +125,16 @@ class CouchDocument(SoledadDocument): """ return self._conflicts + def set_conflicts(self, conflicts): + """ + Set the conflicted versions of the document. + + :param conflicts: The conflicted versions of the document. + :type conflicts: list + """ + self._conflicts = conflicts + self.has_conflicts = len(self._conflicts) > 0 + def add_conflict(self, doc): """ Add a conflict to this document. @@ -108,8 +143,7 @@ class CouchDocument(SoledadDocument): :type doc: CouchDocument """ if self._conflicts is None: - raise Exception("Run self.ensure_fetch_conflicts first!") - self._modified_conflicts = True + raise Exception("Run self._ensure_fetch_conflicts first!") self._conflicts.append(doc) self.has_conflicts = len(self._conflicts) > 0 @@ -121,25 +155,13 @@ class CouchDocument(SoledadDocument): :type conflict_revs: [str] """ if self._conflicts is None: - raise Exception("Run self.ensure_fetch_conflicts first!") + raise Exception("Run self._ensure_fetch_conflicts first!") conflicts_len = len(self._conflicts) self._conflicts = filter( lambda doc: doc.rev not in conflict_revs, self._conflicts) - if len(self._conflicts) < conflicts_len: - self._modified_conflicts = True self.has_conflicts = len(self._conflicts) > 0 - def modified_conflicts(self): - """ - Return whether this document's conflicts have been modified. - - :return: Whether this document's conflicts have been modified. - :rtype: bool - """ - return self._conflicts is not None and \ - self._modified_conflicts is True - def _get_couch_rev(self): return self._couch_rev @@ -148,18 +170,217 @@ class CouchDocument(SoledadDocument): couch_rev = property(_get_couch_rev, _set_couch_rev) + def _get_transactions(self): + return self._transactions + + def _set_transactions(self, rev): + self._transactions = rev + + transactions = property(_get_transactions, _set_transactions) + # monkey-patch the u1db http app to use CouchDocument http_app.Document = CouchDocument +def raise_missing_design_doc_error(exc, ddoc_path): + """ + Raise an appropriate exception when catching a ResourceNotFound when + accessing a design document. + + :param exc: The exception cought. + :type exc: ResourceNotFound + :param ddoc_path: A list representing the requested path. + :type ddoc_path: list + + :raise MissingDesignDocError: Raised when tried to access a missing design + document. + :raise MissingDesignDocListFunctionError: Raised when trying to access a + missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a design + document for an yet unknown reason. + """ + path = "".join(ddoc_path) + if exc.message[1] == 'missing': + raise errors.MissingDesignDocError(path) + elif exc.message[1] == 'missing function' or \ + exc.message[1].startswith('missing lists function'): + raise errors.MissingDesignDocListFunctionError(path) + elif exc.message[1] == 'missing_named_view': + raise errors.MissingDesignDocNamedViewError(path) + elif exc.message[1] == 'deleted': + raise errors.MissingDesignDocDeletedError(path) + # other errors are unknown for now + raise errors.DesignDocUnknownError("%s: %s" % (path, str(exc.message))) + + +def raise_server_error(exc, ddoc_path): + """ + Raise an appropriate exception when catching a ServerError when + accessing a design document. + + :param exc: The exception cought. + :type exc: ResourceNotFound + :param ddoc_path: A list representing the requested path. + :type ddoc_path: list + + :raise MissingDesignDocListFunctionError: Raised when trying to access a + missing list function on a + design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a design + document for an yet unknown reason. + """ + path = "".join(ddoc_path) + if exc.message[1][0] == 'unnamed_error': + raise errors.MissingDesignDocListFunctionError(path) + # other errors are unknown for now + raise errors.DesignDocUnknownError(path) + + +class MultipartWriter(object): + """ + A multipart writer adapted from python-couchdb's one so we can PUT + documents using couch's multipart PUT. + + This stripped down version does not allow for nested structures, and + contains only the essential things we need to PUT SoledadDocuments to the + couch backend. + """ + + CRLF = '\r\n' + + def __init__(self, fileobj, headers=None, boundary=None): + """ + Initialize the multipart writer. + """ + self.fileobj = fileobj + if boundary is None: + boundary = self._make_boundary() + self._boundary = boundary + self._build_headers('related', headers) + + def add(self, mimetype, content, headers={}): + """ + Add a part to the multipart stream. + """ + self.fileobj.write('--') + self.fileobj.write(self._boundary) + self.fileobj.write(self.CRLF) + headers['Content-Type'] = mimetype + self._write_headers(headers) + if content: + # XXX: throw an exception if a boundary appears in the content?? + self.fileobj.write(content) + self.fileobj.write(self.CRLF) + + def close(self): + """ + Close the multipart stream. + """ + self.fileobj.write('--') + self.fileobj.write(self._boundary) + # be careful not to have anything after '--', otherwise old couch + # versions (including bigcouch) will fail. + self.fileobj.write('--') + + def _make_boundary(self): + """ + Create a boundary to discern multi parts. + """ + try: + from uuid import uuid4 + return '==' + uuid4().hex + '==' + except ImportError: + from random import randrange + token = randrange(sys.maxint) + format = '%%0%dd' % len(repr(sys.maxint - 1)) + return '===============' + (format % token) + '==' + + def _write_headers(self, headers): + """ + Write a part header in the buffer stream. + """ + if headers: + for name in sorted(headers.keys()): + value = headers[name] + self.fileobj.write(name) + self.fileobj.write(': ') + self.fileobj.write(value) + self.fileobj.write(self.CRLF) + self.fileobj.write(self.CRLF) + + def _build_headers(self, subtype, headers): + """ + Build the main headers of the multipart stream. + + This is here so we can send headers separete from content using + python-couchdb API. + """ + self.headers = {} + self.headers['Content-Type'] = 'multipart/%s; boundary="%s"' % \ + (subtype, self._boundary) + if headers: + for name in sorted(headers.keys()): + value = headers[name] + self.headers[name] = value + + class CouchDatabase(CommonBackend): """ A U1DB implementation that uses CouchDB as its persistence layer. """ + # We spawn threads to parallelize the CouchDatabase.get_docs() method + MAX_GET_DOCS_THREADS = 20 + + update_handler_lock = defaultdict(threading.Lock) + + class _GetDocThread(threading.Thread): + """ + A thread that gets a document from a database. + + TODO: switch this for a twisted deferred to thread. This depends on + replacing python-couchdb for paisley in this module. + """ + + def __init__(self, db, doc_id, check_for_conflicts, + release_fun): + """ + :param db: The database from where to get the document. + :type db: u1db.Database + :param doc_id: The doc_id of the document to be retrieved. + :type doc_id: str + :param check_for_conflicts: Whether the get_doc() method should + check for existing conflicts. + :type check_for_conflicts: bool + :param release_fun: A function that releases a semaphore, to be + called after the document is fetched. + :type release_fun: function + """ + threading.Thread.__init__(self) + self._db = db + self._doc_id = doc_id + self._check_for_conflicts = check_for_conflicts + self._release_fun = release_fun + self._doc = None + + def run(self): + """ + Fetch the document, store it as a property, and call the release + function. + """ + self._doc = self._db._get_doc( + self._doc_id, self._check_for_conflicts) + self._release_fun() + @classmethod - def open_database(cls, url, create): + def open_database(cls, url, create, ensure_ddocs=False): """ Open a U1DB database using CouchDB as backend. @@ -167,6 +388,8 @@ class CouchDatabase(CommonBackend): :type url: str :param create: should the replica be created if it does not exist? :type create: bool + :param ensure_ddocs: Ensure that the design docs exist on server. + :type ensure_ddocs: bool :return: the database instance :rtype: CouchDatabase @@ -182,8 +405,8 @@ class CouchDatabase(CommonBackend): server[dbname] except ResourceNotFound: if not create: - raise errors.DatabaseDoesNotExist() - return cls(url, dbname) + raise DatabaseDoesNotExist() + return cls(url, dbname, ensure_ddocs=ensure_ddocs) def __init__(self, url, dbname, replica_uid=None, full_commit=True, session=None, ensure_ddocs=True): @@ -206,6 +429,8 @@ class CouchDatabase(CommonBackend): # save params self._url = url self._full_commit = full_commit + if session is None: + session = Session(timeout=COUCH_TIMEOUT) self._session = session self._factory = CouchDocument self._real_replica_uid = None @@ -223,6 +448,9 @@ class CouchDatabase(CommonBackend): self._set_replica_uid(replica_uid) if ensure_ddocs: self.ensure_ddocs_on_db() + # initialize a thread pool for parallelizing get_docs() + self._sem_pool = threading.BoundedSemaphore( + value=self.MAX_GET_DOCS_THREADS) def ensure_ddocs_on_db(self): """ @@ -318,12 +546,31 @@ class CouchDatabase(CommonBackend): :return: The current generation. :rtype: int + + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. """ # query a couch list function - res = self._database.resource( - '_design', 'transactions', '_list', 'generation', 'log') - response = res.get_json() - return response[2]['generation'] + ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log'] + res = self._database.resource(*ddoc_path) + try: + response = res.get_json() + return response[2]['generation'] + except ResourceNotFound as e: + raise_missing_design_doc_error(e, ddoc_path) + except ServerError as e: + raise_server_error(e, ddoc_path) def _get_generation_info(self): """ @@ -331,12 +578,31 @@ class CouchDatabase(CommonBackend): :return: A tuple containing the current generation and transaction id. :rtype: (int, str) + + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. """ # query a couch list function - res = self._database.resource( - '_design', 'transactions', '_list', 'generation', 'log') - response = res.get_json() - return (response[2]['generation'], response[2]['transaction_id']) + ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log'] + res = self._database.resource(*ddoc_path) + try: + response = res.get_json() + return (response[2]['generation'], response[2]['transaction_id']) + except ResourceNotFound as e: + raise_missing_design_doc_error(e, ddoc_path) + except ServerError as e: + raise_server_error(e, ddoc_path) def _get_trans_id_for_gen(self, generation): """ @@ -349,16 +615,36 @@ class CouchDatabase(CommonBackend): :rtype: str :raise InvalidGeneration: Raised when the generation does not exist. + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. """ if generation == 0: return '' # query a couch list function - res = self._database.resource( - '_design', 'transactions', '_list', 'trans_id_for_gen', 'log') - response = res.get_json(gen=generation) - if response[2] == {}: - raise errors.InvalidGeneration - return response[2]['transaction_id'] + ddoc_path = [ + '_design', 'transactions', '_list', 'trans_id_for_gen', 'log' + ] + res = self._database.resource(*ddoc_path) + try: + response = res.get_json(gen=generation) + if response[2] == {}: + raise InvalidGeneration + return response[2]['transaction_id'] + except ResourceNotFound as e: + raise_missing_design_doc_error(e, ddoc_path) + except ServerError as e: + raise_server_error(e, ddoc_path) def _get_transaction_log(self): """ @@ -366,12 +652,31 @@ class CouchDatabase(CommonBackend): :return: The complete transaction log. :rtype: [(str, str)] + + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. """ # query a couch view - res = self._database.resource( - '_design', 'transactions', '_view', 'log') - response = res.get_json() - return map(lambda row: (row['id'], row['value']), response[2]['rows']) + ddoc_path = ['_design', 'transactions', '_view', 'log'] + res = self._database.resource(*ddoc_path) + try: + response = res.get_json() + return map( + lambda row: (row['id'], row['value']), + response[2]['rows']) + except ResourceNotFound as e: + raise_missing_design_doc_error(e, ddoc_path) def _get_doc(self, doc_id, check_for_conflicts=False): """ @@ -413,8 +718,15 @@ class CouchDatabase(CommonBackend): and '_attachments' in result \ and 'u1db_conflicts' in result['_attachments']: doc.has_conflicts = True + doc.set_conflicts( + self._build_conflicts( + doc.doc_id, + json.loads(binascii.a2b_base64( + result['_attachments']['u1db_conflicts']['data'])))) # store couch revision doc.couch_rev = result['_rev'] + # store transactions + doc.transactions = result['u1db_transactions'] return doc def get_doc(self, doc_id, include_deleted=False): @@ -465,6 +777,10 @@ class CouchDatabase(CommonBackend): """ Put the document in the Couch backend database. + Note that C{old_doc} must have been fetched with the parameter + C{check_for_conflicts} equal to True, so we can properly update the + new document using the conflict information from the old one. + :param old_doc: The old document version. :type old_doc: CouchDocument :param doc: The document to be put. @@ -472,43 +788,76 @@ class CouchDatabase(CommonBackend): :raise RevisionConflict: Raised when trying to update a document but couch revisions mismatch. - """ - trans_id = self._allocate_transaction_id() - # encode content - content = doc.get_json() - if content is not None: - content = binascii.b2a_base64(content)[:-1] # exclude trailing \n - # encode conflicts - conflicts = None - update_conflicts = doc.modified_conflicts() - if update_conflicts is True: - if doc.has_conflicts: - conflicts = binascii.b2a_base64( - json.dumps( - map(lambda cdoc: (cdoc.rev, cdoc.content), - doc.get_conflicts())) - )[:-1] # exclude \n - # perform the request - resource = self._database.resource( - '_design', 'docs', '_update', 'put', doc.doc_id) - response = resource.put_json( - body={ - 'couch_rev': old_doc.couch_rev - if old_doc is not None else None, - 'u1db_rev': doc.rev, - 'content': content, - 'trans_id': trans_id, - 'conflicts': conflicts, - 'update_conflicts': update_conflicts, - }, - headers={'content-type': 'application/json'}) - # the document might have been updated in between, so we check for the - # return message - msg = response[2].read() - if msg == 'ok': - return - elif msg == 'revision conflict': - raise errors.RevisionConflict() + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. + """ + attachments = {} # we save content and conflicts as attachments + parts = [] # and we put it using couch's multipart PUT + # save content as attachment + if doc.is_tombstone() is False: + content = doc.get_json() + attachments['u1db_content'] = { + 'follows': True, + 'content_type': 'application/octet-stream', + 'length': len(content), + } + parts.append(content) + # save conflicts as attachment + if doc.has_conflicts is True: + conflicts = json.dumps( + map(lambda cdoc: (cdoc.rev, cdoc.content), + doc.get_conflicts())) + attachments['u1db_conflicts'] = { + 'follows': True, + 'content_type': 'application/octet-stream', + 'length': len(conflicts), + } + parts.append(conflicts) + # store old transactions, if any + transactions = old_doc.transactions[:] if old_doc is not None else [] + # create a new transaction id and timestamp it so the transaction log + # is consistent when querying the database. + transactions.append( + # here we store milliseconds to keep consistent with javascript + # Date.prototype.getTime() which was used before inside a couchdb + # update handler. + (int(time.time() * 1000), + self._allocate_transaction_id())) + # build the couch document + couch_doc = { + '_id': doc.doc_id, + 'u1db_rev': doc.rev, + 'u1db_transactions': transactions, + '_attachments': attachments, + } + # if we are updating a doc we have to add the couch doc revision + if old_doc is not None: + couch_doc['_rev'] = old_doc.couch_rev + # prepare the multipart PUT + buf = StringIO() + envelope = MultipartWriter(buf) + envelope.add('application/json', json.dumps(couch_doc)) + for part in parts: + envelope.add('application/octet-stream', part) + envelope.close() + # try to save and fail if there's a revision conflict + try: + self._database.resource.put_json( + doc.doc_id, body=buf.getvalue(), headers=envelope.headers) + self._renew_couch_session() + except ResourceConflict: + raise RevisionConflict() def put_doc(self, doc): """ @@ -522,26 +871,26 @@ class CouchDatabase(CommonBackend): :return: new_doc_rev - The new revision identifier for the document. The Document object will also be updated. - :raise errors.InvalidDocId: Raised if the document's id is invalid. - :raise errors.DocumentTooBig: Raised if the document size is too big. - :raise errors.ConflictedDoc: Raised if the document has conflicts. + :raise InvalidDocId: Raised if the document's id is invalid. + :raise DocumentTooBig: Raised if the document size is too big. + :raise ConflictedDoc: Raised if the document has conflicts. """ if doc.doc_id is None: - raise errors.InvalidDocId() + raise InvalidDocId() self._check_doc_id(doc.doc_id) self._check_doc_size(doc) old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) if old_doc and old_doc.has_conflicts: - raise errors.ConflictedDoc() + raise ConflictedDoc() if old_doc and doc.rev is None and old_doc.is_tombstone(): new_rev = self._allocate_doc_rev(old_doc.rev) else: if old_doc is not None: if old_doc.rev != doc.rev: - raise errors.RevisionConflict() + raise RevisionConflict() else: if doc.rev is not None: - raise errors.RevisionConflict() + raise RevisionConflict() new_rev = self._allocate_doc_rev(doc.rev) doc.rev = new_rev self._put_doc(old_doc, doc) @@ -563,32 +912,53 @@ class CouchDatabase(CommonBackend): to the last intervening change and sorted by generation (old changes first) :rtype: (int, str, [(str, int, str)]) + + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. """ # query a couch list function - res = self._database.resource( - '_design', 'transactions', '_list', 'whats_changed', 'log') - response = res.get_json(old_gen=old_generation) - results = map( - lambda row: - (row['generation'], row['doc_id'], row['transaction_id']), - response[2]['transactions']) - results.reverse() - cur_gen = old_generation - seen = set() - changes = [] - newest_trans_id = '' - for generation, doc_id, trans_id in results: - if doc_id not in seen: - changes.append((doc_id, generation, trans_id)) - seen.add(doc_id) - if changes: - cur_gen = changes[0][1] # max generation - newest_trans_id = changes[0][2] - changes.reverse() - else: - cur_gen, newest_trans_id = self._get_generation_info() + ddoc_path = [ + '_design', 'transactions', '_list', 'whats_changed', 'log' + ] + res = self._database.resource(*ddoc_path) + try: + response = res.get_json(old_gen=old_generation) + results = map( + lambda row: + (row['generation'], row['doc_id'], row['transaction_id']), + response[2]['transactions']) + results.reverse() + cur_gen = old_generation + seen = set() + changes = [] + newest_trans_id = '' + for generation, doc_id, trans_id in results: + if doc_id not in seen: + changes.append((doc_id, generation, trans_id)) + seen.add(doc_id) + if changes: + cur_gen = changes[0][1] # max generation + newest_trans_id = changes[0][2] + changes.reverse() + else: + cur_gen, newest_trans_id = self._get_generation_info() - return cur_gen, newest_trans_id, changes + return cur_gen, newest_trans_id, changes + except ResourceNotFound as e: + raise_missing_design_doc_error(e, ddoc_path) + except ServerError as e: + raise_server_error(e, ddoc_path) def delete_doc(self, doc): """ @@ -600,28 +970,47 @@ class CouchDatabase(CommonBackend): :param doc: The document to mark as deleted. :type doc: CouchDocument. - :raise errors.DocumentDoesNotExist: Raised if the document does not + :raise DocumentDoesNotExist: Raised if the document does not exist. - :raise errors.RevisionConflict: Raised if the revisions do not match. - :raise errors.DocumentAlreadyDeleted: Raised if the document is + :raise RevisionConflict: Raised if the revisions do not match. + :raise DocumentAlreadyDeleted: Raised if the document is already deleted. - :raise errors.ConflictedDoc: Raised if the doc has conflicts. + :raise ConflictedDoc: Raised if the doc has conflicts. """ old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) if old_doc is None: - raise errors.DocumentDoesNotExist + raise DocumentDoesNotExist if old_doc.rev != doc.rev: - raise errors.RevisionConflict() + raise RevisionConflict() if old_doc.is_tombstone(): - raise errors.DocumentAlreadyDeleted + raise DocumentAlreadyDeleted if old_doc.has_conflicts: - raise errors.ConflictedDoc() + raise ConflictedDoc() new_rev = self._allocate_doc_rev(doc.rev) doc.rev = new_rev doc.make_tombstone() self._put_doc(old_doc, doc) return new_rev + def _build_conflicts(self, doc_id, attached_conflicts): + """ + Build the conflicted documents list from the conflicts attachment + fetched from a couch document. + + :param attached_conflicts: The document's conflicts as fetched from a + couch document attachment. + :type attached_conflicts: dict + """ + conflicts = [] + for doc_rev, content in attached_conflicts: + doc = self._factory(doc_id, doc_rev) + if content is None: + doc.make_tombstone() + else: + doc.content = content + conflicts.append(doc) + return conflicts + def _get_conflicts(self, doc_id, couch_rev=None): """ Get the conflicted versions of a document. @@ -642,16 +1031,8 @@ class CouchDatabase(CommonBackend): resource = self._database.resource(doc_id, 'u1db_conflicts') try: response = resource.get_json(**params) - conflicts = [] - # build the conflicted versions - for doc_rev, content in json.loads(response[2].read()): - doc = self._factory(doc_id, doc_rev) - if content is None: - doc.make_tombstone() - else: - doc.content = content - conflicts.append(doc) - return conflicts + return self._build_conflicts( + doc_id, json.loads(response[2].read())) except ResourceNotFound: return [] @@ -737,17 +1118,35 @@ class CouchDatabase(CommonBackend): :param other_transaction_id: The transaction id associated with the generation. :type other_transaction_id: str + + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. """ # query a couch update function - res = self._database.resource( - '_design', 'syncs', '_update', 'put', 'u1db_sync_log') - res.put_json( - body={ - 'other_replica_uid': other_replica_uid, - 'other_generation': other_generation, - 'other_transaction_id': other_transaction_id, - }, - headers={'content-type': 'application/json'}) + ddoc_path = ['_design', 'syncs', '_update', 'put', 'u1db_sync_log'] + res = self._database.resource(*ddoc_path) + try: + with CouchDatabase.update_handler_lock[self._get_replica_uid()]: + res.put_json( + body={ + 'other_replica_uid': other_replica_uid, + 'other_generation': other_generation, + 'other_transaction_id': other_transaction_id, + }, + headers={'content-type': 'application/json'}) + except ResourceNotFound as e: + raise_missing_design_doc_error(e, ddoc_path) def _add_conflict(self, doc, my_doc_rev, my_content): """ @@ -765,7 +1164,7 @@ class CouchDatabase(CommonBackend): serialized string. :type my_content: str """ - doc.ensure_fetch_conflicts(self._get_conflicts) + doc._ensure_fetch_conflicts(self._get_conflicts) doc.add_conflict( self._factory(doc_id=doc.doc_id, rev=my_doc_rev, json=my_content)) @@ -774,7 +1173,7 @@ class CouchDatabase(CommonBackend): """ Delete the conflicted revisions from the list of conflicts of C{doc}. - Note that thie method does not actually update the backed; rather, it + Note that this method does not actually update the backend; rather, it updates the CouchDocument object which will provide the conflict data when the atomic document update is made. @@ -783,7 +1182,7 @@ class CouchDatabase(CommonBackend): :param conflict_revs: A list of the revisions to be deleted. :param conflict_revs: [str] """ - doc.ensure_fetch_conflicts(self._get_conflicts) + doc._ensure_fetch_conflicts(self._get_conflicts) doc.delete_conflicts(conflict_revs) def _prune_conflicts(self, doc, doc_vcr): @@ -842,34 +1241,44 @@ class CouchDatabase(CommonBackend): :param conflicted_doc_revs: A list of revisions that the new content supersedes. :type conflicted_doc_revs: [str] + + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. """ cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) new_rev = self._ensure_maximal_rev(cur_doc.rev, conflicted_doc_revs) superseded_revs = set(conflicted_doc_revs) doc.rev = new_rev + # this backend stores conflicts as properties of the documents, so we + # have to copy these conflicts over to the document being updated. if cur_doc.rev in superseded_revs: + # the newer doc version will supersede the one in the database, so + # we copy conflicts before updating the backend. + doc.set_conflicts(cur_doc.get_conflicts()) # copy conflicts over. self._delete_conflicts(doc, superseded_revs) self._put_doc(cur_doc, doc) else: - self._add_conflict(doc, new_rev, doc.get_json()) - self._delete_conflicts(doc, superseded_revs) - # perform request to resolve document in server - resource = self._database.resource( - '_design', 'docs', '_update', 'resolve_doc', doc.doc_id) - conflicts = None - if doc.has_conflicts: - conflicts = binascii.b2a_base64( - json.dumps( - map(lambda cdoc: (cdoc.rev, cdoc.content), - doc.get_conflicts())) - )[:-1] # exclude \n - response = resource.put_json( - body={ - 'couch_rev': cur_doc.couch_rev, - 'conflicts': conflicts, - }, - headers={'content-type': 'application/json'}) + # the newer doc version does not supersede the one in the + # database, so we will add a conflict to the database and copy + # those over to the document the user has in her hands. + self._add_conflict(cur_doc, new_rev, doc.get_json()) + self._delete_conflicts(cur_doc, superseded_revs) + self._put_doc(cur_doc, cur_doc) # just update conflicts + # backend has been updated with current conflicts, now copy them + # to the current document. + doc.set_conflicts(cur_doc.get_conflicts()) def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, replica_trans_id=''): @@ -926,7 +1335,7 @@ class CouchDatabase(CommonBackend): if cur_doc is not None: doc.couch_rev = cur_doc.couch_rev # fetch conflicts because we will eventually manipulate them - doc.ensure_fetch_conflicts(self._get_conflicts) + doc._ensure_fetch_conflicts(self._get_conflicts) # from now on, it works just like u1db sqlite backend doc_vcr = vectorclock.VectorClockRev(doc.rev) if cur_doc is None: @@ -963,7 +1372,7 @@ class CouchDatabase(CommonBackend): if save_conflict: self._force_doc_sync_conflict(doc) if replica_uid is not None and replica_gen is not None: - self._do_set_replica_gen_and_trans_id( + self._set_replica_gen_and_trans_id( replica_uid, replica_gen, replica_trans_id) # update info old_doc.rev = doc.rev @@ -974,6 +1383,59 @@ class CouchDatabase(CommonBackend): old_doc.has_conflicts = doc.has_conflicts return state, self._get_generation() + def get_docs(self, doc_ids, check_for_conflicts=True, + include_deleted=False): + """ + Get the JSON content for many documents. + + :param doc_ids: A list of document identifiers. + :type doc_ids: list + :param check_for_conflicts: If set to False, then the conflict check + will be skipped, and 'None' will be + returned instead of True/False. + :type check_for_conflictsa: bool + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise deleted + documents will not be included in the results. + :return: iterable giving the Document object for each document id + in matching doc_ids order. + :rtype: iterable + """ + # Workaround for: + # + # http://bugs.python.org/issue7980 + # https://leap.se/code/issues/5449 + # + # python-couchdb uses time.strptime, which is not thread safe. In + # order to avoid the problem described on the issues above, we preload + # strptime here by evaluating the conversion of an arbitrary date. + # This will not be needed when/if we switch from python-couchdb to + # paisley. + time.strptime('Mar 4 1917', '%b %d %Y') + # spawn threads to retrieve docs + threads = [] + for doc_id in doc_ids: + self._sem_pool.acquire() + t = self._GetDocThread(self, doc_id, check_for_conflicts, + self._sem_pool.release) + t.start() + threads.append(t) + # join threads and yield docs + for t in threads: + t.join() + if t._doc.is_tombstone() and not include_deleted: + continue + yield t._doc + + def _renew_couch_session(self): + """ + Create a new couch connection session. + + This is a workaround for #5448. Will not be needed once bigcouch is + merged with couchdb. + """ + self._database.resource.session = Session(timeout=COUCH_TIMEOUT) + class CouchSyncTarget(CommonSyncTarget): """ @@ -997,14 +1459,6 @@ class CouchSyncTarget(CommonSyncTarget): source_replica_transaction_id) -class NotEnoughCouchPermissions(Exception): - """ - Raised when failing to assert for enough permissions on underlying Couch - Database. - """ - pass - - class CouchServerState(ServerState): """ Inteface of the WSGI server with the CouchDB backend. @@ -1024,65 +1478,6 @@ class CouchServerState(ServerState): self._couch_url = couch_url self._shared_db_name = shared_db_name self._tokens_db_name = tokens_db_name - try: - self._check_couch_permissions() - except NotEnoughCouchPermissions: - logger.error("Not enough permissions on underlying couch " - "database (%s)." % self._couch_url) - except (socket.error, socket.gaierror, socket.herror, - socket.timeout), e: - logger.error("Socket problem while trying to reach underlying " - "couch database: (%s, %s)." % - (self._couch_url, e)) - - def _check_couch_permissions(self): - """ - Assert that Soledad Server has enough permissions on the underlying - couch database. - - Soledad Server has to be able to do the following in the couch server: - - * Create, read and write from/to 'shared' db. - * Create, read and write from/to 'user-<anything>' dbs. - * Read from 'tokens' db. - - This function tries to perform the actions above using the "low level" - couch library to ensure that Soledad Server can do everything it needs - on the underlying couch database. - - :param couch_url: The URL of the couch database. - :type couch_url: str - - @raise NotEnoughCouchPermissions: Raised in case there are not enough - permissions to read/write/create the needed couch databases. - :rtype: bool - """ - - def _open_couch_db(dbname): - server = Server(url=self._couch_url) - try: - server[dbname] - except ResourceNotFound: - server.create(dbname) - return server[dbname] - - def _create_delete_test_doc(db): - doc_id, _ = db.save({'test': 'document'}) - doc = db[doc_id] - db.delete(doc) - - try: - # test read/write auth for shared db - _create_delete_test_doc( - _open_couch_db(self._shared_db_name)) - # test read/write auth for user-<something> db - _create_delete_test_doc( - _open_couch_db('%stest-db' % USER_DB_PREFIX)) - # test read auth for tokens db - tokensdb = _open_couch_db(self._tokens_db_name) - tokensdb.info() - except Unauthorized: - raise NotEnoughCouchPermissions(self._couch_url) def open_database(self, dbname): """ @@ -1094,25 +1489,29 @@ class CouchServerState(ServerState): :return: The CouchDatabase object. :rtype: CouchDatabase """ - # TODO: open couch return CouchDatabase.open_database( self._couch_url + '/' + dbname, - create=False) + create=False, + ensure_ddocs=False) def ensure_database(self, dbname): """ Ensure couch database exists. + Usually, this method is used by the server to ensure the existence of + a database. In our setup, the Soledad user that accesses the underlying + couch server should never have permission to create (or delete) + databases. But, in case it ever does, by raising an exception here we + have one more guarantee that no modified client will be able to + enforce creation of a database when syncing. + :param dbname: The name of the database to ensure. :type dbname: str :return: The CouchDatabase object and the replica uid. :rtype: (CouchDatabase, str) """ - db = CouchDatabase.open_database( - self._couch_url + '/' + dbname, - create=True) - return db, db._replica_uid + raise Unauthorized() def delete_database(self, dbname): """ @@ -1121,7 +1520,7 @@ class CouchServerState(ServerState): :param dbname: The name of the database to delete. :type dbname: str """ - CouchDatabase.delete_database(self._couch_url + '/' + dbname) + raise Unauthorized() def _set_couch_url(self, url): """ diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/put.js b/common/src/leap/soledad/common/ddocs/docs/updates/put.js deleted file mode 100644 index 5a4647de..00000000 --- a/common/src/leap/soledad/common/ddocs/docs/updates/put.js +++ /dev/null @@ -1,64 +0,0 @@ -function(doc, req){ - /* we expect to receive the following in `req.body`: - * { - * 'couch_rev': '<couch_rev>', - * 'u1db_rev': '<u1db_rev>', - * 'content': '<base64 encoded content>', - * 'trans_id': '<reansaction_id>' - * 'conflicts': '<base64 encoded conflicts>', - * 'update_conflicts': <boolean> - * } - */ - var body = JSON.parse(req.body); - - // create a new document document - if (!doc) { - doc = {} - doc['_id'] = req['id']; - } - // or fail if couch revisions do not match - else if (doc['_rev'] != body['couch_rev']) { - // of fail if revisions do not match - return [null, 'revision conflict'] - } - - // store u1db rev - doc.u1db_rev = body['u1db_rev']; - - // save content as attachment - if (body['content'] != null) { - // save u1db content as attachment - if (!doc._attachments) - doc._attachments = {}; - doc._attachments.u1db_content = { - content_type: "application/octet-stream", - data: body['content'] // should be base64 encoded - }; - } - // or delete the attachment if document is tombstone - else if (doc._attachments && - doc._attachments.u1db_content) - delete doc._attachments.u1db_content; - - // store the transaction id - if (!doc.u1db_transactions) - doc.u1db_transactions = []; - var d = new Date(); - doc.u1db_transactions.push([d.getTime(), body['trans_id']]); - - // save conflicts as attachment if they were sent - if (body['update_conflicts']) - if (body['conflicts'] != null) { - if (!doc._attachments) - doc._attachments = {}; - doc._attachments.u1db_conflicts = { - content_type: "application/octet-stream", - data: body['conflicts'] // should be base64 encoded - } - } else { - if(doc._attachments && doc._attachments.u1db_conflicts) - delete doc._attachments.u1db_conflicts - } - - return [doc, 'ok']; -} diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js b/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js deleted file mode 100644 index 7ba66cf8..00000000 --- a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js +++ /dev/null @@ -1,39 +0,0 @@ -function(doc, req){ - /* we expect to receive the following in `req.body`: - * { - * 'couch_rev': '<couch_rev>', - * 'conflicts': '<base64 encoded conflicts>', - * } - */ - var body = JSON.parse(req.body); - - // fail if no document was given - if (!doc) { - return [null, 'document does not exist'] - } - - // fail if couch revisions do not match - if (body['couch_rev'] != null - && doc['_rev'] != body['couch_rev']) { - return [null, 'revision conflict'] - } - - // fail if conflicts were not sent - if (body['conflicts'] == null) - return [null, 'missing conflicts'] - - // save conflicts as attachment if they were sent - if (body['conflicts'] != null) { - if (!doc._attachments) - doc._attachments = {}; - doc._attachments.u1db_conflicts = { - content_type: "application/octet-stream", - data: body['conflicts'] // should be base64 encoded - } - } - // or delete attachment if there are no conflicts - else if (doc._attachments && doc._attachments.u1db_conflicts) - delete doc._attachments.u1db_conflicts; - - return [doc, 'ok']; -} diff --git a/common/src/leap/soledad/common/errors.py b/common/src/leap/soledad/common/errors.py index 7c2d7296..3a7eadd2 100644 --- a/common/src/leap/soledad/common/errors.py +++ b/common/src/leap/soledad/common/errors.py @@ -25,11 +25,49 @@ from u1db import errors from u1db.remote import http_errors +def register_exception(cls): + """ + A small decorator that registers exceptions in u1db maps. + """ + # update u1db "wire description to status" and "wire description to + # exception" maps. + http_errors.wire_description_to_status.update({ + cls.wire_description: cls.status}) + errors.wire_description_to_exc.update({ + cls.wire_description: cls}) + # do not modify the exception + return cls + + +class SoledadError(errors.U1DBError): + """ + Base Soledad HTTP errors. + """ + pass + + # -# LockResource: a lock based on a document in the shared database. +# Authorization errors # -class InvalidTokenError(errors.U1DBError): +@register_exception +class InvalidAuthTokenError(errors.Unauthorized): + """ + Exception raised when failing to get authorization for some action because + the provided token either does not exist in the tokens database, has a + distinct structure from the expected one, or is associated with a user + with a distinct uuid than the one provided by the client. + """ + + wire_descrition = "invalid auth token" + status = 401 + +# +# LockResource errors +# + +@register_exception +class InvalidTokenError(SoledadError): """ Exception raised when trying to unlock shared database with invalid token. """ @@ -38,7 +76,8 @@ class InvalidTokenError(errors.U1DBError): status = 401 -class NotLockedError(errors.U1DBError): +@register_exception +class NotLockedError(SoledadError): """ Exception raised when trying to unlock shared database when it is not locked. @@ -48,7 +87,8 @@ class NotLockedError(errors.U1DBError): status = 404 -class AlreadyLockedError(errors.U1DBError): +@register_exception +class AlreadyLockedError(SoledadError): """ Exception raised when trying to lock shared database but it is already locked. @@ -57,13 +97,83 @@ class AlreadyLockedError(errors.U1DBError): wire_description = "lock is locked" status = 403 -# update u1db "wire description to status" and "wire description to exception" -# maps. -for e in [InvalidTokenError, NotLockedError, AlreadyLockedError]: - http_errors.wire_description_to_status.update({ - e.wire_description: e.status}) - errors.wire_description_to_exc.update({ - e.wire_description: e}) + +@register_exception +class LockTimedOutError(SoledadError): + """ + Exception raised when timing out while trying to lock the shared database. + """ + + wire_description = "lock timed out" + status = 408 + + +@register_exception +class CouldNotObtainLockError(SoledadError): + """ + Exception raised when timing out while trying to lock the shared database. + """ + + wire_description = "error obtaining lock" + status = 500 + + +# +# CouchDatabase errors +# + +@register_exception +class MissingDesignDocError(SoledadError): + """ + Raised when trying to access a missing couch design document. + """ + + wire_description = "missing design document" + status = 500 + + +@register_exception +class MissingDesignDocNamedViewError(SoledadError): + """ + Raised when trying to access a missing named view on a couch design + document. + """ + + wire_description = "missing design document named function" + status = 500 + + +@register_exception +class MissingDesignDocListFunctionError(SoledadError): + """ + Raised when trying to access a missing list function on a couch design + document. + """ + + wire_description = "missing design document list function" + status = 500 + + +@register_exception +class MissingDesignDocDeletedError(SoledadError): + """ + Raised when trying to access a deleted couch design document. + """ + + wire_description = "design document was deleted" + status = 500 + + +@register_exception +class DesignDocUnknownError(SoledadError): + """ + Raised when trying to access a couch design document and getting an + unknown error. + """ + + wire_description = "missing design document unknown error" + status = 500 + # u1db error statuses also have to be updated http_errors.ERROR_STATUSES = set( diff --git a/common/src/leap/soledad/common/tests/couchdb.ini.template b/common/src/leap/soledad/common/tests/couchdb.ini.template index 217ae201..1fc2205b 100644 --- a/common/src/leap/soledad/common/tests/couchdb.ini.template +++ b/common/src/leap/soledad/common/tests/couchdb.ini.template @@ -6,7 +6,7 @@ database_dir = %(tempdir)s/lib view_index_dir = %(tempdir)s/lib max_document_size = 4294967296 ; 4 GB -os_process_timeout = 5000 ; 5 seconds. for view and external servers. +os_process_timeout = 120000 ; 120 seconds. for view and external servers. max_dbs_open = 100 delayed_commits = true ; set this to false to ensure an fsync before 201 Created is returned uri_file = %(tempdir)s/lib/couch.uri diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index 72346333..86bb4b93 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -24,16 +24,17 @@ import re import copy import shutil from base64 import b64decode +from mock import Mock from couchdb.client import Server -from u1db import errors +from u1db import errors as u1db_errors from leap.common.files import mkdir_p from leap.soledad.common.tests import u1db_tests as tests from leap.soledad.common.tests.u1db_tests import test_backends from leap.soledad.common.tests.u1db_tests import test_sync -from leap.soledad.common import couch +from leap.soledad.common import couch, errors import simplejson as json @@ -80,9 +81,10 @@ class CouchDBWrapper(object): mkdir_p(os.path.join(self.tempdir, 'lib')) mkdir_p(os.path.join(self.tempdir, 'log')) args = ['couchdb', '-n', '-a', confPath] - #null = open('/dev/null', 'w') + null = open('/dev/null', 'w') + self.process = subprocess.Popen( - args, env=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + args, env=None, stdout=null.fileno(), stderr=null.fileno(), close_fds=True) # find port logPath = os.path.join(self.tempdir, 'log', 'couch.log') @@ -125,21 +127,21 @@ class CouchDBTestCase(unittest.TestCase): TestCase base class for tests against a real CouchDB server. """ - def setUp(self): + @classmethod + def setUpClass(cls): """ Make sure we have a CouchDB instance for a test. """ - self.wrapper = CouchDBWrapper() - self.wrapper.start() + cls.wrapper = CouchDBWrapper() + cls.wrapper.start() #self.db = self.wrapper.db - unittest.TestCase.setUp(self) - def tearDown(self): + @classmethod + def tearDownClass(cls): """ Stop CouchDB instance for test. """ - self.wrapper.stop() - unittest.TestCase.tearDown(self) + cls.wrapper.stop() #----------------------------------------------------------------------------- @@ -356,7 +358,7 @@ class IndexedCouchDatabase(couch.CouchDatabase): def __init__(self, url, dbname, replica_uid=None, full_commit=True, session=None, ensure_ddocs=True): old_class.__init__(self, url, dbname, replica_uid, full_commit, - session, ensure_ddocs=True) + session, ensure_ddocs=ensure_ddocs) self._indexes = {} def _put_doc(self, old_doc, doc): @@ -372,7 +374,7 @@ class IndexedCouchDatabase(couch.CouchDatabase): if self._indexes[index_name]._definition == list( index_expressions): return - raise errors.IndexNameTakenError + raise u1db_errors.IndexNameTakenError index = InMemoryIndex(index_name, list(index_expressions)) _, all_docs = self.get_all_docs() for doc in all_docs: @@ -392,7 +394,7 @@ class IndexedCouchDatabase(couch.CouchDatabase): try: index = self._indexes[index_name] except KeyError: - raise errors.IndexDoesNotExist + raise u1db_errors.IndexDoesNotExist doc_ids = index.lookup(key_values) result = [] for doc_id in doc_ids: @@ -405,7 +407,7 @@ class IndexedCouchDatabase(couch.CouchDatabase): try: index = self._indexes[index_name] except KeyError: - raise errors.IndexDoesNotExist + raise u1db_errors.IndexDoesNotExist if isinstance(start_value, basestring): start_value = (start_value,) if isinstance(end_value, basestring): @@ -420,7 +422,7 @@ class IndexedCouchDatabase(couch.CouchDatabase): try: index = self._indexes[index_name] except KeyError: - raise errors.IndexDoesNotExist + raise u1db_errors.IndexDoesNotExist keys = index.keys() # XXX inefficiency warning return list(set([tuple(key.split('\x01')) for key in keys])) @@ -461,4 +463,184 @@ class CouchDatabaseSyncTests(test_sync.DatabaseSyncTests, CouchDBTestCase): test_sync.DatabaseSyncTests.tearDown(self) +class CouchDatabaseExceptionsTests(CouchDBTestCase): + + def setUp(self): + CouchDBTestCase.setUp(self) + self.db = couch.CouchDatabase( + 'http://127.0.0.1:%d' % self.wrapper.port, 'test', + ensure_ddocs=False) # note that we don't enforce ddocs here + + def tearDown(self): + self.db.delete_database() + + def test_missing_design_doc_raises(self): + """ + Test that all methods that access design documents will raise if the + design docs are not present. + """ + # _get_generation() + self.assertRaises( + errors.MissingDesignDocError, + self.db._get_generation) + # _get_generation_info() + self.assertRaises( + errors.MissingDesignDocError, + self.db._get_generation_info) + # _get_trans_id_for_gen() + self.assertRaises( + errors.MissingDesignDocError, + self.db._get_trans_id_for_gen, 1) + # _get_transaction_log() + self.assertRaises( + errors.MissingDesignDocError, + self.db._get_transaction_log) + # whats_changed() + self.assertRaises( + errors.MissingDesignDocError, + self.db.whats_changed) + # _do_set_replica_gen_and_trans_id() + self.assertRaises( + errors.MissingDesignDocError, + self.db._do_set_replica_gen_and_trans_id, 1, 2, 3) + + def test_missing_design_doc_functions_raises(self): + """ + Test that all methods that access design documents list functions + will raise if the functions are not present. + """ + self.db = couch.CouchDatabase( + 'http://127.0.0.1:%d' % self.wrapper.port, 'test', + ensure_ddocs=True) + # erase views from _design/transactions + transactions = self.db._database['_design/transactions'] + transactions['lists'] = {} + self.db._database.save(transactions) + # _get_generation() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db._get_generation) + # _get_generation_info() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db._get_generation_info) + # _get_trans_id_for_gen() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db._get_trans_id_for_gen, 1) + # whats_changed() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db.whats_changed) + + def test_absent_design_doc_functions_raises(self): + """ + Test that all methods that access design documents list functions + will raise if the functions are not present. + """ + self.db = couch.CouchDatabase( + 'http://127.0.0.1:%d' % self.wrapper.port, 'test', + ensure_ddocs=True) + # erase views from _design/transactions + transactions = self.db._database['_design/transactions'] + del transactions['lists'] + self.db._database.save(transactions) + # _get_generation() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db._get_generation) + # _get_generation_info() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db._get_generation_info) + # _get_trans_id_for_gen() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db._get_trans_id_for_gen, 1) + # whats_changed() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db.whats_changed) + + def test_missing_design_doc_named_views_raises(self): + """ + Test that all methods that access design documents' named views will + raise if the views are not present. + """ + self.db = couch.CouchDatabase( + 'http://127.0.0.1:%d' % self.wrapper.port, 'test', + ensure_ddocs=True) + # erase views from _design/docs + docs = self.db._database['_design/docs'] + del docs['views'] + self.db._database.save(docs) + # erase views from _design/syncs + syncs = self.db._database['_design/syncs'] + del syncs['views'] + self.db._database.save(syncs) + # erase views from _design/transactions + transactions = self.db._database['_design/transactions'] + del transactions['views'] + self.db._database.save(transactions) + # _get_generation() + self.assertRaises( + errors.MissingDesignDocNamedViewError, + self.db._get_generation) + # _get_generation_info() + self.assertRaises( + errors.MissingDesignDocNamedViewError, + self.db._get_generation_info) + # _get_trans_id_for_gen() + self.assertRaises( + errors.MissingDesignDocNamedViewError, + self.db._get_trans_id_for_gen, 1) + # _get_transaction_log() + self.assertRaises( + errors.MissingDesignDocNamedViewError, + self.db._get_transaction_log) + # whats_changed() + self.assertRaises( + errors.MissingDesignDocNamedViewError, + self.db.whats_changed) + + def test_deleted_design_doc_raises(self): + """ + Test that all methods that access design documents will raise if the + design docs are not present. + """ + self.db = couch.CouchDatabase( + 'http://127.0.0.1:%d' % self.wrapper.port, 'test', + ensure_ddocs=True) + # delete _design/docs + del self.db._database['_design/docs'] + # delete _design/syncs + del self.db._database['_design/syncs'] + # delete _design/transactions + del self.db._database['_design/transactions'] + # _get_generation() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db._get_generation) + # _get_generation_info() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db._get_generation_info) + # _get_trans_id_for_gen() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db._get_trans_id_for_gen, 1) + # _get_transaction_log() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db._get_transaction_log) + # whats_changed() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db.whats_changed) + # _do_set_replica_gen_and_trans_id() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db._do_set_replica_gen_and_trans_id, 1, 2, 3) + + load_tests = tests.load_with_scenarios diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py index a0c473b1..3c457cc5 100644 --- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py +++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py @@ -33,11 +33,17 @@ from leap.soledad.common.tests.test_target import ( make_leap_document_for_test, token_leap_sync_target, ) +from leap.soledad.common.tests.test_server import _couch_ensure_database REPEAT_TIMES = 20 +# monkey path CouchServerState so it can ensure databases. + +CouchServerState.ensure_database = _couch_ensure_database + + class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): @staticmethod @@ -100,6 +106,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") def tearDown(self): + self.db.delete_database() CouchDBTestCase.tearDown(self) TestCaseWithServer.tearDown(self) @@ -337,3 +344,46 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): self.assertEqual( 1, len(filter(lambda t: t[0] == doc_id, transaction_log))) + + def test_concurrent_syncs_do_not_fail(self): + """ + Assert that concurrent attempts to sync end up being executed + sequentially and do not fail. + """ + threads = [] + docs = [] + pool = threading.BoundedSemaphore(value=1) + self.startServer() + sol = self._soledad_instance( + auth_token='auth-token', + server_url=self.getURL()) + + def _run_method(self): + # create a lot of documents + doc = self._params['sol'].create_doc({}) + # do the sync! + sol.sync() + pool.acquire() + docs.append(doc.doc_id) + pool.release() + + # launch threads to create documents in parallel + for i in range(0, REPEAT_TIMES): + thread = self._WorkerThread( + {'sol': sol, 'syncs': i}, + _run_method) + thread.start() + threads.append(thread) + + # wait for threads to finish + for thread in threads: + thread.join() + + transaction_log = self.db._get_transaction_log() + self.assertEqual(REPEAT_TIMES, len(transaction_log)) + # assert all documents are in the remote log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 1, + len(filter(lambda t: t[0] == doc_id, transaction_log))) diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py index 83df192b..f8d2a64f 100644 --- a/common/src/leap/soledad/common/tests/test_server.py +++ b/common/src/leap/soledad/common/tests/test_server.py @@ -25,6 +25,7 @@ import tempfile import simplejson as json import mock import time +import binascii from leap.common.testing.basetest import BaseLeapTest @@ -50,6 +51,17 @@ from leap.soledad.server import SoledadApp, LockResource from leap.soledad.server.auth import URLToAuthorization +# monkey path CouchServerState so it can ensure databases. + +def _couch_ensure_database(self, dbname): + db = CouchDatabase.open_database( + self._couch_url + '/' + dbname, + create=True) + return db, db._replica_uid + +CouchServerState.ensure_database = _couch_ensure_database + + class ServerAuthorizationTestCase(BaseLeapTest): """ Tests related to Soledad server authorization. @@ -339,15 +351,16 @@ class EncryptedSyncTestCase( _, doclist = sol1.get_all_docs() self.assertEqual([], doclist) doc1 = sol1.create_doc(json.loads(simple_doc)) - # sync with server - sol1._server_url = self.getURL() - sol1.sync() - # assert doc was sent to couch db + # ensure remote db exists before syncing db = CouchDatabase( self._couch_url, # the name of the user database is "user-<uuid>". 'user-user-uuid', ) + # sync with server + sol1._server_url = self.getURL() + sol1.sync() + # assert doc was sent to couch db _, doclist = db.get_all_docs() self.assertEqual(1, len(doclist)) couchdoc = doclist[0] @@ -376,6 +389,7 @@ class EncryptedSyncTestCase( doc2 = doclist[0] # assert incoming doc is equal to the first sent doc self.assertEqual(doc1, doc2) + db.delete_database() def test_encrypted_sym_sync_with_unicode_passphrase(self): """ @@ -393,15 +407,16 @@ class EncryptedSyncTestCase( _, doclist = sol1.get_all_docs() self.assertEqual([], doclist) doc1 = sol1.create_doc(json.loads(simple_doc)) - # sync with server - sol1._server_url = self.getURL() - sol1.sync() - # assert doc was sent to couch db + # ensure remote db exists before syncing db = CouchDatabase( self._couch_url, # the name of the user database is "user-<uuid>". 'user-user-uuid', ) + # sync with server + sol1._server_url = self.getURL() + sol1.sync() + # assert doc was sent to couch db _, doclist = db.get_all_docs() self.assertEqual(1, len(doclist)) couchdoc = doclist[0] @@ -434,7 +449,94 @@ class EncryptedSyncTestCase( doc2 = doclist[0] # assert incoming doc is equal to the first sent doc self.assertEqual(doc1, doc2) + db.delete_database() + def test_sync_very_large_files(self): + """ + Test if Soledad can sync very large files. + """ + # define the size of the "very large file" + length = 100*(10**6) # 100 MB + self.startServer() + # instantiate soledad and create a document + sol1 = self._soledad_instance( + # token is verified in test_target.make_token_soledad_app + auth_token='auth-token' + ) + _, doclist = sol1.get_all_docs() + self.assertEqual([], doclist) + content = binascii.hexlify(os.urandom(length/2)) # len() == length + doc1 = sol1.create_doc({'data': content}) + # ensure remote db exists before syncing + db = CouchDatabase( + self._couch_url, + # the name of the user database is "user-<uuid>". + 'user-user-uuid', + ) + # sync with server + sol1._server_url = self.getURL() + sol1.sync() + # instantiate soledad with empty db, but with same secrets path + sol2 = self._soledad_instance(prefix='x', auth_token='auth-token') + _, doclist = sol2.get_all_docs() + self.assertEqual([], doclist) + sol2._secrets_path = sol1.secrets_path + sol2._load_secrets() + sol2._set_secret_id(sol1._secret_id) + # sync the new instance + sol2._server_url = self.getURL() + sol2.sync() + _, doclist = sol2.get_all_docs() + self.assertEqual(1, len(doclist)) + doc2 = doclist[0] + # assert incoming doc is equal to the first sent doc + self.assertEqual(doc1, doc2) + # delete remote database + db.delete_database() + + + def test_sync_many_small_files(self): + """ + Test if Soledad can sync many smallfiles. + """ + number_of_docs = 100 + self.startServer() + # instantiate soledad and create a document + sol1 = self._soledad_instance( + # token is verified in test_target.make_token_soledad_app + auth_token='auth-token' + ) + _, doclist = sol1.get_all_docs() + self.assertEqual([], doclist) + # create many small files + for i in range(0, number_of_docs): + sol1.create_doc(json.loads(simple_doc)) + # ensure remote db exists before syncing + db = CouchDatabase( + self._couch_url, + # the name of the user database is "user-<uuid>". + 'user-user-uuid', + ) + # sync with server + sol1._server_url = self.getURL() + sol1.sync() + # instantiate soledad with empty db, but with same secrets path + sol2 = self._soledad_instance(prefix='x', auth_token='auth-token') + _, doclist = sol2.get_all_docs() + self.assertEqual([], doclist) + sol2._secrets_path = sol1.secrets_path + sol2._load_secrets() + sol2._set_secret_id(sol1._secret_id) + # sync the new instance + sol2._server_url = self.getURL() + sol2.sync() + _, doclist = sol2.get_all_docs() + self.assertEqual(number_of_docs, len(doclist)) + # assert incoming docs are equal to sent docs + for doc in doclist: + self.assertEqual(sol1.get_doc(doc.doc_id), doc) + # delete remote database + db.delete_database() class LockResourceTestCase( CouchDBTestCase, TestCaseWithServer): @@ -455,12 +557,21 @@ class LockResourceTestCase( CouchDBTestCase.setUp(self) self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") self._couch_url = 'http://localhost:' + str(self.wrapper.port) + # create the databases + CouchDatabase(self._couch_url, 'shared') + CouchDatabase(self._couch_url, 'tokens') self._state = CouchServerState( self._couch_url, 'shared', 'tokens') def tearDown(self): CouchDBTestCase.tearDown(self) TestCaseWithServer.tearDown(self) + # delete remote database + db = CouchDatabase( + self._couch_url, + 'shared', + ) + db.delete_database() def test__try_obtain_filesystem_lock(self): responder = mock.Mock() |