From e91ad8c99c7abc99a97b8afb6ac7e3b7e729b219 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 9 Dec 2013 16:57:55 -0400 Subject: pep8 happy --- common/src/leap/soledad/common/_version.py | 35 ++++++++++++---------- common/src/leap/soledad/common/document.py | 3 +- .../common/tests/u1db_tests/test_http_database.py | 11 ++++--- .../soledad/common/tests/u1db_tests/test_open.py | 3 +- .../common/tests/u1db_tests/test_sqlite_backend.py | 3 +- .../soledad/common/tests/u1db_tests/test_sync.py | 34 +++++++++++---------- 6 files changed, 48 insertions(+), 41 deletions(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/_version.py b/common/src/leap/soledad/common/_version.py index 1d020a14..5116b516 100644 --- a/common/src/leap/soledad/common/_version.py +++ b/common/src/leap/soledad/common/_version.py @@ -17,6 +17,7 @@ git_full = "$Format:%H$" import subprocess import sys + def run_command(args, cwd=None, verbose=False): try: # remember shell=False, so use git.cmd on windows, not just git @@ -36,11 +37,10 @@ def run_command(args, cwd=None, verbose=False): return None return stdout - -import sys import re import os.path + def get_expanded_variables(versionfile_source): # the code embedded in _version.py can just fetch the value of these # variables. When used from setup.py, we don't want to import @@ -48,7 +48,7 @@ def get_expanded_variables(versionfile_source): # used from _version.py. variables = {} try: - f = open(versionfile_source,"r") + f = open(versionfile_source, "r") for line in f.readlines(): if line.strip().startswith("git_refnames ="): mo = re.search(r'=\s*"(.*)"', line) @@ -63,12 +63,13 @@ def get_expanded_variables(versionfile_source): pass return variables + def versions_from_expanded_variables(variables, tag_prefix, verbose=False): refnames = variables["refnames"].strip() if refnames.startswith("$Format"): if verbose: print("variables are unexpanded, not using") - return {} # unexpanded, so not in an unpacked git-archive tarball + return {} # unexpanded, so not in an unpacked git-archive tarball refs = set([r.strip() for r in refnames.strip("()").split(",")]) # starting in git-1.8.3, tags are listed as "tag: foo-1.0" instead of # just "foo-1.0". If we see a "tag: " prefix, prefer those. @@ -93,13 +94,14 @@ def versions_from_expanded_variables(variables, tag_prefix, verbose=False): r = ref[len(tag_prefix):] if verbose: print("picking %s" % r) - return { "version": r, - "full": variables["full"].strip() } + return {"version": r, + "full": variables["full"].strip()} # no suitable tags, so we use the full revision id if verbose: print("no suitable tags, using full revision id") - return { "version": variables["full"].strip(), - "full": variables["full"].strip() } + return {"version": variables["full"].strip(), + "full": variables["full"].strip()} + def versions_from_vcs(tag_prefix, versionfile_source, verbose=False): # this runs 'git' from the root of the source tree. That either means @@ -116,7 +118,7 @@ def versions_from_vcs(tag_prefix, versionfile_source, verbose=False): here = os.path.abspath(__file__) except NameError: # some py2exe/bbfreeze/non-CPython implementations don't do __file__ - return {} # not always correct + return {} # not always correct # versionfile_source is the relative path from the top of the source tree # (where the .git directory might live) to this file. Invert this to find @@ -141,7 +143,8 @@ def versions_from_vcs(tag_prefix, versionfile_source, verbose=False): return {} if not stdout.startswith(tag_prefix): if verbose: - print("tag '%s' doesn't start with prefix '%s'" % (stdout, tag_prefix)) + print("tag '%s' doesn't start with prefix '%s'" % ( + stdout, tag_prefix)) return {} tag = stdout[len(tag_prefix):] stdout = run_command([GIT, "rev-parse", "HEAD"], cwd=root) @@ -153,7 +156,8 @@ def versions_from_vcs(tag_prefix, versionfile_source, verbose=False): return {"version": tag, "full": full} -def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False): +def versions_from_parentdir(parentdir_prefix, versionfile_source, + verbose=False): if IN_LONG_VERSION_PY: # We're running from _version.py. If it's from a source tree # (execute-in-place), we can work upwards to find the root of the @@ -163,7 +167,7 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False) here = os.path.abspath(__file__) except NameError: # py2exe/bbfreeze/non-CPython don't have __file__ - return {} # without __file__, we have no hope + return {} # without __file__, we have no hope # versionfile_source is the relative path from the top of the source # tree to _version.py. Invert this to find the root from __file__. root = here @@ -180,7 +184,8 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False) dirname = os.path.basename(root) if not dirname.startswith(parentdir_prefix): if verbose: - print("guessing rootdir is '%s', but '%s' doesn't start with prefix '%s'" % + print("guessing rootdir is '%s', but '%s' doesn't start with " + "prefix '%s'" % (root, dirname, parentdir_prefix)) return None return {"version": dirname[len(parentdir_prefix):], "full": ""} @@ -189,8 +194,9 @@ tag_prefix = "" parentdir_prefix = "leap.soledad.common-" versionfile_source = "src/leap/soledad/common/_version.py" + def get_versions(default={"version": "unknown", "full": ""}, verbose=False): - variables = { "refnames": git_refnames, "full": git_full } + variables = {"refnames": git_refnames, "full": git_full} ver = versions_from_expanded_variables(variables, tag_prefix, verbose) if not ver: ver = versions_from_vcs(tag_prefix, versionfile_source, verbose) @@ -200,4 +206,3 @@ def get_versions(default={"version": "unknown", "full": ""}, verbose=False): if not ver: ver = default return ver - diff --git a/common/src/leap/soledad/common/document.py b/common/src/leap/soledad/common/document.py index cc24b53a..919ade12 100644 --- a/common/src/leap/soledad/common/document.py +++ b/common/src/leap/soledad/common/document.py @@ -29,6 +29,7 @@ from u1db import Document # class SoledadDocument(Document): + """ Encryptable and syncable document. @@ -107,5 +108,3 @@ class SoledadDocument(Document): _get_rev, _set_rev, doc="Wrapper to ensure `doc.rev` is always returned as bytes.") - - diff --git a/common/src/leap/soledad/common/tests/u1db_tests/test_http_database.py b/common/src/leap/soledad/common/tests/u1db_tests/test_http_database.py index 3f3c7bba..9251000e 100644 --- a/common/src/leap/soledad/common/tests/u1db_tests/test_http_database.py +++ b/common/src/leap/soledad/common/tests/u1db_tests/test_http_database.py @@ -133,12 +133,11 @@ class TestHTTPDatabaseSimpleOperations(tests.TestCase): None), self.got) def test_get_doc_deleted_include_deleted(self): - self.response_val = errors.HTTPError(404, - json.dumps( - {"error": errors.DOCUMENT_DELETED} - ), - {'x-u1db-rev': 'doc-rev-gone', - 'x-u1db-has-conflicts': 'false'}) + self.response_val = errors.HTTPError( + 404, + json.dumps({"error": errors.DOCUMENT_DELETED}), + {'x-u1db-rev': 'doc-rev-gone', + 'x-u1db-has-conflicts': 'false'}) doc = self.db.get_doc('deleted', include_deleted=True) self.assertEqual('deleted', doc.doc_id) self.assertEqual('doc-rev-gone', doc.rev) diff --git a/common/src/leap/soledad/common/tests/u1db_tests/test_open.py b/common/src/leap/soledad/common/tests/u1db_tests/test_open.py index 13425b4f..63406245 100644 --- a/common/src/leap/soledad/common/tests/u1db_tests/test_open.py +++ b/common/src/leap/soledad/common/tests/u1db_tests/test_open.py @@ -24,7 +24,8 @@ from u1db import ( ) from leap.soledad.common.tests import u1db_tests as tests from u1db.backends import sqlite_backend -from leap.soledad.common.tests.u1db_tests.test_backends import TestAlternativeDocument +from leap.soledad.common.tests.u1db_tests.test_backends \ + import TestAlternativeDocument class TestU1DBOpen(tests.TestCase): diff --git a/common/src/leap/soledad/common/tests/u1db_tests/test_sqlite_backend.py b/common/src/leap/soledad/common/tests/u1db_tests/test_sqlite_backend.py index a53ea6cc..8292dd07 100644 --- a/common/src/leap/soledad/common/tests/u1db_tests/test_sqlite_backend.py +++ b/common/src/leap/soledad/common/tests/u1db_tests/test_sqlite_backend.py @@ -30,7 +30,8 @@ from u1db import ( from leap.soledad.common.tests import u1db_tests as tests from u1db.backends import sqlite_backend -from leap.soledad.common.tests.u1db_tests.test_backends import TestAlternativeDocument +from leap.soledad.common.tests.u1db_tests.test_backends \ + import TestAlternativeDocument simple_doc = '{"key": "value"}' diff --git a/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py b/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py index 5346d540..1f78f912 100644 --- a/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py +++ b/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py @@ -760,7 +760,7 @@ class DatabaseSyncTests(tests.DatabaseBaseTests, {'docs': [], 'last_known_gen': 0}, 'return': {'docs': [(doc.doc_id, doc.rev)], - 'last_gen': 1}}) + 'last_gen': 1}}) self.assertEqual([doc], self.db1.get_from_index('test-idx', 'value')) def test_sync_pulling_doesnt_update_other_if_changed(self): @@ -785,7 +785,7 @@ class DatabaseSyncTests(tests.DatabaseBaseTests, {'docs': [], 'last_known_gen': 0}, 'return': {'docs': [(doc.doc_id, doc.rev)], - 'last_gen': 1}}) + 'last_gen': 1}}) self.assertEqual(1, self.db1._get_replica_gen_and_trans_id('test2')[0]) # c2 should not have gotten a '_record_sync_info' call, because the # local database had been updated more than just by the messages @@ -819,8 +819,8 @@ class DatabaseSyncTests(tests.DatabaseBaseTests, self.assertLastExchangeLog(self.db2, {'receive': {'docs': [(doc.doc_id, doc.rev)], - 'source_uid': 'test1', - 'source_gen': 1, 'last_known_gen': 0}, + 'source_uid': 'test1', + 'source_gen': 1, 'last_known_gen': 0}, 'return': {'docs': [], 'last_gen': 1}}) def test_sync_ignores_superseded(self): @@ -839,11 +839,11 @@ class DatabaseSyncTests(tests.DatabaseBaseTests, self.assertLastExchangeLog(self.db1, {'receive': {'docs': [(doc.doc_id, doc_rev1)], - 'source_uid': 'test2', - 'source_gen': 1, 'last_known_gen': 0}, + 'source_uid': 'test2', + 'source_gen': 1, 'last_known_gen': 0}, 'return': {'docs': [(doc.doc_id, doc_rev2)], - 'last_gen': 2}}) + 'last_gen': 2}}) self.assertGetDoc(self.db1, doc.doc_id, doc_rev2, new_content, False) def test_sync_sees_remote_conflicted(self): @@ -861,11 +861,11 @@ class DatabaseSyncTests(tests.DatabaseBaseTests, self.assertLastExchangeLog(self.db2, {'receive': {'docs': [(doc_id, doc1_rev)], - 'source_uid': 'test1', - 'source_gen': 1, 'last_known_gen': 0}, + 'source_uid': 'test1', + 'source_gen': 1, 'last_known_gen': 0}, 'return': {'docs': [(doc_id, doc2_rev)], - 'last_gen': 1}}) + 'last_gen': 1}}) self.assertTransactionLog([doc_id, doc_id], self.db1) self.assertGetDoc(self.db1, doc_id, doc2_rev, new_doc, True) self.assertGetDoc(self.db2, doc_id, doc2_rev, new_doc, False) @@ -892,10 +892,10 @@ class DatabaseSyncTests(tests.DatabaseBaseTests, self.assertLastExchangeLog(self.db2, {'receive': {'docs': [(doc_id, doc1.rev)], - 'source_uid': 'test1', - 'source_gen': 2, 'last_known_gen': 1}, + 'source_uid': 'test1', + 'source_gen': 2, 'last_known_gen': 1}, 'return': {'docs': [(doc_id, doc2.rev)], - 'last_gen': 2}}) + 'last_gen': 2}}) self.assertTransactionLog([doc_id, doc_id, doc_id], self.db1) self.assertGetDocIncludeDeleted(self.db1, doc_id, doc2.rev, None, True) self.assertGetDocIncludeDeleted( @@ -950,8 +950,8 @@ class DatabaseSyncTests(tests.DatabaseBaseTests, self.assertLastExchangeLog(self.db2, {'receive': {'docs': [(doc_id, deleted_rev)], - 'source_uid': 'test1', - 'source_gen': 2, 'last_known_gen': 1}, + 'source_uid': 'test1', + 'source_gen': 2, 'last_known_gen': 1}, 'return': {'docs': [], 'last_gen': 2}}) self.assertGetDocIncludeDeleted( self.db1, doc_id, deleted_rev, None, False) @@ -1121,6 +1121,7 @@ class DatabaseSyncTests(tests.DatabaseBaseTests, class TestDbSync(tests.TestCaseWithServer): + """Test db.sync remote sync shortcut""" scenarios = [ @@ -1189,6 +1190,7 @@ class TestDbSync(tests.TestCaseWithServer): class TestRemoteSyncIntegration(tests.TestCaseWithServer): + """Integration tests for the most common sync scenario local -> remote""" make_app_with_state = staticmethod(make_http_app) @@ -1204,7 +1206,7 @@ class TestRemoteSyncIntegration(tests.TestCaseWithServer): doc12 = self.db1.create_doc_from_json('{"a": 2}') doc21 = self.db2.create_doc_from_json('{"b": 1}') doc22 = self.db2.create_doc_from_json('{"b": 2}') - #sanity + # sanity self.assertEqual(2, len(self.db1._get_transaction_log())) self.assertEqual(2, len(self.db2._get_transaction_log())) progress1 = [] -- cgit v1.2.3 From 45241ec8618d24b61b768bc7027473b52945609f Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 16 Dec 2013 13:21:19 -0400 Subject: patch the _version file so it reports the running version until now this was only possible when running python setup.py version from the source tree. now the .__version__ also reports correctly the runnng version --- common/src/leap/soledad/common/_version.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/_version.py b/common/src/leap/soledad/common/_version.py index 5116b516..7d4262b5 100644 --- a/common/src/leap/soledad/common/_version.py +++ b/common/src/leap/soledad/common/_version.py @@ -128,7 +128,16 @@ def versions_from_vcs(tag_prefix, versionfile_source, verbose=False): for i in range(len(versionfile_source.split("/"))): root = os.path.dirname(root) else: - root = os.path.dirname(here) + root = os.path.dirname( + os.path.join('..', here)) + + ###################################################### + # XXX patch for our specific configuration with + # the three projects leap.soledad.{common, client, server} + # inside the same repo. + ###################################################### + root = os.path.dirname(os.path.join('..', root)) + if not os.path.exists(os.path.join(root, ".git")): if verbose: print("no .git in %s" % root) -- cgit v1.2.3 From ae2894ba47ee5bc905f298db6b67ae40af6ebd74 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 4 Dec 2013 13:42:59 -0200 Subject: Make couch backend consistent and lightweight. This commit introduces the use of couch view, list and update functions to put and get data from the couch database. This avoids loads of metadata transferring and encapsulates operations in atomic PUTs. --- common/src/leap/soledad/common/README.txt | 79 ++ common/src/leap/soledad/common/couch.py | 1229 ++++++++++++++------ common/src/leap/soledad/common/ddocs/README.txt | 29 + common/src/leap/soledad/common/ddocs/__init__.py | 138 +++ .../leap/soledad/common/ddocs/docs/updates/put.js | 64 + .../common/ddocs/docs/updates/resolve_doc.js | 39 + .../soledad/common/ddocs/docs/views/get/map.js | 20 + .../leap/soledad/common/ddocs/syncs/updates/put.js | 22 + .../soledad/common/ddocs/syncs/views/log/map.js | 12 + .../common/ddocs/transactions/lists/generation.js | 20 + .../ddocs/transactions/lists/trans_id_for_gen.js | 19 + .../ddocs/transactions/lists/whats_changed.js | 22 + .../common/ddocs/transactions/views/log/map.js | 7 + common/src/leap/soledad/common/objectstore.py | 282 ----- common/src/leap/soledad/common/tests/test_couch.py | 3 +- 15 files changed, 1325 insertions(+), 660 deletions(-) create mode 100644 common/src/leap/soledad/common/README.txt create mode 100644 common/src/leap/soledad/common/ddocs/README.txt create mode 100644 common/src/leap/soledad/common/ddocs/__init__.py create mode 100644 common/src/leap/soledad/common/ddocs/docs/updates/put.js create mode 100644 common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js create mode 100644 common/src/leap/soledad/common/ddocs/docs/views/get/map.js create mode 100644 common/src/leap/soledad/common/ddocs/syncs/updates/put.js create mode 100644 common/src/leap/soledad/common/ddocs/syncs/views/log/map.js create mode 100644 common/src/leap/soledad/common/ddocs/transactions/lists/generation.js create mode 100644 common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js create mode 100644 common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js create mode 100644 common/src/leap/soledad/common/ddocs/transactions/views/log/map.js delete mode 100644 common/src/leap/soledad/common/objectstore.py (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/README.txt b/common/src/leap/soledad/common/README.txt new file mode 100644 index 00000000..106efb5e --- /dev/null +++ b/common/src/leap/soledad/common/README.txt @@ -0,0 +1,79 @@ +Soledad common package +====================== + +This package contains Soledad bits used by both server and client. + +Couch U1DB Backend +------------------ + +U1DB backends rely on some atomic operations that modify documents contents +and metadata (conflicts, transaction ids and indexes). The only atomic +operation in Couch is a document put, so every u1db atomic operation has to be +mapped to a couch document put. + +The atomic operations in the U1DB SQLite reference backend implementation may +be identified by the use of a context manager to access the underlying +database. A listing of the methods involved in each atomic operation are +depiced below. The top-level elements correpond to the atomic operations that +have to be mapped, and items on deeper levels of the list have to be +implemented in a way that all changes will be pushed with just one operation. + + * _set_replica_uid + * put_doc: + * _get_doc + * _put_and_update_indexes + * insert/update the document + * insert into transaction log + * delete_doc + * _get_doc + * _put_and_update_indexes + * get_doc_conflicts + * _get_conflicts + * _set_replica_gen_and_trans_id + * _do_set_replica_gen_and_trans_id + * _put_doc_if_newer + * _get_doc + * _validate_source (**) + * _get_replica_gen_and_trans_id + * cases: + * is newer: + * _prune_conflicts (**) + * _has_conflicts + * _delete_conflicts + * _put_and_update_indexes + * same content as: + * _put_and_update_indexes + * conflicted: + * _force_doc_sync_conflict + * _prune_conflicts + * _add_conflict + * _put_and_update_indexes + * _do_set_replica_gen_and_trans_id + * resolve_doc + * _get_doc + * cases: + * doc is superseded + * _put_and_update_indexes + * else + * _add_conflict + * _delete_conflicts + * delete_index + * create_index + +Couch views and update functions are used in order to achieve atomicity on the +Couch backend. Transactions are stored in the `u1db_transactions` field of the +couch document. Document's content and conflicted versions are stored as couch +document attachments with names, respectivelly, `u1db_content` and +`u1db_conflicts`. + +A map of methods and couch query URI can be found on the `./ddocs/README.txt` +document. + +Notes: + + * Currently, the couch backend does not implement indexing, so what is + depicted as `_put_and_update_indexes` above will be found as `_put_doc` in + the backend. + + * Conflict updates are part of document put using couch update functions, + and as such are part of the same atomic operation as document put. diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 1396f4d7..3b0e042a 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -18,26 +18,34 @@ """A U1DB backend that uses CouchDB as its persistence layer.""" -import re +import os import simplejson as json -import socket +import sys +import time +import uuid import logging +import binascii +import socket -from u1db import errors -from u1db.sync import Synchronizer -from u1db.backends.inmemory import InMemoryIndex +from u1db.backends import CommonBackend, CommonSyncTarget from u1db.remote.server_state import ServerState -from u1db.errors import DatabaseDoesNotExist -from couchdb.client import Server, Document as CouchDocument +from u1db import ( + errors, + query_parser, + vectorclock, +) +from couchdb.client import ( + Server, + Document as CouchDocument, + _doc_resource, +) from couchdb.http import ResourceNotFound, Unauthorized from leap.soledad.common import USER_DB_PREFIX -from leap.soledad.common.objectstore import ( - ObjectStoreDatabase, - ObjectStoreSyncTarget, -) +from leap.soledad.common.document import SoledadDocument +from leap.soledad.common.ddocs import ensure_ddocs_on_remote_db logger = logging.getLogger(__name__) @@ -49,158 +57,125 @@ class InvalidURLError(Exception): """ -def persistent_class(cls): +class CouchDocument(SoledadDocument): """ - Decorator that modifies a class to ensure u1db metadata persists on - underlying storage. + This is the document used for maintaining the Couch backend. - @param cls: The class that will be modified. - @type cls: type + A CouchDocument can fetch and manipulate conflicts and also holds a + reference to the couch document revision. This data is used to ensure an + atomic and consistent update of the database. """ - def _create_persistent_method(old_method_name, key, load_method_name, - dump_method_name, store): - """ - Create a persistent method to replace C{old_method_name}. - - The new method will load C{key} using C{load_method_name} and stores - it using C{dump_method_name} depending on the value of C{store}. - """ - # get methods - old_method = getattr(cls, old_method_name) - load_method = getattr(cls, load_method_name) \ - if load_method_name is not None \ - else lambda self, data: setattr(self, key, data) - dump_method = getattr(cls, dump_method_name) \ - if dump_method_name is not None \ - else lambda self: getattr(self, key) - - def _new_method(self, *args, **kwargs): - # get u1db data from couch db - doc = self._get_doc('%s%s' % - (self.U1DB_DATA_DOC_ID_PREFIX, key)) - load_method(self, doc.content['content']) - # run old method - retval = old_method(self, *args, **kwargs) - # store u1db data on couch - if store: - doc.content = {'content': dump_method(self)} - self._put_doc(doc) - return retval - - return _new_method - - # ensure the class has a persistency map - if not hasattr(cls, 'PERSISTENCY_MAP'): - logger.error('Class %s has no PERSISTENCY_MAP attribute, skipping ' - 'persistent methods substitution.' % cls) - return cls - # replace old methods with new persistent ones - for key, ((load_method_name, dump_method_name), - persistent_methods) in cls.PERSISTENCY_MAP.iteritems(): - for (method_name, store) in persistent_methods: - setattr(cls, method_name, - _create_persistent_method( - method_name, - key, - load_method_name, - dump_method_name, - store)) - return cls - - -@persistent_class -class CouchDatabase(ObjectStoreDatabase): + def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False, + syncable=True): + """ + Container for handling a document that is stored in couch backend. + + :param doc_id: The unique document identifier. + :type doc_id: str + :param rev: The revision identifier of the document. + :type rev: str + :param json: The JSON string for this document. + :type json: str + :param has_conflicts: Boolean indicating if this document has conflicts + :type has_conflicts: bool + :param syncable: Should this document be synced with remote replicas? + :type syncable: bool + """ + SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts) + self._couch_rev = None + self._conflicts = None + self._modified_conflicts = False + + def ensure_fetch_conflicts(self, get_conflicts_fun): + """ + Ensure conflict data has been fetched from the server. + + :param get_conflicts_fun: A function which, given the document id and + the couch revision, return the conflicted + versions of the current document. + :type get_conflicts_fun: function + """ + if self._conflicts is None: + self._conflicts = get_conflicts_fun(self.doc_id, + couch_rev=self.couch_rev) + self.has_conflicts = len(self._conflicts) > 0 + + def get_conflicts(self): + """ + Get the conflicted versions of the document. + + :return: The conflicted versions of the document. + :rtype: [CouchDocument] + """ + return self._conflicts + + def add_conflict(self, doc): + """ + Add a conflict to this document. + + :param doc: The conflicted version to be added. + :type doc: CouchDocument + """ + if self._conflicts is None: + raise Exception("Run self.ensure_fetch_conflicts first!") + self._modified_conflicts = True + self._conflicts.append(doc) + self.has_conflicts = len(self._conflicts) > 0 + + def delete_conflicts(self, conflict_revs): + """ + Delete conflicted versions of this document. + + :param conflict_revs: The conflicted revisions to be deleted. + :type conflict_revs: [str] + """ + if self._conflicts is None: + raise Exception("Run self.ensure_fetch_conflicts first!") + conflicts_len = len(self._conflicts) + self._conflicts = filter( + lambda doc: doc.rev not in conflict_revs, + self._conflicts) + if len(self._conflicts) < conflicts_len: + self._modified_conflicts = True + self.has_conflicts = len(self._conflicts) > 0 + + def modified_conflicts(self): + """ + Return whether this document's conflicts have been modified. + + :return: Whether this document's conflicts have been modified. + :rtype: bool + """ + return self._conflicts is not None and \ + self._modified_conflicts is True + + def _get_couch_rev(self): + return self._couch_rev + + def _set_couch_rev(self, rev): + self._couch_rev = rev + + couch_rev = property(_get_couch_rev, _set_couch_rev) + + +class CouchDatabase(CommonBackend): """ - A U1DB backend that uses Couch as its persistence layer. + A U1DB implementation that uses CouchDB as its persistence layer. """ - U1DB_TRANSACTION_LOG_KEY = '_transaction_log' - U1DB_CONFLICTS_KEY = '_conflicts' - U1DB_OTHER_GENERATIONS_KEY = '_other_generations' - U1DB_INDEXES_KEY = '_indexes' - U1DB_REPLICA_UID_KEY = '_replica_uid' - - U1DB_DATA_KEYS = [ - U1DB_TRANSACTION_LOG_KEY, - U1DB_CONFLICTS_KEY, - U1DB_OTHER_GENERATIONS_KEY, - U1DB_INDEXES_KEY, - U1DB_REPLICA_UID_KEY, - ] - - COUCH_ID_KEY = '_id' - COUCH_REV_KEY = '_rev' - COUCH_U1DB_ATTACHMENT_KEY = 'u1db_json' - COUCH_U1DB_REV_KEY = 'u1db_rev' - - # the following map describes information about methods usage of - # properties that have to persist on the underlying database. The format - # of the map is assumed to be: - # - # { - # 'property_name': [ - # ('property_load_method_name', 'property_dump_method_name'), - # [('method_1_name', bool), - # ... - # ('method_N_name', bool)]], - # ... - # } - # - # where the booleans indicate if the property should be stored after - # each method execution (i.e. if the method alters the property). Property - # load/dump methods will be run after/before properties are read/written - # to the underlying db. - PERSISTENCY_MAP = { - U1DB_TRANSACTION_LOG_KEY: [ - ('_load_transaction_log_from_json', None), - [('_get_transaction_log', False), - ('_get_generation', False), - ('_get_generation_info', False), - ('_get_trans_id_for_gen', False), - ('whats_changed', False), - ('_put_and_update_indexes', True)]], - U1DB_CONFLICTS_KEY: [ - (None, None), - [('_has_conflicts', False), - ('get_doc_conflicts', False), - ('_prune_conflicts', False), - ('resolve_doc', False), - ('_replace_conflicts', True), - ('_force_doc_sync_conflict', True)]], - U1DB_OTHER_GENERATIONS_KEY: [ - ('_load_other_generations_from_json', None), - [('_get_replica_gen_and_trans_id', False), - ('_do_set_replica_gen_and_trans_id', True)]], - U1DB_INDEXES_KEY: [ - ('_load_indexes_from_json', '_dump_indexes_as_json'), - [('list_indexes', False), - ('get_from_index', False), - ('get_range_from_index', False), - ('get_index_keys', False), - ('_put_and_update_indexes', True), - ('create_index', True), - ('delete_index', True)]], - U1DB_REPLICA_UID_KEY: [ - (None, None), - [('_allocate_doc_rev', False), - ('_put_doc_if_newer', False), - ('_ensure_maximal_rev', False), - ('_prune_conflicts', False), - ('_set_replica_uid', True)]]} - @classmethod def open_database(cls, url, create): """ Open a U1DB database using CouchDB as backend. - @param url: the url of the database replica - @type url: str - @param create: should the replica be created if it does not exist? - @type create: bool + :param url: the url of the database replica + :type url: str + :param create: should the replica be created if it does not exist? + :type create: bool - @return: the database instance - @rtype: CouchDatabase + :return: the database instance + :rtype: CouchDatabase """ # get database from url m = re.match('(^https?://[^/]+)/(.+)$', url) @@ -221,293 +196,795 @@ class CouchDatabase(ObjectStoreDatabase): """ Create a new Couch data container. - @param url: the url of the couch database - @type url: str - @param dbname: the database name - @type dbname: str - @param replica_uid: an optional unique replica identifier - @type replica_uid: str - @param full_commit: turn on the X-Couch-Full-Commit header - @type full_commit: bool - @param session: an http.Session instance or None for a default session - @type session: http.Session + :param url: the url of the couch database + :type url: str + :param dbname: the database name + :type dbname: str + :param replica_uid: an optional unique replica identifier + :type replica_uid: str + :param full_commit: turn on the X-Couch-Full-Commit header + :type full_commit: bool + :param session: an http.Session instance or None for a default session + :type session: http.Session + :param ensure_ddocs: Ensure that the design docs exist on server. + :type ensure_ddocs: bool """ # save params self._url = url self._full_commit = full_commit self._session = session + self._factory = CouchDocument + self._real_replica_uid = None # configure couch self._server = Server(url=self._url, full_commit=self._full_commit, session=self._session) self._dbname = dbname - # this will ensure that transaction and sync logs exist and are - # up-to-date. try: self._database = self._server[self._dbname] except ResourceNotFound: self._server.create(self._dbname) self._database = self._server[self._dbname] - ObjectStoreDatabase.__init__(self, replica_uid=replica_uid) + self._initialize(replica_uid or uuid.uuid4().hex) + + def get_sync_target(self): + """ + Return a SyncTarget object, for another u1db to synchronize with. + + :return: The sync target. + :rtype: CouchSyncTarget + """ + return CouchSyncTarget(self) + + def delete_database(self): + """ + Delete a U1DB CouchDB database. + """ + del(self._server[self._dbname]) + + def close(self): + """ + Release any resources associated with this database. + + :return: True if db was succesfully closed. + :rtype: bool + """ + self._url = None + self._full_commit = None + self._session = None + self._server = None + self._database = None + return True + + def _set_replica_uid(self, replica_uid): + """ + Force the replica uid to be set. + + :param replica_uid: The new replica uid. + :type replica_uid: str + """ + try: + doc = self._database['u1db_config'] + except ResourceNotFound: + doc = { + '_id': 'u1db_config', + 'replica_uid': replica_uid, + } + self._database.save(doc) + + def _ensure_design_docs(self): + """ + Ensure that the design docs have been created. + """ + if self._is_initialized(): + return + self._initialize() + + def _set_replica_uid(self, replica_uid): + """Force the replica_uid to be set.""" + doc = self._database['u1db_config'] + doc['replica_uid'] = replica_uid + self._database.save(doc) + self._real_replica_uid = replica_uid + + def _get_replica_uid(self): + """ + Get the replica uid. + + :return: The replica uid. + :rtype: str + """ + if self._real_replica_uid is not None: + return self._real_replica_uid + doc = self._database['u1db_config'] + self._real_replica_uid = doc['replica_uid'] + return self._real_replica_uid + + _replica_uid = property(_get_replica_uid, _set_replica_uid) + + def _get_generation(self): + """ + Return the current generation. + + :return: The current generation. + :rtype: int + """ + # query a couch list function + res = self._database.resource( + '_design', 'transactions', '_list', 'generation', 'log') + response = res.get_json() + return response[2]['generation'] + + def _get_generation_info(self): + """ + Return the current generation. - #------------------------------------------------------------------------- - # methods from Database - #------------------------------------------------------------------------- + :return: A tuple containing the current generation and transaction id. + :rtype: (int, str) + """ + # query a couch list function + res = self._database.resource( + '_design', 'transactions', '_list', 'generation', 'log') + response = res.get_json() + return (response[2]['generation'], response[2]['transaction_id']) + + def _get_trans_id_for_gen(self, generation): + """ + Get the transaction id corresponding to a particular generation. + + :param generation: The generation for which to get the transaction id. + :type generation: int + + :return: The transaction id for C{generation}. + :rtype: str + + :raise InvalidGeneration: Raised when the generation does not exist. + """ + if generation == 0: + return '' + # query a couch list function + res = self._database.resource( + '_design', 'transactions', '_list', 'trans_id_for_gen', 'log') + response = res.get_json(gen=generation) + if response[2] == {}: + raise errors.InvalidGeneration + return response[2]['transaction_id'] + + def _get_transaction_log(self): + """ + This is only for the test suite, it is not part of the api. + + :return: The complete transaction log. + :rtype: [(str, str)] + """ + # query a couch view + res = self._database.resource( + '_design', 'transactions', '_view', 'log') + response = res.get_json() + return map(lambda row: (row['id'], row['value']), response[2]['rows']) def _get_doc(self, doc_id, check_for_conflicts=False): """ - Get just the document content, without fancy handling. + Extract the document from storage. + + This can return None if the document doesn't exist. + + :param doc_id: The unique document identifier + :type doc_id: str + :param check_for_conflicts: If set to False, then the conflict check + will be skipped. + :type check_for_conflicts: bool + + :return: The document. + :rtype: CouchDocument + """ + # get document with all attachments (u1db content and eventual + # conflicts) + try: + result = \ + self._database.resource(doc_id).get_json( + attachments=True)[2] + except ResourceNotFound: + return None + # restrict to u1db documents + if 'u1db_rev' not in result: + return None + doc = self._factory(doc_id, result['u1db_rev']) + # set contents or make tombstone + if '_attachments' not in result \ + or 'u1db_content' not in result['_attachments']: + doc.make_tombstone() + else: + doc.content = json.loads( + binascii.a2b_base64( + result['_attachments']['u1db_content']['data'])) + # determine if there are conflicts + if check_for_conflicts \ + and '_attachments' in result \ + and 'u1db_conflicts' in result['_attachments']: + doc.has_conflicts = True + # store couch revision + doc.couch_rev = result['_rev'] + return doc + + def get_doc(self, doc_id, include_deleted=False): + """ + Get the JSON string for the given document. - @param doc_id: The unique document identifier - @type doc_id: str - @param include_deleted: If set to True, deleted documents will be + :param doc_id: The unique document identifier + :type doc_id: str + :param include_deleted: If set to True, deleted documents will be returned with empty content. Otherwise asking for a deleted document will return None. - @type include_deleted: bool + :type include_deleted: bool - @return: a Document object. - @type: u1db.Document + :return: A document object. + :rtype: CouchDocument. """ - cdoc = self._database.get(doc_id) - if cdoc is None: + doc = self._get_doc(doc_id, check_for_conflicts=True) + if doc is None: + return None + if doc.is_tombstone() and not include_deleted: return None - has_conflicts = False - if check_for_conflicts: - has_conflicts = self._has_conflicts(doc_id) - doc = self._factory( - doc_id=doc_id, - rev=cdoc[self.COUCH_U1DB_REV_KEY], - has_conflicts=has_conflicts) - contents = self._database.get_attachment( - cdoc, - self.COUCH_U1DB_ATTACHMENT_KEY) - if contents: - doc.content = json.loads(contents.read()) - else: - doc.make_tombstone() return doc def get_all_docs(self, include_deleted=False): """ Get the JSON content for all documents in the database. - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise deleted documents will not - be included in the results. - @type include_deleted: bool + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise deleted + documents will not be included in the results. + :type include_deleted: bool - @return: (generation, [Document]) + :return: (generation, [CouchDocument]) The current generation of the database, followed by a list of all the documents in the database. - @rtype: tuple + :rtype: (int, [CouchDocument]) """ + generation = self._get_generation() results = [] - for doc_id in self._database: - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - continue - doc = self._get_doc(doc_id, check_for_conflicts=True) - if doc.content is None and not include_deleted: - continue - results.append(doc) + for row in self._database.view('_all_docs'): + doc = self.get_doc(row.id, include_deleted=include_deleted) + if doc is not None: + results.append(doc) return (generation, results) - def _put_doc(self, doc): + def _put_doc(self, old_doc, doc): """ - Update a document. + Put the document in the Couch backend database. - This is called everytime we just want to do a raw put on the db (i.e. - without index updates, document constraint checks, and conflict - checks). - - @param doc: The document to update. - @type doc: u1db.Document - - @return: The new revision identifier for the document. - @rtype: str - """ - # prepare couch's Document - cdoc = CouchDocument() - cdoc[self.COUCH_ID_KEY] = doc.doc_id - # we have to guarantee that couch's _rev is consistent - old_cdoc = self._database.get(doc.doc_id) - if old_cdoc is not None: - cdoc[self.COUCH_REV_KEY] = old_cdoc[self.COUCH_REV_KEY] - # store u1db's rev - cdoc[self.COUCH_U1DB_REV_KEY] = doc.rev - # save doc in db - self._database.save(cdoc) - # store u1db's content as json string - if not doc.is_tombstone(): - self._database.put_attachment( - cdoc, doc.get_json(), - filename=self.COUCH_U1DB_ATTACHMENT_KEY) - else: - self._database.delete_attachment( - cdoc, - self.COUCH_U1DB_ATTACHMENT_KEY) + :param old_doc: The old document version. + :type old_doc: CouchDocument + :param doc: The document to be put. + :type doc: CouchDocument - def get_sync_target(self): + :raise RevisionConflict: Raised when trying to update a document but + couch revisions mismatch. """ - Return a SyncTarget object, for another u1db to synchronize with. - - @return: The sync target. - @rtype: CouchSyncTarget + trans_id = self._allocate_transaction_id() + # encode content + content = doc.get_json() + if content is not None: + content = binascii.b2a_base64(content)[:-1] # exclude trailing \n + # encode conflicts + conflicts = None + update_conflicts = doc.modified_conflicts() + if update_conflicts is True: + if doc.has_conflicts: + conflicts = binascii.b2a_base64( + json.dumps( + map(lambda cdoc: (cdoc.rev, cdoc.content), + doc.get_conflicts())) + )[:-1] # exclude \n + # perform the request + resource = self._database.resource( + '_design', 'docs', '_update', 'put', doc.doc_id) + response = resource.put_json( + body={ + 'couch_rev': old_doc.couch_rev + if old_doc is not None else None, + 'u1db_rev': doc.rev, + 'content': content, + 'trans_id': trans_id, + 'conflicts': conflicts, + 'update_conflicts': update_conflicts, + }, + headers={'content-type': 'application/json'}) + # the document might have been updated in between, so we check for the + # return message + msg = response[2].read() + if msg == 'ok': + return + elif msg == 'revision conflict': + raise errors.RevisionConflict() + + def put_doc(self, doc): """ - return CouchSyncTarget(self) + Update a document. - def create_index(self, index_name, *index_expressions): - """ - Create a named index, which can then be queried for future lookups. + If the document currently has conflicts, put will fail. + If the database specifies a maximum document size and the document + exceeds it, put will fail and raise a DocumentTooBig exception. - @param index_name: A unique name which can be used as a key prefix. - @param index_expressions: Index expressions defining the index - information. - """ - if index_name in self._indexes: - if self._indexes[index_name]._definition == list( - index_expressions): - return - raise errors.IndexNameTakenError - index = InMemoryIndex(index_name, list(index_expressions)) - for doc_id in self._database: - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - continue # skip special files - doc = self._get_doc(doc_id) - if doc.content is not None: - index.add_json(doc_id, doc.get_json()) - self._indexes[index_name] = index + :param doc: A Document with new content. + :return: new_doc_rev - The new revision identifier for the document. + The Document object will also be updated. - def close(self): + :raise errors.InvalidDocId: Raised if the document's id is invalid. + :raise errors.DocumentTooBig: Raised if the document size is too big. + :raise errors.ConflictedDoc: Raised if the document has conflicts. """ - Release any resources associated with this database. - - @return: True if db was succesfully closed. - @rtype: bool + if doc.doc_id is None: + raise errors.InvalidDocId() + self._check_doc_id(doc.doc_id) + self._check_doc_size(doc) + old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if old_doc and old_doc.has_conflicts: + raise errors.ConflictedDoc() + if old_doc and doc.rev is None and old_doc.is_tombstone(): + new_rev = self._allocate_doc_rev(old_doc.rev) + else: + if old_doc is not None: + if old_doc.rev != doc.rev: + raise errors.RevisionConflict() + else: + if doc.rev is not None: + raise errors.RevisionConflict() + new_rev = self._allocate_doc_rev(doc.rev) + doc.rev = new_rev + self._put_doc(old_doc, doc) + return new_rev + + def whats_changed(self, old_generation=0): """ - # TODO: fix this method so the connection is properly closed and - # test_close (+tearDown, which deletes the db) works without problems. - self._url = None - self._full_commit = None - self._session = None - #self._server = None - self._database = None - return True - - def sync(self, url, creds=None, autocreate=True): + Return a list of documents that have changed since old_generation. + + :param old_generation: The generation of the database in the old + state. + :type old_generation: int + + :return: (generation, trans_id, [(doc_id, generation, trans_id),...]) + The current generation of the database, its associated + transaction id, and a list of of changed documents since + old_generation, represented by tuples with for each document + its doc_id and the generation and transaction id corresponding + to the last intervening change and sorted by generation (old + changes first) + :rtype: (int, str, [(str, int, str)]) """ - Synchronize documents with remote replica exposed at url. + # query a couch list function + res = self._database.resource( + '_design', 'transactions', '_list', 'whats_changed', 'log') + response = res.get_json(old_gen=old_generation) + results = map( + lambda row: + (row['generation'], row['doc_id'], row['transaction_id']), + response[2]['transactions']) + results.reverse() + cur_gen = old_generation + seen = set() + changes = [] + newest_trans_id = '' + for generation, doc_id, trans_id in results: + if doc_id not in seen: + changes.append((doc_id, generation, trans_id)) + seen.add(doc_id) + if changes: + cur_gen = changes[0][1] # max generation + newest_trans_id = changes[0][2] + changes.reverse() + else: + cur_gen, newest_trans_id = self._get_generation_info() - @param url: The url of the target replica to sync with. - @type url: str - @param creds: optional dictionary giving credentials. - to authorize the operation with the server. - @type creds: dict - @param autocreate: Ask the target to create the db if non-existent. - @type autocreate: bool + return cur_gen, newest_trans_id, changes - @return: The local generation before the synchronisation was performed. - @rtype: int + def delete_doc(self, doc): """ - return Synchronizer(self, CouchSyncTarget(url, creds=creds)).sync( - autocreate=autocreate) + Mark a document as deleted. + + Will abort if the current revision doesn't match doc.rev. + This will also set doc.content to None. - #------------------------------------------------------------------------- - # methods from ObjectStoreDatabase - #------------------------------------------------------------------------- + :param doc: The document to mark as deleted. + :type doc: CouchDocument. - def _init_u1db_data(self): + :raise errors.DocumentDoesNotExist: Raised if the document does not + exist. + :raise errors.RevisionConflict: Raised if the revisions do not match. + :raise errors.DocumentAlreadyDeleted: Raised if the document is + already deleted. + :raise errors.ConflictedDoc: Raised if the doc has conflicts. """ - Initialize u1db configuration data on backend storage. + old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if old_doc is None: + raise errors.DocumentDoesNotExist + if old_doc.rev != doc.rev: + raise errors.RevisionConflict() + if old_doc.is_tombstone(): + raise errors.DocumentAlreadyDeleted + if old_doc.has_conflicts: + raise errors.ConflictedDoc() + new_rev = self._allocate_doc_rev(doc.rev) + doc.rev = new_rev + doc.make_tombstone() + self._put_doc(old_doc, doc) + return new_rev + + def _get_conflicts(self, doc_id, couch_rev=None): + """ + Get the conflicted versions of a document. + + If the C{couch_rev} parameter is not None, conflicts for a specific + document's couch revision are returned. - A U1DB database needs to keep track of all database transactions, - document conflicts, the generation of other replicas it has seen, - indexes created by users and so on. + :param couch_rev: The couch document revision. + :type couch_rev: str - In this implementation, all this information is stored in special - documents stored in the underlying with doc_id prefix equal to - U1DB_DATA_DOC_ID_PREFIX. Those documents ids are reserved: put_doc(), - get_doc() and delete_doc() will not allow documents with a doc_id with - that prefix to be accessed or modified. + :return: A list of conflicted versions of the document. + :rtype: list """ - for key in self.U1DB_DATA_KEYS: - doc_id = '%s%s' % (self.U1DB_DATA_DOC_ID_PREFIX, key) - doc = self._get_doc(doc_id) - if doc is None: - doc = self._factory(doc_id) - doc.content = {'content': getattr(self, key)} - self._put_doc(doc) + # request conflicts attachment from server + params = {} + if couch_rev is not None: + params['rev'] = couch_rev # restric document's couch revision + resource = self._database.resource(doc_id, 'u1db_conflicts') + try: + response = resource.get_json(**params) + conflicts = [] + # build the conflicted versions + for doc_rev, content in json.loads(response[2].read()): + doc = self._factory(doc_id, doc_rev) + if content is None: + doc.make_tombstone() + else: + doc.content = content + conflicts.append(doc) + return conflicts + except ResourceNotFound: + return [] - #------------------------------------------------------------------------- - # Couch specific methods - #------------------------------------------------------------------------- + def get_doc_conflicts(self, doc_id): + """ + Get the list of conflicts for the given document. - INDEX_NAME_KEY = 'name' - INDEX_DEFINITION_KEY = 'definition' - INDEX_VALUES_KEY = 'values' + The order of the conflicts is such that the first entry is the value + that would be returned by "get_doc". - def delete_database(self): - """ - Delete a U1DB CouchDB database. + :return: A list of the document entries that are conflicted. + :rtype: [CouchDocument] """ - del(self._server[self._dbname]) + conflict_docs = self._get_conflicts(doc_id) + if len(conflict_docs) == 0: + return [] + this_doc = self._get_doc(doc_id, check_for_conflicts=True) + return [this_doc] + conflict_docs - def _dump_indexes_as_json(self): + def _get_replica_gen_and_trans_id(self, other_replica_uid): """ - Dump index definitions as JSON. + Return the last known generation and transaction id for the other db + replica. + + When you do a synchronization with another replica, the Database keeps + track of what generation the other database replica was at, and what + the associated transaction id was. This is used to determine what data + needs to be sent, and if two databases are claiming to be the same + replica. + + :param other_replica_uid: The identifier for the other replica. + :type other_replica_uid: str + + :return: A tuple containing the generation and transaction id we + encountered during synchronization. If we've never + synchronized with the replica, this is (0, ''). + :rtype: (int, str) """ - indexes = {} - for name, idx in self._indexes.iteritems(): - indexes[name] = {} - for attr in [self.INDEX_NAME_KEY, self.INDEX_DEFINITION_KEY, - self.INDEX_VALUES_KEY]: - indexes[name][attr] = getattr(idx, '_' + attr) - return indexes + # query a couch view + result = self._database.view('syncs/log') + if len(result[other_replica_uid].rows) == 0: + return (0, '') + return ( + result[other_replica_uid].rows[0]['value']['known_generation'], + result[other_replica_uid].rows[0]['value']['known_transaction_id'] + ) + + def _set_replica_gen_and_trans_id(self, other_replica_uid, + other_generation, other_transaction_id): + """ + Set the last-known generation and transaction id for the other + database replica. + + We have just performed some synchronization, and we want to track what + generation the other replica was at. See also + _get_replica_gen_and_trans_id. + + :param other_replica_uid: The U1DB identifier for the other replica. + :type other_replica_uid: str + :param other_generation: The generation number for the other replica. + :type other_generation: int + :param other_transaction_id: The transaction id associated with the + generation. + :type other_transaction_id: str + """ + self._do_set_replica_gen_and_trans_id( + other_replica_uid, other_generation, other_transaction_id) - def _load_indexes_from_json(self, indexes): + def _do_set_replica_gen_and_trans_id( + self, other_replica_uid, other_generation, other_transaction_id): + """ + Set the last-known generation and transaction id for the other + database replica. + + We have just performed some synchronization, and we want to track what + generation the other replica was at. See also + _get_replica_gen_and_trans_id. + + :param other_replica_uid: The U1DB identifier for the other replica. + :type other_replica_uid: str + :param other_generation: The generation number for the other replica. + :type other_generation: int + :param other_transaction_id: The transaction id associated with the + generation. + :type other_transaction_id: str + """ + # query a couch update function + res = self._database.resource( + '_design', 'syncs', '_update', 'put', 'u1db_sync_log') + res.put_json( + body={ + 'other_replica_uid': other_replica_uid, + 'other_generation': other_generation, + 'other_transaction_id': other_transaction_id, + }, + headers={'content-type': 'application/json'}) + + def _add_conflict(self, doc, my_doc_rev, my_content): + """ + Add a conflict to the document. + + Note that this method does not actually update the backend; rather, it + updates the CouchDocument object which will provide the conflict data + when the atomic document update is made. + + :param doc: The document to have conflicts added to. + :type doc: CouchDocument + :param my_doc_rev: The revision of the conflicted document. + :type my_doc_rev: str + :param my_content: The content of the conflicted document as a JSON + serialized string. + :type my_content: str """ - Load index definitions from stored JSON. + doc.ensure_fetch_conflicts(self._get_conflicts) + doc.add_conflict( + self._factory(doc_id=doc.doc_id, rev=my_doc_rev, + json=my_content)) - @param indexes: A JSON representation of indexes as - [('index-name', ['field', 'field2', ...]), ...]. - @type indexes: str + def _delete_conflicts(self, doc, conflict_revs): """ - self._indexes = {} - for name, idx_dict in indexes.iteritems(): - idx = InMemoryIndex(name, idx_dict[self.INDEX_DEFINITION_KEY]) - idx._values = idx_dict[self.INDEX_VALUES_KEY] - self._indexes[name] = idx + Delete the conflicted revisions from the list of conflicts of C{doc}. - def _load_transaction_log_from_json(self, transaction_log): + Note that thie method does not actually update the backed; rather, it + updates the CouchDocument object which will provide the conflict data + when the atomic document update is made. + + :param doc: The document to have conflicts deleted. + :type doc: CouchDocument + :param conflict_revs: A list of the revisions to be deleted. + :param conflict_revs: [str] """ - Load transaction log from stored JSON. + doc.ensure_fetch_conflicts(self._get_conflicts) + doc.delete_conflicts(conflict_revs) - @param transaction_log: A JSON representation of transaction_log as - [('generation', 'transaction_id'), ...]. - @type transaction_log: list + def _prune_conflicts(self, doc, doc_vcr): + """ + Prune conflicts that are older then the current document's revision, or + whose content match to the current document's content. + + :param doc: The document to have conflicts pruned. + :type doc: CouchDocument + :param doc_vcr: A vector clock representing the current document's + revision. + :type doc_vcr: u1db.vectorclock.VectorClock """ - self._transaction_log = [] - for gen, trans_id in transaction_log: - self._transaction_log.append((gen, trans_id)) + if doc.has_conflicts is True: + autoresolved = False + c_revs_to_prune = [] + for c_doc in doc.get_conflicts(): + c_vcr = vectorclock.VectorClockRev(c_doc.rev) + if doc_vcr.is_newer(c_vcr): + c_revs_to_prune.append(c_doc.rev) + elif doc.same_content_as(c_doc): + c_revs_to_prune.append(c_doc.rev) + doc_vcr.maximize(c_vcr) + autoresolved = True + if autoresolved: + doc_vcr.increment(self._replica_uid) + doc.rev = doc_vcr.as_str() + self._delete_conflicts(doc, c_revs_to_prune) + + def _force_doc_sync_conflict(self, doc): + """ + Add a conflict and force a document put. - def _load_other_generations_from_json(self, other_generations): + :param doc: The document to be put. + :type doc: CouchDocument """ - Load other generations from stored JSON. + my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev)) + self._add_conflict(doc, my_doc.rev, my_doc.get_json()) + doc.has_conflicts = True + self._put_doc(my_doc, doc) - @param other_generations: A JSON representation of other_generations - as {'replica_uid': ('generation', 'transaction_id'), ...}. - @type other_generations: dict + def resolve_doc(self, doc, conflicted_doc_revs): + """ + Mark a document as no longer conflicted. + + We take the list of revisions that the client knows about that it is + superseding. This may be a different list from the actual current + conflicts, in which case only those are removed as conflicted. This + may fail if the conflict list is significantly different from the + supplied information. (sync could have happened in the background from + the time you GET_DOC_CONFLICTS until the point where you RESOLVE) + + :param doc: A Document with the new content to be inserted. + :type doc: CouchDocument + :param conflicted_doc_revs: A list of revisions that the new content + supersedes. + :type conflicted_doc_revs: [str] """ - self._other_generations = {} - for replica_uid, [gen, trans_id] in other_generations.iteritems(): - self._other_generations[replica_uid] = (gen, trans_id) + cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + new_rev = self._ensure_maximal_rev(cur_doc.rev, + conflicted_doc_revs) + superseded_revs = set(conflicted_doc_revs) + doc.rev = new_rev + if cur_doc.rev in superseded_revs: + self._delete_conflicts(doc, superseded_revs) + self._put_doc(cur_doc, doc) + else: + self._add_conflict(doc, new_rev, doc.get_json()) + self._delete_conflicts(doc, superseded_revs) + # perform request to resolve document in server + resource = self._database.resource( + '_design', 'docs', '_update', 'resolve_doc', doc.doc_id) + conflicts = None + if doc.has_conflicts: + conflicts = binascii.b2a_base64( + json.dumps( + map(lambda cdoc: (cdoc.rev, cdoc.content), + doc.get_conflicts())) + )[:-1] # exclude \n + response = resource.put_json( + body={ + 'couch_rev': cur_doc.couch_rev, + 'conflicts': conflicts, + }, + headers={'content-type': 'application/json'}) + + def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, + replica_trans_id=''): + """ + Insert/update document into the database with a given revision. + + This api is used during synchronization operations. + + If a document would conflict and save_conflict is set to True, the + content will be selected as the 'current' content for doc.doc_id, + even though doc.rev doesn't supersede the currently stored revision. + The currently stored document will be added to the list of conflict + alternatives for the given doc_id. + + This forces the new content to be 'current' so that we get convergence + after synchronizing, even if people don't resolve conflicts. Users can + then notice that their content is out of date, update it, and + synchronize again. (The alternative is that users could synchronize and + think the data has propagated, but their local copy looks fine, and the + remote copy is never updated again.) + + :param doc: A document object + :type doc: CouchDocument + :param save_conflict: If this document is a conflict, do you want to + save it as a conflict, or just ignore it. + :type save_conflict: bool + :param replica_uid: A unique replica identifier. + :type replica_uid: str + :param replica_gen: The generation of the replica corresponding to the + this document. The replica arguments are optional, + but are used during synchronization. + :type replica_gen: int + :param replica_trans_id: The transaction_id associated with the + generation. + :type replica_trans_id: str + + :return: (state, at_gen) - If we don't have doc_id already, or if + doc_rev supersedes the existing document revision, then the + content will be inserted, and state is 'inserted'. If + doc_rev is less than or equal to the existing revision, then + the put is ignored and state is respecitvely 'superseded' or + 'converged'. If doc_rev is not strictly superseded or + supersedes, then state is 'conflicted'. The document will not + be inserted if save_conflict is False. For 'inserted' or + 'converged', at_gen is the insertion/current generation. + :rtype: (str, int) + """ + cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + # at this point, `doc` has arrived from the other syncing party, and + # we will decide what to do with it. + # First, we prepare the arriving doc to update couch database. + old_doc = doc + doc = self._factory(doc.doc_id, doc.rev, doc.get_json()) + if cur_doc is not None: + doc.couch_rev = cur_doc.couch_rev + # fetch conflicts because we will eventually manipulate them + doc.ensure_fetch_conflicts(self._get_conflicts) + # from now on, it works just like u1db sqlite backend + doc_vcr = vectorclock.VectorClockRev(doc.rev) + if cur_doc is None: + cur_vcr = vectorclock.VectorClockRev(None) + else: + cur_vcr = vectorclock.VectorClockRev(cur_doc.rev) + self._validate_source(replica_uid, replica_gen, replica_trans_id) + if doc_vcr.is_newer(cur_vcr): + rev = doc.rev + self._prune_conflicts(doc, doc_vcr) + if doc.rev != rev: + # conflicts have been autoresolved + state = 'superseded' + else: + state = 'inserted' + self._put_doc(cur_doc, doc) + elif doc.rev == cur_doc.rev: + # magical convergence + state = 'converged' + elif cur_vcr.is_newer(doc_vcr): + # Don't add this to seen_ids, because we have something newer, + # so we should send it back, and we should not generate a + # conflict + state = 'superseded' + elif cur_doc.same_content_as(doc): + # the documents have been edited to the same thing at both ends + doc_vcr.maximize(cur_vcr) + doc_vcr.increment(self._replica_uid) + doc.rev = doc_vcr.as_str() + self._put_doc(cur_doc, doc) + state = 'superseded' + else: + state = 'conflicted' + if save_conflict: + self._force_doc_sync_conflict(doc) + if replica_uid is not None and replica_gen is not None: + self._do_set_replica_gen_and_trans_id( + replica_uid, replica_gen, replica_trans_id) + # update info + old_doc.rev = doc.rev + if doc.is_tombstone(): + old_doc.is_tombstone() + else: + old_doc.content = doc.content + old_doc.has_conflicts = doc.has_conflicts + return state, self._get_generation() -class CouchSyncTarget(ObjectStoreSyncTarget): +class CouchSyncTarget(CommonSyncTarget): """ Functionality for using a CouchDatabase as a synchronization target. """ - pass + + def get_sync_info(self, source_replica_uid): + source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id( + source_replica_uid) + my_gen, my_trans_id = self._db._get_generation_info() + return ( + self._db._replica_uid, my_gen, my_trans_id, source_gen, + source_trans_id) + + def record_sync_info(self, source_replica_uid, source_replica_generation, + source_replica_transaction_id): + if self._trace_hook: + self._trace_hook('record_sync_info') + self._db._set_replica_gen_and_trans_id( + source_replica_uid, source_replica_generation, + source_replica_transaction_id) class NotEnoughCouchPermissions(Exception): @@ -527,12 +1004,12 @@ class CouchServerState(ServerState): """ Initialize the couch server state. - @param couch_url: The URL for the couch database. - @type couch_url: str - @param shared_db_name: The name of the shared database. - @type shared_db_name: str - @param tokens_db_name: The name of the tokens database. - @type tokens_db_name: str + :param couch_url: The URL for the couch database. + :type couch_url: str + :param shared_db_name: The name of the shared database. + :type shared_db_name: str + :param tokens_db_name: The name of the tokens database. + :type tokens_db_name: str """ self._couch_url = couch_url self._shared_db_name = shared_db_name @@ -563,12 +1040,12 @@ class CouchServerState(ServerState): couch library to ensure that Soledad Server can do everything it needs on the underlying couch database. - @param couch_url: The URL of the couch database. - @type couch_url: str + :param couch_url: The URL of the couch database. + :type couch_url: str @raise NotEnoughCouchPermissions: Raised in case there are not enough permissions to read/write/create the needed couch databases. - @rtype: bool + :rtype: bool """ def _open_couch_db(dbname): @@ -601,11 +1078,11 @@ class CouchServerState(ServerState): """ Open a couch database. - @param dbname: The name of the database to open. - @type dbname: str + :param dbname: The name of the database to open. + :type dbname: str - @return: The CouchDatabase object. - @rtype: CouchDatabase + :return: The CouchDatabase object. + :rtype: CouchDatabase """ # TODO: open couch return CouchDatabase.open_database( @@ -616,11 +1093,11 @@ class CouchServerState(ServerState): """ Ensure couch database exists. - @param dbname: The name of the database to ensure. - @type dbname: str + :param dbname: The name of the database to ensure. + :type dbname: str - @return: The CouchDatabase object and the replica uid. - @rtype: (CouchDatabase, str) + :return: The CouchDatabase object and the replica uid. + :rtype: (CouchDatabase, str) """ db = CouchDatabase.open_database( self._couch_url + '/' + dbname, @@ -631,8 +1108,8 @@ class CouchServerState(ServerState): """ Delete couch database. - @param dbname: The name of the database to delete. - @type dbname: str + :param dbname: The name of the database to delete. + :type dbname: str """ CouchDatabase.delete_database(self._couch_url + '/' + dbname) @@ -640,8 +1117,8 @@ class CouchServerState(ServerState): """ Set the couchdb URL - @param url: CouchDB URL - @type url: str + :param url: CouchDB URL + :type url: str """ self._couch_url = url @@ -649,7 +1126,7 @@ class CouchServerState(ServerState): """ Return CouchDB URL - @rtype: str + :rtype: str """ return self._couch_url diff --git a/common/src/leap/soledad/common/ddocs/README.txt b/common/src/leap/soledad/common/ddocs/README.txt new file mode 100644 index 00000000..37d89fbf --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/README.txt @@ -0,0 +1,29 @@ +The following table depicts the U1DB CouchDB backend method and the URI that +is queried to obtain/update data from/to the server. + + +----------------------------------+------------------------------------------------------------------+ + | u1db backend method | URI | + |----------------------------------+------------------------------------------------------------------| + | _get_generation | _design/transactions/_list/generation/log | + | _get_generation_info | _design/transactions/_list/generation/log | + | _get_trans_id_for_gen | _design/transactions/_list/trans_id_for_gen/log | + | _get_transaction_log | _design/transactions/_view/log | + | _get_doc (*) | _design/docs/_view/get?key= | + | _has_conflicts | _design/docs/_view/get?key= | + | get_all_docs | _design/docs/_view/get | + | _put_doc | _design/docs/_update/put/ | + | _whats_changed | _design/transactions/_list/whats_changed/log?old_gen= | + | _get_conflicts (*) | _design/docs/_view/conflicts?key= | + | _get_replica_gen_and_trans_id | _design/syncs/_view/log?other_replica_uid= | + | _do_set_replica_gen_and_trans_id | _design/syncs/_update/put/u1db_sync_log | + | _add_conflict | _design/docs/_update/add_conflict/ | + | _delete_conflicts | _design/docs/_update/delete_conflicts/?doc_rev= | + | list_indexes | not implemented | + | _get_index_definition | not implemented | + | delete_index | not implemented | + | _get_indexed_fields | not implemented | + | _put_and_update_indexes | not implemented | + +----------------------------------+------------------------------------------------------------------+ + +(*) These methods also request CouchDB document attachments that store U1DB + document contents. diff --git a/common/src/leap/soledad/common/ddocs/__init__.py b/common/src/leap/soledad/common/ddocs/__init__.py new file mode 100644 index 00000000..c2f78e18 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/__init__.py @@ -0,0 +1,138 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +CouchDB U1DB backend design documents helper. +""" + + +from os import listdir +from os.path import realpath, dirname, isdir, join, isfile, basename +import json +import logging + + +from couchdb import Document as CouchDocument + + +logger = logging.getLogger(__name__) + + +# where to search for design docs definitions +prefix = dirname(realpath(__file__)) + + +def ensure_ddocs_on_remote_db(db, prefix=prefix): + """ + Ensure that the design documents in C{db} contain. + + :param db: The database in which to create/update the design docs. + :type db: couchdb.client.Server + :param prefix: Where to look for design documents definitions. + :type prefix: str + """ + ddocs = build_ddocs(prefix) + for ddoc_name, ddoc_content in ddocs.iteritems(): + ddoc_id = "_design/%s" % ddoc_name + ddoc = CouchDocument({'_id': ddoc_id}) + ddoc.update(ddoc_content) + # ensure revision if ddoc is already in db + doc = db.get(ddoc_id) + if doc is not None: + ddoc['_rev'] = doc.rev + db.save(ddoc) + + +def create_local_ddocs(prefix=prefix): + """ + Create local design docs based on content from subdirectories in + C{prefix}. + + :param create_local: Whether to create local .json files. + :type create_local: bool + """ + ddocs = build_ddocs(prefix) + for ddoc_name, ddoc_content in ddocs.iteritems(): + with open(join(prefix, '%s.json' % ddoc_name), 'w') as f: + f.write(json.dumps(ddoc_content, indent=4)) + + +def build_ddocs(prefix=prefix): + """ + Build design documents based on content from subdirectories in + C{prefix}. + + :param prefix: Where to look for design documents definitions. + :type prefix: str + + :return: A dictionary containing the design docs definitions. + :rtype: dict + """ + ddocs = {} + # design docs are represented by subdirectories in current directory + for ddoc in [f for f in listdir(prefix) if isdir(join(prefix, f))]: + logger.debug("Building %s.json ..." % ddoc) + + ddocs[ddoc] = {} + + for t in ['views', 'lists', 'updates']: + tdir = join(prefix, ddoc, t) + if not isdir(tdir): + logger.debug(" - no %s" % t) + else: + + ddocs[ddoc][t] = {} + + if t == 'views': # handle views (with map/reduce functions) + for view in [f for f in listdir(tdir) \ + if isdir(join(tdir, f))]: + logger.debug(" - view: %s" % view) + # look for map.js and reduce.js + mapfile = join(tdir, view, 'map.js') + reducefile = join(tdir, view, 'reduce.js') + mapfun = None + reducefun = None + try: + with open(mapfile) as f: + mapfun = f.read() + except IOError: + pass + try: + with open(reducefile) as f: + reducefun = f.read() + except IOError: + pass + ddocs[ddoc]['views'][view] = {} + + if mapfun is not None: + ddocs[ddoc]['views'][view]['map'] = mapfun + if reducefun is not None: + ddocs[ddoc]['views'][view]['reduce'] = reducefun + + else: # handle lists, updates, etc + for fun in [f for f in listdir(tdir) \ + if isfile(join(tdir, f))]: + logger.debug(" - %s: %s" % (t, fun)) + funfile = join(tdir, fun) + funname = basename(funfile).replace('.js', '') + try: + with open(funfile) as f: + ddocs[ddoc][t][funname] = f.read() + except IOError: + pass + return ddocs diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/put.js b/common/src/leap/soledad/common/ddocs/docs/updates/put.js new file mode 100644 index 00000000..5a4647de --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/docs/updates/put.js @@ -0,0 +1,64 @@ +function(doc, req){ + /* we expect to receive the following in `req.body`: + * { + * 'couch_rev': '', + * 'u1db_rev': '', + * 'content': '', + * 'trans_id': '' + * 'conflicts': '', + * 'update_conflicts': + * } + */ + var body = JSON.parse(req.body); + + // create a new document document + if (!doc) { + doc = {} + doc['_id'] = req['id']; + } + // or fail if couch revisions do not match + else if (doc['_rev'] != body['couch_rev']) { + // of fail if revisions do not match + return [null, 'revision conflict'] + } + + // store u1db rev + doc.u1db_rev = body['u1db_rev']; + + // save content as attachment + if (body['content'] != null) { + // save u1db content as attachment + if (!doc._attachments) + doc._attachments = {}; + doc._attachments.u1db_content = { + content_type: "application/octet-stream", + data: body['content'] // should be base64 encoded + }; + } + // or delete the attachment if document is tombstone + else if (doc._attachments && + doc._attachments.u1db_content) + delete doc._attachments.u1db_content; + + // store the transaction id + if (!doc.u1db_transactions) + doc.u1db_transactions = []; + var d = new Date(); + doc.u1db_transactions.push([d.getTime(), body['trans_id']]); + + // save conflicts as attachment if they were sent + if (body['update_conflicts']) + if (body['conflicts'] != null) { + if (!doc._attachments) + doc._attachments = {}; + doc._attachments.u1db_conflicts = { + content_type: "application/octet-stream", + data: body['conflicts'] // should be base64 encoded + } + } else { + if(doc._attachments && doc._attachments.u1db_conflicts) + delete doc._attachments.u1db_conflicts + } + + return [doc, 'ok']; +} diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js b/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js new file mode 100644 index 00000000..7ba66cf8 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js @@ -0,0 +1,39 @@ +function(doc, req){ + /* we expect to receive the following in `req.body`: + * { + * 'couch_rev': '', + * 'conflicts': '', + * } + */ + var body = JSON.parse(req.body); + + // fail if no document was given + if (!doc) { + return [null, 'document does not exist'] + } + + // fail if couch revisions do not match + if (body['couch_rev'] != null + && doc['_rev'] != body['couch_rev']) { + return [null, 'revision conflict'] + } + + // fail if conflicts were not sent + if (body['conflicts'] == null) + return [null, 'missing conflicts'] + + // save conflicts as attachment if they were sent + if (body['conflicts'] != null) { + if (!doc._attachments) + doc._attachments = {}; + doc._attachments.u1db_conflicts = { + content_type: "application/octet-stream", + data: body['conflicts'] // should be base64 encoded + } + } + // or delete attachment if there are no conflicts + else if (doc._attachments && doc._attachments.u1db_conflicts) + delete doc._attachments.u1db_conflicts; + + return [doc, 'ok']; +} diff --git a/common/src/leap/soledad/common/ddocs/docs/views/get/map.js b/common/src/leap/soledad/common/ddocs/docs/views/get/map.js new file mode 100644 index 00000000..ae08d9e9 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/docs/views/get/map.js @@ -0,0 +1,20 @@ +function(doc) { + if (doc.u1db_rev) { + var is_tombstone = true; + var has_conflicts = false; + if (doc._attachments) { + if (doc._attachments.u1db_content) + is_tombstone = false; + if (doc._attachments.u1db_conflicts) + has_conflicts = true; + } + emit(doc._id, + { + "couch_rev": doc._rev, + "u1db_rev": doc.u1db_rev, + "is_tombstone": is_tombstone, + "has_conflicts": has_conflicts, + } + ); + } +} diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js new file mode 100644 index 00000000..722f695a --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js @@ -0,0 +1,22 @@ +function(doc, req){ + if (!doc) { + doc = {} + doc['_id'] = 'u1db_sync_log'; + doc['syncs'] = []; + } + body = JSON.parse(req.body); + // remove outdated info + doc['syncs'] = doc['syncs'].filter( + function (entry) { + return entry[0] != body['other_replica_uid']; + } + ); + // store u1db rev + doc['syncs'].push([ + body['other_replica_uid'], + body['other_generation'], + body['other_transaction_id'] + ]); + return [doc, 'ok']; +} + diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js new file mode 100644 index 00000000..a63c7cf4 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js @@ -0,0 +1,12 @@ +function(doc) { + if (doc._id == 'u1db_sync_log') { + if (doc.syncs) + doc.syncs.forEach(function (entry) { + emit(entry[0], + { + 'known_generation': entry[1], + 'known_transaction_id': entry[2] + }); + }); + } +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js b/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js new file mode 100644 index 00000000..dbdfff0d --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js @@ -0,0 +1,20 @@ +function(head, req) { + var row; + var rows=[]; + // fetch all rows + while(row = getRow()) { + rows.push(row); + } + if (rows.length > 0) + send(JSON.stringify({ + "generation": rows.length, + "doc_id": rows[rows.length-1]['id'], + "transaction_id": rows[rows.length-1]['value'] + })); + else + send(JSON.stringify({ + "generation": 0, + "doc_id": "", + "transaction_id": "", + })); +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js b/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js new file mode 100644 index 00000000..2ec91794 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js @@ -0,0 +1,19 @@ +function(head, req) { + var row; + var rows=[]; + var i = 1; + var gen = 1; + if (req.query.gen) + gen = parseInt(req.query['gen']); + // fetch all rows + while(row = getRow()) + rows.push(row); + if (gen <= rows.length) + send(JSON.stringify({ + "generation": gen, + "doc_id": rows[gen-1]['id'], + "transaction_id": rows[gen-1]['value'], + })); + else + send('{}'); +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js b/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js new file mode 100644 index 00000000..b35cdf51 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js @@ -0,0 +1,22 @@ +function(head, req) { + var row; + var gen = 1; + var old_gen = 0; + if (req.query.old_gen) + old_gen = parseInt(req.query['old_gen']); + send('{"transactions":[\n'); + // fetch all rows + while(row = getRow()) { + if (gen > old_gen) { + if (gen > old_gen+1) + send(',\n'); + send(JSON.stringify({ + "generation": gen, + "doc_id": row["id"], + "transaction_id": row["value"] + })); + } + gen++; + } + send('\n]}'); +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js b/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js new file mode 100644 index 00000000..94ef63ca --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js @@ -0,0 +1,7 @@ +function(doc) { + if (doc.u1db_transactions) + doc.u1db_transactions.forEach(function(t) { + emit(t[0], // use timestamp as key so the results are ordered + t[1]); // value is the transaction_id + }); +} diff --git a/common/src/leap/soledad/common/objectstore.py b/common/src/leap/soledad/common/objectstore.py deleted file mode 100644 index 7aff3e32..00000000 --- a/common/src/leap/soledad/common/objectstore.py +++ /dev/null @@ -1,282 +0,0 @@ -# -*- coding: utf-8 -*- -# objectstore.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - - -""" -Abstract U1DB backend to handle storage using object stores (like CouchDB, for -example). - -This backend uses special documents to store all U1DB data (replica uid, -indexes, transaction logs and info about other dbs). The id of these documents -are reserved and have prefix equal to ObjectStore.U1DB_DATA_DOC_ID_PREFIX. - -Right now, this is only used by CouchDatabase backend, but can also be -extended to implement OpenStack or Amazon S3 storage, for example. - -See U1DB documentation for more information on how to use databases. -""" - - -from base64 import b64encode, b64decode - - -import uuid -import simplejson as json - - -from u1db import errors -from u1db.backends.inmemory import ( - InMemoryDatabase, - InMemorySyncTarget, -) - - -class ObjectStoreDatabase(InMemoryDatabase): - """ - A backend for storing u1db data in an object store. - """ - - U1DB_DATA_DOC_ID_PREFIX = 'u1db/' - - @classmethod - def open_database(cls, url, create, document_factory=None): - """ - Open a U1DB database using an object store as backend. - - @param url: the url of the database replica - @type url: str - @param create: should the replica be created if it does not exist? - @type create: bool - @param document_factory: A function that will be called with the same - parameters as Document.__init__. - @type document_factory: callable - - @return: the database instance - @rtype: CouchDatabase - """ - raise NotImplementedError(cls.open_database) - - def __init__(self, replica_uid=None, document_factory=None): - """ - Initialize the object store database. - - @param replica_uid: an optional unique replica identifier - @type replica_uid: str - @param document_factory: A function that will be called with the same - parameters as Document.__init__. - @type document_factory: callable - """ - InMemoryDatabase.__init__( - self, - replica_uid, - document_factory=document_factory) - if self._replica_uid is None: - self._replica_uid = uuid.uuid4().hex - self._init_u1db_data() - - def _init_u1db_data(self): - """ - Initialize u1db configuration data on backend storage. - - A U1DB database needs to keep track of all database transactions, - document conflicts, the generation of other replicas it has seen, - indexes created by users and so on. - - In this implementation, all this information is stored in special - documents stored in the couch db with id prefix equal to - U1DB_DATA_DOC_ID_PREFIX. Those documents ids are reserved: - put_doc(), get_doc() and delete_doc() will not allow documents with - a doc_id with that prefix to be accessed or modified. - """ - raise NotImplementedError(self._init_u1db_data) - - #------------------------------------------------------------------------- - # methods from Database - #------------------------------------------------------------------------- - - def put_doc(self, doc): - """ - Update a document. - - If the document currently has conflicts, put will fail. - If the database specifies a maximum document size and the document - exceeds it, put will fail and raise a DocumentTooBig exception. - - This method prevents from updating the document with doc_id equals to - self.U1DB_DATA_DOC_ID, which contains U1DB data. - - @param doc: A Document with new content. - @type doc: Document - - @return: new_doc_rev - The new revision identifier for the document. - The Document object will also be updated. - @rtype: str - """ - if doc.doc_id is not None and \ - doc.doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - raise errors.InvalidDocId() - return InMemoryDatabase.put_doc(self, doc) - - def _put_doc(self, doc): - """ - Update a document. - - This is called everytime we just want to do a raw put on the db (i.e. - without index updates, document constraint checks, and conflict - checks). - - @param doc: The document to update. - @type doc: u1db.Document - - @return: The new revision identifier for the document. - @rtype: str - """ - raise NotImplementedError(self._put_doc) - - def get_doc(self, doc_id, include_deleted=False): - """ - Get the JSON string for the given document. - - This method prevents from getting the document with doc_id equals to - self.U1DB_DATA_DOC_ID, which contains U1DB data. - - @param doc_id: The unique document identifier - @type doc_id: str - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise asking for a deleted - document will return None. - @type include_deleted: bool - - @return: a Document object. - @rtype: Document - """ - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - raise errors.InvalidDocId() - return InMemoryDatabase.get_doc(self, doc_id, include_deleted) - - def _get_doc(self, doc_id): - """ - Get just the document content, without fancy handling. - - @param doc_id: The unique document identifier - @type doc_id: str - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise asking for a deleted - document will return None. - @type include_deleted: bool - - @return: a Document object. - @type: u1db.Document - """ - raise NotImplementedError(self._get_doc) - - def get_all_docs(self, include_deleted=False): - """ - Get the JSON content for all documents in the database. - - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise deleted documents will not - be included in the results. - @type include_deleted: bool - - @return: (generation, [Document]) - The current generation of the database, followed by a list of all - the documents in the database. - @rtype: tuple - """ - generation = self._get_generation() - results = [] - for doc_id in self._database: - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - continue - doc = self._get_doc(doc_id, check_for_conflicts=True) - if doc.content is None and not include_deleted: - continue - results.append(doc) - return (generation, results) - - def delete_doc(self, doc): - """ - Mark a document as deleted. - - This method prevents from deleting the document with doc_id equals to - self.U1DB_DATA_DOC_ID, which contains U1DB data. - - @param doc: The document to mark as deleted. - @type doc: u1db.Document - - @return: The new revision id of the document. - @type: str - """ - if doc.doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - raise errors.InvalidDocId() - old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) - if old_doc is None: - raise errors.DocumentDoesNotExist - if old_doc.rev != doc.rev: - raise errors.RevisionConflict() - if old_doc.is_tombstone(): - raise errors.DocumentAlreadyDeleted - if old_doc.has_conflicts: - raise errors.ConflictedDoc() - new_rev = self._allocate_doc_rev(doc.rev) - doc.rev = new_rev - doc.make_tombstone() - self._put_and_update_indexes(old_doc, doc) - return new_rev - - # index-related methods - - def create_index(self, index_name, *index_expressions): - """ - Create a named index, which can then be queried for future lookups. - - See U1DB documentation for more information. - - @param index_name: A unique name which can be used as a key prefix. - @param index_expressions: Index expressions defining the index - information. - """ - raise NotImplementedError(self.create_index) - - #------------------------------------------------------------------------- - # implemented methods from CommonBackend - #------------------------------------------------------------------------- - - def _put_and_update_indexes(self, old_doc, doc): - """ - Update a document and all indexes related to it. - - @param old_doc: The old version of the document. - @type old_doc: u1db.Document - @param doc: The new version of the document. - @type doc: u1db.Document - """ - for index in self._indexes.itervalues(): - if old_doc is not None and not old_doc.is_tombstone(): - index.remove_json(old_doc.doc_id, old_doc.get_json()) - if not doc.is_tombstone(): - index.add_json(doc.doc_id, doc.get_json()) - trans_id = self._allocate_transaction_id() - self._put_doc(doc) - self._transaction_log.append((doc.doc_id, trans_id)) - - -class ObjectStoreSyncTarget(InMemorySyncTarget): - """ - Functionality for using an ObjectStore as a synchronization target. - """ diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index 42edf9fe..bdef4e0d 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -27,7 +27,6 @@ from base64 import b64decode from leap.common.files import mkdir_p -from leap.soledad.common.document import SoledadDocument from leap.soledad.common.tests import u1db_tests as tests from leap.soledad.common.tests.u1db_tests import test_backends from leap.soledad.common.tests.u1db_tests import test_sync @@ -188,7 +187,7 @@ def copy_couch_database_for_test(test, db): def make_document_for_test(test, doc_id, rev, content, has_conflicts=False): - return SoledadDocument(doc_id, rev, content, has_conflicts=has_conflicts) + return couch.CouchDocument(doc_id, rev, content, has_conflicts=has_conflicts) COUCH_SCENARIOS = [ -- cgit v1.2.3 From 69762784c41f9e231260d1e790a4a5c05bf6de96 Mon Sep 17 00:00:00 2001 From: drebs Date: Mon, 16 Dec 2013 10:56:57 -0200 Subject: Monkey-patch u1db to use CouchDocument. --- common/src/leap/soledad/common/couch.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 3b0e042a..4caaf48f 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -29,6 +29,7 @@ import socket from u1db.backends import CommonBackend, CommonSyncTarget +from u1db.remote import http_app from u1db.remote.server_state import ServerState from u1db import ( errors, @@ -159,6 +160,10 @@ class CouchDocument(SoledadDocument): couch_rev = property(_get_couch_rev, _set_couch_rev) +# monkey-patch the u1db http app to use CouchDocument +http_app.Document = CouchDocument + + class CouchDatabase(CommonBackend): """ A U1DB implementation that uses CouchDB as its persistence layer. -- cgit v1.2.3 From ab93164a3a9532340a899e0b23f4b541d61c22c6 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 10 Dec 2013 18:53:54 -0200 Subject: Fix couch tests to reflect remodelling. --- common/src/leap/soledad/common/couch.py | 43 +--- common/src/leap/soledad/common/ddocs/__init__.py | 4 +- .../leap/soledad/common/tests/couchdb.ini.template | 4 +- common/src/leap/soledad/common/tests/test_couch.py | 240 +++++++++++---------- 4 files changed, 141 insertions(+), 150 deletions(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 4caaf48f..b9e699f3 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -18,35 +18,25 @@ """A U1DB backend that uses CouchDB as its persistence layer.""" -import os import simplejson as json -import sys -import time +import re import uuid import logging import binascii import socket +from couchdb.client import Server +from couchdb.http import ResourceNotFound, Unauthorized +from u1db import errors, query_parser, vectorclock from u1db.backends import CommonBackend, CommonSyncTarget from u1db.remote import http_app from u1db.remote.server_state import ServerState -from u1db import ( - errors, - query_parser, - vectorclock, -) -from couchdb.client import ( - Server, - Document as CouchDocument, - _doc_resource, -) -from couchdb.http import ResourceNotFound, Unauthorized from leap.soledad.common import USER_DB_PREFIX from leap.soledad.common.document import SoledadDocument -from leap.soledad.common.ddocs import ensure_ddocs_on_remote_db +from leap.soledad.common.ddocs import ensure_ddocs_on_db logger = logging.getLogger(__name__) @@ -193,11 +183,11 @@ class CouchDatabase(CommonBackend): server[dbname] except ResourceNotFound: if not create: - raise DatabaseDoesNotExist() + raise errors.DatabaseDoesNotExist() return cls(url, dbname) def __init__(self, url, dbname, replica_uid=None, full_commit=True, - session=None): + session=None, ensure_ddocs=False): """ Create a new Couch data container. @@ -230,7 +220,9 @@ class CouchDatabase(CommonBackend): except ResourceNotFound: self._server.create(self._dbname) self._database = self._server[self._dbname] - self._initialize(replica_uid or uuid.uuid4().hex) + self._set_replica_uid(replica_uid or uuid.uuid4().hex) + if ensure_ddocs: + ensure_ddocs_on_db(self._database) def get_sync_target(self): """ @@ -270,25 +262,12 @@ class CouchDatabase(CommonBackend): """ try: doc = self._database['u1db_config'] + doc['replica_uid'] = replica_uid except ResourceNotFound: doc = { '_id': 'u1db_config', 'replica_uid': replica_uid, } - self._database.save(doc) - - def _ensure_design_docs(self): - """ - Ensure that the design docs have been created. - """ - if self._is_initialized(): - return - self._initialize() - - def _set_replica_uid(self, replica_uid): - """Force the replica_uid to be set.""" - doc = self._database['u1db_config'] - doc['replica_uid'] = replica_uid self._database.save(doc) self._real_replica_uid = replica_uid diff --git a/common/src/leap/soledad/common/ddocs/__init__.py b/common/src/leap/soledad/common/ddocs/__init__.py index c2f78e18..389bdff9 100644 --- a/common/src/leap/soledad/common/ddocs/__init__.py +++ b/common/src/leap/soledad/common/ddocs/__init__.py @@ -37,7 +37,7 @@ logger = logging.getLogger(__name__) prefix = dirname(realpath(__file__)) -def ensure_ddocs_on_remote_db(db, prefix=prefix): +def ensure_ddocs_on_db(db, prefix=prefix): """ Ensure that the design documents in C{db} contain. @@ -118,7 +118,7 @@ def build_ddocs(prefix=prefix): except IOError: pass ddocs[ddoc]['views'][view] = {} - + if mapfun is not None: ddocs[ddoc]['views'][view]['map'] = mapfun if reducefun is not None: diff --git a/common/src/leap/soledad/common/tests/couchdb.ini.template b/common/src/leap/soledad/common/tests/couchdb.ini.template index 7d0316f0..217ae201 100644 --- a/common/src/leap/soledad/common/tests/couchdb.ini.template +++ b/common/src/leap/soledad/common/tests/couchdb.ini.template @@ -74,6 +74,8 @@ use_users_db = false [query_servers] ; javascript = %(tempdir)s/server/main.js +javascript = /usr/bin/couchjs /usr/share/couchdb/server/main.js +coffeescript = /usr/bin/couchjs /usr/share/couchdb/server/main-coffee.js ; Changing reduce_limit to false will disable reduce_limit. @@ -219,4 +221,4 @@ min_file_size = 131072 ;[admins] ;testuser = -hashed-f50a252c12615697c5ed24ec5cd56b05d66fe91e,b05471ba260132953930cf9f97f327f5 -; pass for above user is 'testpass' \ No newline at end of file +; pass for above user is 'testpass' diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index bdef4e0d..a181c6cb 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -25,6 +25,9 @@ import copy import shutil from base64 import b64decode +from couchdb.client import Server +from u1db import errors + from leap.common.files import mkdir_p from leap.soledad.common.tests import u1db_tests as tests @@ -147,7 +150,7 @@ class TestCouchBackendImpl(CouchDBTestCase): def test__allocate_doc_id(self): db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), - 'u1db_tests') + 'u1db_tests', ensure_ddocs=True) doc_id1 = db._allocate_doc_id() self.assertTrue(doc_id1.startswith('D-')) self.assertEqual(34, len(doc_id1)) @@ -162,32 +165,51 @@ class TestCouchBackendImpl(CouchDBTestCase): def make_couch_database_for_test(test, replica_uid): port = str(test.wrapper.port) return couch.CouchDatabase('http://localhost:' + port, replica_uid, - replica_uid=replica_uid or 'test') + replica_uid=replica_uid or 'test', + ensure_ddocs=True) def copy_couch_database_for_test(test, db): port = str(test.wrapper.port) - new_db = couch.CouchDatabase('http://localhost:' + port, - db._replica_uid + '_copy', + couch_url = 'http://localhost:' + port + new_dbname = db._replica_uid + '_copy' + new_db = couch.CouchDatabase(couch_url, + new_dbname, replica_uid=db._replica_uid or 'test') - gen, docs = db.get_all_docs(include_deleted=True) - for doc in docs: - new_db._put_doc(doc) - new_db._transaction_log = copy.deepcopy(db._transaction_log) - new_db._conflicts = copy.deepcopy(db._conflicts) - new_db._other_generations = copy.deepcopy(db._other_generations) - new_db._indexes = copy.deepcopy(db._indexes) - # save u1db data on couch - for key in new_db.U1DB_DATA_KEYS: - doc_id = '%s%s' % (new_db.U1DB_DATA_DOC_ID_PREFIX, key) - doc = new_db._get_doc(doc_id) - doc.content = {'content': getattr(new_db, key)} - new_db._put_doc(doc) + # copy all docs + old_couch_db = Server(couch_url)[db._replica_uid] + new_couch_db = Server(couch_url)[new_dbname] + for doc_id in old_couch_db: + doc = old_couch_db.get(doc_id) + # copy design docs + if ('u1db_rev' not in doc): + new_couch_db.save(doc) + # copy u1db docs + else: + new_doc = { + '_id': doc['_id'], + 'u1db_transactions': doc['u1db_transactions'], + 'u1db_rev': doc['u1db_rev'] + } + attachments = [] + if ('u1db_conflicts' in doc): + new_doc['u1db_conflicts'] = doc['u1db_conflicts'] + for c_rev in doc['u1db_conflicts']: + attachments.append('u1db_conflict_%s' % c_rev) + new_couch_db.save(new_doc) + # save conflict data + attachments.append('u1db_content') + for att_name in attachments: + att = old_couch_db.get_attachment(doc_id, att_name) + if (att is not None): + new_couch_db.put_attachment(new_doc, att, + filename=att_name) return new_db def make_document_for_test(test, doc_id, rev, content, has_conflicts=False): - return couch.CouchDocument(doc_id, rev, content, has_conflicts=has_conflicts) + return couch.CouchDocument( + doc_id, rev, content, has_conflicts=has_conflicts) COUCH_SCENARIOS = [ @@ -245,9 +267,8 @@ class CouchWithConflictsTests( test_backends.LocalDatabaseWithConflictsTests.tearDown(self) -# Notice: the CouchDB backend is currently used for storing encrypted data in -# the server, so indexing makes no sense. Thus, we ignore index testing for -# now. +# Notice: the CouchDB backend does not have indexing capabilities, but we +# added in memory indexing for tests only. class CouchIndexTests(test_backends.DatabaseIndexTests, CouchDBTestCase): @@ -310,6 +331,89 @@ class CouchDatabaseSyncTargetTests(test_sync.DatabaseSyncTargetTests, [(doc.doc_id, doc.rev), (doc2.doc_id, doc2.rev)]}) +# The following tests need that the database have an index, so we fake one. +old_class = couch.CouchDatabase + +from u1db.backends.inmemory import InMemoryIndex + + +class IndexedCouchDatabase(couch.CouchDatabase): + + def __init__(self, url, dbname, replica_uid=None, full_commit=True, + session=None, ensure_ddocs=True): + old_class.__init__(self, url, dbname, replica_uid, full_commit, + session, ensure_ddocs=True) + self._indexes = {} + + def _put_doc(self, old_doc, doc): + for index in self._indexes.itervalues(): + if old_doc is not None and not old_doc.is_tombstone(): + index.remove_json(old_doc.doc_id, old_doc.get_json()) + if not doc.is_tombstone(): + index.add_json(doc.doc_id, doc.get_json()) + old_class._put_doc(self, old_doc, doc) + + def create_index(self, index_name, *index_expressions): + if index_name in self._indexes: + if self._indexes[index_name]._definition == list( + index_expressions): + return + raise errors.IndexNameTakenError + index = InMemoryIndex(index_name, list(index_expressions)) + _, all_docs = self.get_all_docs() + for doc in all_docs: + index.add_json(doc.doc_id, doc.get_json()) + self._indexes[index_name] = index + + def delete_index(self, index_name): + del self._indexes[index_name] + + def list_indexes(self): + definitions = [] + for idx in self._indexes.itervalues(): + definitions.append((idx._name, idx._definition)) + return definitions + + def get_from_index(self, index_name, *key_values): + try: + index = self._indexes[index_name] + except KeyError: + raise errors.IndexDoesNotExist + doc_ids = index.lookup(key_values) + result = [] + for doc_id in doc_ids: + result.append(self._get_doc(doc_id, check_for_conflicts=True)) + return result + + def get_range_from_index(self, index_name, start_value=None, + end_value=None): + """Return all documents with key values in the specified range.""" + try: + index = self._indexes[index_name] + except KeyError: + raise errors.IndexDoesNotExist + if isinstance(start_value, basestring): + start_value = (start_value,) + if isinstance(end_value, basestring): + end_value = (end_value,) + doc_ids = index.lookup_range(start_value, end_value) + result = [] + for doc_id in doc_ids: + result.append(self._get_doc(doc_id, check_for_conflicts=True)) + return result + + def get_index_keys(self, index_name): + try: + index = self._indexes[index_name] + except KeyError: + raise errors.IndexDoesNotExist + keys = index.keys() + # XXX inefficiency warning + return list(set([tuple(key.split('\x01')) for key in keys])) + + +couch.CouchDatabase = IndexedCouchDatabase + sync_scenarios = [] for name, scenario in COUCH_SCENARIOS: scenario = dict(scenario) @@ -343,98 +447,4 @@ class CouchDatabaseSyncTests(test_sync.DatabaseSyncTests, CouchDBTestCase): test_sync.DatabaseSyncTests.tearDown(self) -#----------------------------------------------------------------------------- -# The following tests test extra functionality introduced by our backends -#----------------------------------------------------------------------------- - -class CouchDatabaseStorageTests(CouchDBTestCase): - - def _listify(self, l): - if type(l) is dict: - return { - self._listify(a): self._listify(b) for a, b in l.iteritems()} - if hasattr(l, '__iter__'): - return [self._listify(i) for i in l] - return l - - def _fetch_u1db_data(self, db, key): - doc = db._get_doc("%s%s" % (db.U1DB_DATA_DOC_ID_PREFIX, key)) - return doc.content['content'] - - def test_transaction_log_storage_after_put(self): - db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), - 'u1db_tests') - db.create_doc({'simple': 'doc'}) - content = self._fetch_u1db_data(db, db.U1DB_TRANSACTION_LOG_KEY) - self.assertEqual( - self._listify(db._transaction_log), - self._listify(content)) - - def test_conflict_log_storage_after_put_if_newer(self): - db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), - 'u1db_tests') - doc = db.create_doc({'simple': 'doc'}) - doc.set_json(nested_doc) - doc.rev = db._replica_uid + ':2' - db._force_doc_sync_conflict(doc) - content = self._fetch_u1db_data(db, db.U1DB_CONFLICTS_KEY) - self.assertEqual( - self._listify(db._conflicts), - self._listify(content)) - - def test_other_gens_storage_after_set(self): - db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), - 'u1db_tests') - doc = db.create_doc({'simple': 'doc'}) - db._set_replica_gen_and_trans_id('a', 'b', 'c') - content = self._fetch_u1db_data(db, db.U1DB_OTHER_GENERATIONS_KEY) - self.assertEqual( - self._listify(db._other_generations), - self._listify(content)) - - def test_index_storage_after_create(self): - db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), - 'u1db_tests') - doc = db.create_doc({'name': 'john'}) - db.create_index('myindex', 'name') - content = self._fetch_u1db_data(db, db.U1DB_INDEXES_KEY) - myind = db._indexes['myindex'] - index = { - 'myindex': { - 'definition': myind._definition, - 'name': myind._name, - 'values': myind._values, - } - } - self.assertEqual( - self._listify(index), - self._listify(content)) - - def test_index_storage_after_delete(self): - db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), - 'u1db_tests') - doc = db.create_doc({'name': 'john'}) - db.create_index('myindex', 'name') - db.create_index('myindex2', 'name') - db.delete_index('myindex') - content = self._fetch_u1db_data(db, db.U1DB_INDEXES_KEY) - myind = db._indexes['myindex2'] - index = { - 'myindex2': { - 'definition': myind._definition, - 'name': myind._name, - 'values': myind._values, - } - } - self.assertEqual( - self._listify(index), - self._listify(content)) - - def test_replica_uid_storage_after_db_creation(self): - db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), - 'u1db_tests') - content = self._fetch_u1db_data(db, db.U1DB_REPLICA_UID_KEY) - self.assertEqual(db._replica_uid, content) - - load_tests = tests.load_with_scenarios -- cgit v1.2.3 From af913a77b384a97d857c37ba9c0a8730b0846373 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 10 Dec 2013 22:24:47 -0200 Subject: Remove index tests for couch backend. --- common/src/leap/soledad/common/tests/test_couch.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index a181c6cb..48b3585f 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -267,16 +267,16 @@ class CouchWithConflictsTests( test_backends.LocalDatabaseWithConflictsTests.tearDown(self) -# Notice: the CouchDB backend does not have indexing capabilities, but we -# added in memory indexing for tests only. +# Notice: the CouchDB backend does not have indexing capabilities, so we do +# not test indexing now. -class CouchIndexTests(test_backends.DatabaseIndexTests, CouchDBTestCase): - - scenarios = COUCH_SCENARIOS - - def tearDown(self): - self.db.delete_database() - test_backends.DatabaseIndexTests.tearDown(self) +#class CouchIndexTests(test_backends.DatabaseIndexTests, CouchDBTestCase): +# +# scenarios = COUCH_SCENARIOS +# +# def tearDown(self): +# self.db.delete_database() +# test_backends.DatabaseIndexTests.tearDown(self) #----------------------------------------------------------------------------- -- cgit v1.2.3 From 96cf0af3880aa9bec8fcad422526297ef7f40964 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 11 Dec 2013 15:32:42 -0200 Subject: Add 'ddocs.py' generation on setup.py. --- common/src/leap/soledad/common/.gitignore | 1 + common/src/leap/soledad/common/couch.py | 42 +++++-- common/src/leap/soledad/common/ddocs/README.txt | 5 + common/src/leap/soledad/common/ddocs/__init__.py | 138 ----------------------- 4 files changed, 40 insertions(+), 146 deletions(-) create mode 100644 common/src/leap/soledad/common/.gitignore delete mode 100644 common/src/leap/soledad/common/ddocs/__init__.py (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/.gitignore b/common/src/leap/soledad/common/.gitignore new file mode 100644 index 00000000..3378c78a --- /dev/null +++ b/common/src/leap/soledad/common/.gitignore @@ -0,0 +1 @@ +ddocs.py diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index b9e699f3..d2414477 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -34,9 +34,8 @@ from u1db.remote import http_app from u1db.remote.server_state import ServerState -from leap.soledad.common import USER_DB_PREFIX +from leap.soledad.common import USER_DB_PREFIX, ddocs from leap.soledad.common.document import SoledadDocument -from leap.soledad.common.ddocs import ensure_ddocs_on_db logger = logging.getLogger(__name__) @@ -187,7 +186,7 @@ class CouchDatabase(CommonBackend): return cls(url, dbname) def __init__(self, url, dbname, replica_uid=None, full_commit=True, - session=None, ensure_ddocs=False): + session=None, ensure_ddocs=True): """ Create a new Couch data container. @@ -220,9 +219,27 @@ class CouchDatabase(CommonBackend): except ResourceNotFound: self._server.create(self._dbname) self._database = self._server[self._dbname] - self._set_replica_uid(replica_uid or uuid.uuid4().hex) + if replica_uid is not None: + self._set_replica_uid(replica_uid) if ensure_ddocs: - ensure_ddocs_on_db(self._database) + self.ensure_ddocs_on_db() + + def ensure_ddocs_on_db(self): + """ + Ensure that the design documents used by the backend exist on the + couch database. + """ + # we check for existence of one of the files, and put all of them if + # that one does not exist + try: + self._database['_design/docs'] + return + except ResourceNotFound: + for ddoc_name in ['docs', 'syncs', 'transactions']: + ddoc = json.loads( + binascii.a2b_base64( + getattr(ddocs, ddoc_name))) + self._database.save(ddoc) def get_sync_target(self): """ @@ -261,9 +278,11 @@ class CouchDatabase(CommonBackend): :type replica_uid: str """ try: + # set on existent config document doc = self._database['u1db_config'] doc['replica_uid'] = replica_uid except ResourceNotFound: + # or create the config document doc = { '_id': 'u1db_config', 'replica_uid': replica_uid, @@ -280,9 +299,16 @@ class CouchDatabase(CommonBackend): """ if self._real_replica_uid is not None: return self._real_replica_uid - doc = self._database['u1db_config'] - self._real_replica_uid = doc['replica_uid'] - return self._real_replica_uid + try: + # grab replica_uid from server + doc = self._database['u1db_config'] + self._real_replica_uid = doc['replica_uid'] + return self._real_replica_uid + except ResourceNotFound: + # create a unique replica_uid + self._real_replica_uid = uuid.uuid4().hex + self._set_replica_uid(self._real_replica_uid) + return self._real_replica_uid _replica_uid = property(_get_replica_uid, _set_replica_uid) diff --git a/common/src/leap/soledad/common/ddocs/README.txt b/common/src/leap/soledad/common/ddocs/README.txt index 37d89fbf..5569d929 100644 --- a/common/src/leap/soledad/common/ddocs/README.txt +++ b/common/src/leap/soledad/common/ddocs/README.txt @@ -1,3 +1,8 @@ +This directory holds a folder structure containing javascript files that +represent the design documents needed by the CouchDB U1DB backend. These files +are compiled into the `../ddocs.py` file by setuptools when creating the +source distribution. + The following table depicts the U1DB CouchDB backend method and the URI that is queried to obtain/update data from/to the server. diff --git a/common/src/leap/soledad/common/ddocs/__init__.py b/common/src/leap/soledad/common/ddocs/__init__.py deleted file mode 100644 index 389bdff9..00000000 --- a/common/src/leap/soledad/common/ddocs/__init__.py +++ /dev/null @@ -1,138 +0,0 @@ -# -*- coding: utf-8 -*- -# __init__.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - - -""" -CouchDB U1DB backend design documents helper. -""" - - -from os import listdir -from os.path import realpath, dirname, isdir, join, isfile, basename -import json -import logging - - -from couchdb import Document as CouchDocument - - -logger = logging.getLogger(__name__) - - -# where to search for design docs definitions -prefix = dirname(realpath(__file__)) - - -def ensure_ddocs_on_db(db, prefix=prefix): - """ - Ensure that the design documents in C{db} contain. - - :param db: The database in which to create/update the design docs. - :type db: couchdb.client.Server - :param prefix: Where to look for design documents definitions. - :type prefix: str - """ - ddocs = build_ddocs(prefix) - for ddoc_name, ddoc_content in ddocs.iteritems(): - ddoc_id = "_design/%s" % ddoc_name - ddoc = CouchDocument({'_id': ddoc_id}) - ddoc.update(ddoc_content) - # ensure revision if ddoc is already in db - doc = db.get(ddoc_id) - if doc is not None: - ddoc['_rev'] = doc.rev - db.save(ddoc) - - -def create_local_ddocs(prefix=prefix): - """ - Create local design docs based on content from subdirectories in - C{prefix}. - - :param create_local: Whether to create local .json files. - :type create_local: bool - """ - ddocs = build_ddocs(prefix) - for ddoc_name, ddoc_content in ddocs.iteritems(): - with open(join(prefix, '%s.json' % ddoc_name), 'w') as f: - f.write(json.dumps(ddoc_content, indent=4)) - - -def build_ddocs(prefix=prefix): - """ - Build design documents based on content from subdirectories in - C{prefix}. - - :param prefix: Where to look for design documents definitions. - :type prefix: str - - :return: A dictionary containing the design docs definitions. - :rtype: dict - """ - ddocs = {} - # design docs are represented by subdirectories in current directory - for ddoc in [f for f in listdir(prefix) if isdir(join(prefix, f))]: - logger.debug("Building %s.json ..." % ddoc) - - ddocs[ddoc] = {} - - for t in ['views', 'lists', 'updates']: - tdir = join(prefix, ddoc, t) - if not isdir(tdir): - logger.debug(" - no %s" % t) - else: - - ddocs[ddoc][t] = {} - - if t == 'views': # handle views (with map/reduce functions) - for view in [f for f in listdir(tdir) \ - if isdir(join(tdir, f))]: - logger.debug(" - view: %s" % view) - # look for map.js and reduce.js - mapfile = join(tdir, view, 'map.js') - reducefile = join(tdir, view, 'reduce.js') - mapfun = None - reducefun = None - try: - with open(mapfile) as f: - mapfun = f.read() - except IOError: - pass - try: - with open(reducefile) as f: - reducefun = f.read() - except IOError: - pass - ddocs[ddoc]['views'][view] = {} - - if mapfun is not None: - ddocs[ddoc]['views'][view]['map'] = mapfun - if reducefun is not None: - ddocs[ddoc]['views'][view]['reduce'] = reducefun - - else: # handle lists, updates, etc - for fun in [f for f in listdir(tdir) \ - if isfile(join(tdir, f))]: - logger.debug(" - %s: %s" % (t, fun)) - funfile = join(tdir, fun) - funname = basename(funfile).replace('.js', '') - try: - with open(funfile) as f: - ddocs[ddoc][t][funname] = f.read() - except IOError: - pass - return ddocs -- cgit v1.2.3 From 69c9e8776b3c30af74440482b8d1e37945e0d2b3 Mon Sep 17 00:00:00 2001 From: drebs Date: Sun, 15 Dec 2013 14:40:48 -0200 Subject: Add changes file. --- common/src/leap/soledad/common/tests/test_couch.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index 48b3585f..72346333 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -223,8 +223,22 @@ class CouchTests(test_backends.AllDatabaseTests, CouchDBTestCase): scenarios = COUCH_SCENARIOS + def setUp(self): + test_backends.AllDatabaseTests.setUp(self) + # save db info because of test_close + self._server = self.db._server + self._dbname = self.db._dbname + def tearDown(self): - self.db.delete_database() + # if current test is `test_close` we have to use saved objects to + # delete the database because the close() method will have removed the + # references needed to do it using the CouchDatabase. + if self.id() == \ + 'leap.soledad.common.tests.test_couch.CouchTests.' \ + 'test_close(couch)': + del(self._server[self._dbname]) + else: + self.db.delete_database() test_backends.AllDatabaseTests.tearDown(self) -- cgit v1.2.3 From ebb113849f26e91803f5c2017a8576fbd2de7033 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 19 Dec 2013 11:42:31 -0200 Subject: Add couch atomicity tests. --- .../tests/test_couch_operations_atomicity.py | 339 +++++++++++++++++++++ 1 file changed, 339 insertions(+) create mode 100644 common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py new file mode 100644 index 00000000..a0c473b1 --- /dev/null +++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py @@ -0,0 +1,339 @@ +# -*- coding: utf-8 -*- +# test_soledad.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +""" + +import os +import mock +import tempfile +import threading + +from leap.soledad.client import Soledad +from leap.soledad.common.couch import CouchDatabase, CouchServerState +from leap.soledad.common.tests.test_couch import CouchDBTestCase +from leap.soledad.common.tests.u1db_tests import TestCaseWithServer +from leap.soledad.common.tests.test_target import ( + make_token_soledad_app, + make_leap_document_for_test, + token_leap_sync_target, +) + + +REPEAT_TIMES = 20 + + +class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): + + @staticmethod + def make_app_after_state(state): + return make_token_soledad_app(state) + + make_document_for_test = make_leap_document_for_test + + sync_target = token_leap_sync_target + + def _soledad_instance(self, user='user-uuid', passphrase=u'123', + prefix='', + secrets_path=Soledad.STORAGE_SECRETS_FILE_NAME, + local_db_path='soledad.u1db', server_url='', + cert_file=None, auth_token=None, secret_id=None): + """ + Instantiate Soledad. + """ + + # this callback ensures we save a document which is sent to the shared + # db. + def _put_doc_side_effect(doc): + self._doc_put = doc + + # we need a mocked shared db or else Soledad will try to access the + # network to find if there are uploaded secrets. + class MockSharedDB(object): + + get_doc = mock.Mock(return_value=None) + put_doc = mock.Mock(side_effect=_put_doc_side_effect) + lock = mock.Mock(return_value=('atoken', 300)) + unlock = mock.Mock() + + def __call__(self): + return self + + Soledad._shared_db = MockSharedDB() + return Soledad( + user, + passphrase, + secrets_path=os.path.join(self.tempdir, prefix, secrets_path), + local_db_path=os.path.join( + self.tempdir, prefix, local_db_path), + server_url=server_url, + cert_file=cert_file, + auth_token=auth_token, + secret_id=secret_id) + + def make_app(self): + self.request_state = CouchServerState(self._couch_url, 'shared', + 'tokens') + return self.make_app_after_state(self.request_state) + + def setUp(self): + TestCaseWithServer.setUp(self) + CouchDBTestCase.setUp(self) + self._couch_url = 'http://localhost:' + str(self.wrapper.port) + self.db = CouchDatabase( + self._couch_url, 'user-user-uuid', replica_uid='replica') + self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") + + def tearDown(self): + CouchDBTestCase.tearDown(self) + TestCaseWithServer.tearDown(self) + + # + # Sequential tests + # + + def test_correct_transaction_log_after_sequential_puts(self): + """ + Assert that the transaction_log increases accordingly with sequential + puts. + """ + doc = self.db.create_doc({'ops': 0}) + ops = 1 + docs = [doc.doc_id] + for i in range(0, REPEAT_TIMES): + self.assertEqual( + i+1, len(self.db._get_transaction_log())) + doc.content['ops'] += 1 + self.db.put_doc(doc) + docs.append(doc.doc_id) + + # assert length of transaction_log + transaction_log = self.db._get_transaction_log() + self.assertEqual( + REPEAT_TIMES+1, len(transaction_log)) + + # assert that all entries in the log belong to the same doc + self.assertEqual(REPEAT_TIMES+1, len(docs)) + for doc_id in docs: + self.assertEqual( + REPEAT_TIMES+1, + len(filter(lambda t: t[0] == doc_id, transaction_log))) + + def test_correct_transaction_log_after_sequential_deletes(self): + """ + Assert that the transaction_log increases accordingly with sequential + puts and deletes. + """ + docs = [] + for i in range(0, REPEAT_TIMES): + doc = self.db.create_doc({'ops': 0}) + self.assertEqual( + 2*i+1, len(self.db._get_transaction_log())) + docs.append(doc.doc_id) + self.db.delete_doc(doc) + self.assertEqual( + 2*i+2, len(self.db._get_transaction_log())) + + # assert length of transaction_log + transaction_log = self.db._get_transaction_log() + self.assertEqual( + 2*REPEAT_TIMES, len(transaction_log)) + + # assert that each doc appears twice in the transaction_log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 2, + len(filter(lambda t: t[0] == doc_id, transaction_log))) + + def test_correct_sync_log_after_sequential_syncs(self): + """ + Assert that the sync_log increases accordingly with sequential syncs. + """ + self.startServer() + sol = self._soledad_instance( + auth_token='auth-token', + server_url=self.getURL()) + + def _create_docs_and_sync(sol, syncs): + # create a lot of documents + for i in range(0, REPEAT_TIMES): + sol.create_doc({}) + # assert sizes of transaction and sync logs + self.assertEqual( + syncs*REPEAT_TIMES, + len(self.db._get_transaction_log())) + self.assertEqual( + 1 if syncs > 0 else 0, + len(self.db._database.view('syncs/log').rows)) + # sync to the remote db + sol.sync() + gen, docs = self.db.get_all_docs() + self.assertEqual((syncs+1)*REPEAT_TIMES, gen) + self.assertEqual((syncs+1)*REPEAT_TIMES, len(docs)) + # assert sizes of transaction and sync logs + self.assertEqual((syncs+1)*REPEAT_TIMES, + len(self.db._get_transaction_log())) + sync_log_rows = self.db._database.view('syncs/log').rows + sync_log = sync_log_rows[0].value + replica_uid = sync_log_rows[0].key + known_gen = sync_log['known_generation'] + known_trans_id = sync_log['known_transaction_id'] + # assert sync_log has exactly 1 row + self.assertEqual(1, len(sync_log_rows)) + # assert it has the correct replica_uid, gen and trans_id + self.assertEqual(sol._db._replica_uid, replica_uid) + sol_gen, sol_trans_id = sol._db._get_generation_info() + self.assertEqual(sol_gen, known_gen) + self.assertEqual(sol_trans_id, known_trans_id) + + _create_docs_and_sync(sol, 0) + _create_docs_and_sync(sol, 1) + + # + # Concurrency tests + # + + class _WorkerThread(threading.Thread): + + def __init__(self, params, run_method): + threading.Thread.__init__(self) + self._params = params + self._run_method = run_method + + def run(self): + self._run_method(self) + + def test_correct_transaction_log_after_concurrent_puts(self): + """ + Assert that the transaction_log increases accordingly with concurrent + puts. + """ + pool = threading.BoundedSemaphore(value=1) + threads = [] + docs = [] + + def _run_method(self): + doc = self._params['db'].create_doc({}) + pool.acquire() + self._params['docs'].append(doc.doc_id) + pool.release() + + + for i in range(0, REPEAT_TIMES): + thread = self._WorkerThread( + {'docs': docs, 'db': self.db}, + _run_method) + thread.start() + threads.append(thread) + + for thread in threads: + thread.join() + + # assert length of transaction_log + transaction_log = self.db._get_transaction_log() + self.assertEqual( + REPEAT_TIMES, len(transaction_log)) + + # assert all documents are in the log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 1, + len(filter(lambda t: t[0] == doc_id, transaction_log))) + + def test_correct_transaction_log_after_concurrent_deletes(self): + """ + Assert that the transaction_log increases accordingly with concurrent + puts and deletes. + """ + threads = [] + docs = [] + pool = threading.BoundedSemaphore(value=1) + + # create/delete method that will be run concurrently + def _run_method(self): + doc = self._params['db'].create_doc({}) + pool.acquire() + docs.append(doc.doc_id) + pool.release() + self._params['db'].delete_doc(doc) + + # launch concurrent threads + for i in range(0, REPEAT_TIMES): + thread = self._WorkerThread({'db': self.db}, _run_method) + thread.start() + threads.append(thread) + + # wait for threads to finish + for thread in threads: + thread.join() + + # assert transaction log + transaction_log = self.db._get_transaction_log() + self.assertEqual( + 2*REPEAT_TIMES, len(transaction_log)) + # assert that each doc appears twice in the transaction_log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 2, + len(filter(lambda t: t[0] == doc_id, transaction_log))) + + def test_correct_sync_log_after_concurrent_puts_and_sync(self): + """ + Assert that the sync_log is correct after concurrent syncs. + """ + threads = [] + docs = [] + pool = threading.BoundedSemaphore(value=1) + self.startServer() + sol = self._soledad_instance( + auth_token='auth-token', + server_url=self.getURL()) + + def _run_method(self): + # create a lot of documents + doc = self._params['sol'].create_doc({}) + pool.acquire() + docs.append(doc.doc_id) + pool.release() + + # launch threads to create documents in parallel + for i in range(0, REPEAT_TIMES): + thread = self._WorkerThread( + {'sol': sol, 'syncs': i}, + _run_method) + thread.start() + threads.append(thread) + + # wait for threads to finish + for thread in threads: + thread.join() + + # do the sync! + sol.sync() + + transaction_log = self.db._get_transaction_log() + self.assertEqual(REPEAT_TIMES, len(transaction_log)) + # assert all documents are in the remote log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 1, + len(filter(lambda t: t[0] == doc_id, transaction_log))) -- cgit v1.2.3 From 8d504fa812da93df3a26c4b4b761a74685d40f25 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 24 Dec 2013 08:17:37 -0200 Subject: Avoid concurrent sync attempts from the same replica in the client (#4451). --- .../tests/test_couch_operations_atomicity.py | 43 ++++++++++++++++++++++ 1 file changed, 43 insertions(+) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py index a0c473b1..8b001859 100644 --- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py +++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py @@ -337,3 +337,46 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): self.assertEqual( 1, len(filter(lambda t: t[0] == doc_id, transaction_log))) + + def test_concurrent_syncs_do_not_fail(self): + """ + Assert that concurrent attempts to sync end up being executed + sequentially and do not fail. + """ + threads = [] + docs = [] + pool = threading.BoundedSemaphore(value=1) + self.startServer() + sol = self._soledad_instance( + auth_token='auth-token', + server_url=self.getURL()) + + def _run_method(self): + # create a lot of documents + doc = self._params['sol'].create_doc({}) + # do the sync! + sol.sync() + pool.acquire() + docs.append(doc.doc_id) + pool.release() + + # launch threads to create documents in parallel + for i in range(0, REPEAT_TIMES): + thread = self._WorkerThread( + {'sol': sol, 'syncs': i}, + _run_method) + thread.start() + threads.append(thread) + + # wait for threads to finish + for thread in threads: + thread.join() + + transaction_log = self.db._get_transaction_log() + self.assertEqual(REPEAT_TIMES, len(transaction_log)) + # assert all documents are in the remote log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 1, + len(filter(lambda t: t[0] == doc_id, transaction_log))) -- cgit v1.2.3 From 89d3e4a1321ff9701ac67933f8e649cfecd1d95e Mon Sep 17 00:00:00 2001 From: drebs Date: Mon, 6 Jan 2014 10:29:43 -0200 Subject: Add proper error reporting to shared db lock. --- common/src/leap/soledad/common/errors.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/errors.py b/common/src/leap/soledad/common/errors.py index 7c2d7296..62de19f8 100644 --- a/common/src/leap/soledad/common/errors.py +++ b/common/src/leap/soledad/common/errors.py @@ -57,14 +57,26 @@ class AlreadyLockedError(errors.U1DBError): wire_description = "lock is locked" status = 403 + +class LockTimedOutError(errors.U1DBError): + """ + Exception raised when timing out while trying to lock the shared database. + """ + + wire_description = "lock timed out" + status = 408 + + # update u1db "wire description to status" and "wire description to exception" # maps. -for e in [InvalidTokenError, NotLockedError, AlreadyLockedError]: +for e in [InvalidTokenError, NotLockedError, AlreadyLockedError, + LockTimedOutError]: http_errors.wire_description_to_status.update({ e.wire_description: e.status}) errors.wire_description_to_exc.update({ e.wire_description: e}) + # u1db error statuses also have to be updated http_errors.ERROR_STATUSES = set( http_errors.wire_description_to_status.values()) -- cgit v1.2.3 From 82628d8284c5c11452da75a6604f2f68b8dd8520 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 7 Jan 2014 11:05:39 -0200 Subject: Use temp dir for server side locks (#4918). --- common/src/leap/soledad/common/errors.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/errors.py b/common/src/leap/soledad/common/errors.py index 62de19f8..c41e0b0f 100644 --- a/common/src/leap/soledad/common/errors.py +++ b/common/src/leap/soledad/common/errors.py @@ -67,10 +67,19 @@ class LockTimedOutError(errors.U1DBError): status = 408 +class CouldNotObtainLockError(errors.U1DBError): + """ + Exception raised when timing out while trying to lock the shared database. + """ + + wire_description = "error obtaining lock" + status = 500 + + # update u1db "wire description to status" and "wire description to exception" # maps. for e in [InvalidTokenError, NotLockedError, AlreadyLockedError, - LockTimedOutError]: + LockTimedOutError, CouldNotObtainLockError]: http_errors.wire_description_to_status.update({ e.wire_description: e.status}) errors.wire_description_to_exc.update({ -- cgit v1.2.3 From 3587985a014daefbae75351911b06485c3c91134 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 16 Jan 2014 19:02:01 -0200 Subject: Improve error reporting on couch backend design documents access failure (#4994). --- common/src/leap/soledad/common/couch.py | 415 ++++++++++++++++----- common/src/leap/soledad/common/errors.py | 74 +++- common/src/leap/soledad/common/tests/test_couch.py | 219 ++++++++++- 3 files changed, 596 insertions(+), 112 deletions(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index d2414477..45ca4282 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -27,14 +27,23 @@ import socket from couchdb.client import Server -from couchdb.http import ResourceNotFound, Unauthorized -from u1db import errors, query_parser, vectorclock +from couchdb.http import ResourceNotFound, Unauthorized, ServerError +from u1db import query_parser, vectorclock +from u1db.errors import ( + DatabaseDoesNotExist, + InvalidGeneration, + RevisionConflict, + InvalidDocId, + ConflictedDoc, + DocumentDoesNotExist, + DocumentAlreadyDeleted, +) from u1db.backends import CommonBackend, CommonSyncTarget from u1db.remote import http_app from u1db.remote.server_state import ServerState -from leap.soledad.common import USER_DB_PREFIX, ddocs +from leap.soledad.common import USER_DB_PREFIX, ddocs, errors from leap.soledad.common.document import SoledadDocument @@ -153,6 +162,66 @@ class CouchDocument(SoledadDocument): http_app.Document = CouchDocument +def raise_missing_design_doc_error(exc, ddoc_path): + """ + Raise an appropriate exception when catching a ResourceNotFound when + accessing a design document. + + :param exc: The exception cought. + :type exc: ResourceNotFound + :param ddoc_path: A list representing the requested path. + :type ddoc_path: list + + :raise MissingDesignDocError: Raised when tried to access a missing design + document. + :raise MissingDesignDocListFunctionError: Raised when trying to access a + missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a design + document for an yet unknown reason. + """ + path = "".join(ddoc_path) + if exc.message[1] == 'missing': + raise errors.MissingDesignDocError(path) + elif exc.message[1] == 'missing function' or \ + exc.message[1].startswith('missing lists function'): + raise errors.MissingDesignDocListFunctionError(path) + elif exc.message[1] == 'missing_named_view': + raise errors.MissingDesignDocNamedViewError(path) + elif exc.message[1] == 'deleted': + raise errors.MissingDesignDocDeletedError(path) + # other errors are unknown for now + raise errors.DesignDocUnknownError(path) + + +def raise_server_error(exc, ddoc_path): + """ + Raise an appropriate exception when catching a ServerError when + accessing a design document. + + :param exc: The exception cought. + :type exc: ResourceNotFound + :param ddoc_path: A list representing the requested path. + :type ddoc_path: list + + :raise MissingDesignDocListFunctionError: Raised when trying to access a + missing list function on a + design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a design + document for an yet unknown reason. + """ + path = "".join(ddoc_path) + if exc.message[1][0] == 'unnamed_error': + raise errors.MissingDesignDocListFunctionError(path) + # other errors are unknown for now + raise errors.DesignDocUnknownError(path) + + class CouchDatabase(CommonBackend): """ A U1DB implementation that uses CouchDB as its persistence layer. @@ -182,7 +251,7 @@ class CouchDatabase(CommonBackend): server[dbname] except ResourceNotFound: if not create: - raise errors.DatabaseDoesNotExist() + raise DatabaseDoesNotExist() return cls(url, dbname) def __init__(self, url, dbname, replica_uid=None, full_commit=True, @@ -318,12 +387,31 @@ class CouchDatabase(CommonBackend): :return: The current generation. :rtype: int + + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. """ # query a couch list function - res = self._database.resource( - '_design', 'transactions', '_list', 'generation', 'log') - response = res.get_json() - return response[2]['generation'] + ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log'] + res = self._database.resource(*ddoc_path) + try: + response = res.get_json() + return response[2]['generation'] + except ResourceNotFound as e: + raise_missing_design_doc_error(e, ddoc_path) + except ServerError as e: + raise_server_error(e, ddoc_path) def _get_generation_info(self): """ @@ -331,12 +419,31 @@ class CouchDatabase(CommonBackend): :return: A tuple containing the current generation and transaction id. :rtype: (int, str) + + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. """ # query a couch list function - res = self._database.resource( - '_design', 'transactions', '_list', 'generation', 'log') - response = res.get_json() - return (response[2]['generation'], response[2]['transaction_id']) + ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log'] + res = self._database.resource(*ddoc_path) + try: + response = res.get_json() + return (response[2]['generation'], response[2]['transaction_id']) + except ResourceNotFound as e: + raise_missing_design_doc_error(e, ddoc_path) + except ServerError as e: + raise_server_error(e, ddoc_path) def _get_trans_id_for_gen(self, generation): """ @@ -349,16 +456,36 @@ class CouchDatabase(CommonBackend): :rtype: str :raise InvalidGeneration: Raised when the generation does not exist. + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. """ if generation == 0: return '' # query a couch list function - res = self._database.resource( - '_design', 'transactions', '_list', 'trans_id_for_gen', 'log') - response = res.get_json(gen=generation) - if response[2] == {}: - raise errors.InvalidGeneration - return response[2]['transaction_id'] + ddoc_path = [ + '_design', 'transactions', '_list', 'trans_id_for_gen', 'log' + ] + res = self._database.resource(*ddoc_path) + try: + response = res.get_json(gen=generation) + if response[2] == {}: + raise InvalidGeneration + return response[2]['transaction_id'] + except ResourceNotFound as e: + raise_missing_design_doc_error(e, ddoc_path) + except ServerError as e: + raise_server_error(e, ddoc_path) def _get_transaction_log(self): """ @@ -366,12 +493,31 @@ class CouchDatabase(CommonBackend): :return: The complete transaction log. :rtype: [(str, str)] + + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. """ # query a couch view - res = self._database.resource( - '_design', 'transactions', '_view', 'log') - response = res.get_json() - return map(lambda row: (row['id'], row['value']), response[2]['rows']) + ddoc_path = ['_design', 'transactions', '_view', 'log'] + res = self._database.resource(*ddoc_path) + try: + response = res.get_json() + return map( + lambda row: (row['id'], row['value']), + response[2]['rows']) + except ResourceNotFound as e: + raise_missing_design_doc_error(e, ddoc_path) def _get_doc(self, doc_id, check_for_conflicts=False): """ @@ -472,6 +618,19 @@ class CouchDatabase(CommonBackend): :raise RevisionConflict: Raised when trying to update a document but couch revisions mismatch. + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. """ trans_id = self._allocate_transaction_id() # encode content @@ -489,26 +648,29 @@ class CouchDatabase(CommonBackend): doc.get_conflicts())) )[:-1] # exclude \n # perform the request - resource = self._database.resource( - '_design', 'docs', '_update', 'put', doc.doc_id) - response = resource.put_json( - body={ - 'couch_rev': old_doc.couch_rev - if old_doc is not None else None, - 'u1db_rev': doc.rev, - 'content': content, - 'trans_id': trans_id, - 'conflicts': conflicts, - 'update_conflicts': update_conflicts, - }, - headers={'content-type': 'application/json'}) - # the document might have been updated in between, so we check for the - # return message - msg = response[2].read() - if msg == 'ok': - return - elif msg == 'revision conflict': - raise errors.RevisionConflict() + ddoc_path = ['_design', 'docs', '_update', 'put', doc.doc_id] + resource = self._database.resource(*ddoc_path) + try: + response = resource.put_json( + body={ + 'couch_rev': old_doc.couch_rev + if old_doc is not None else None, + 'u1db_rev': doc.rev, + 'content': content, + 'trans_id': trans_id, + 'conflicts': conflicts, + 'update_conflicts': update_conflicts, + }, + headers={'content-type': 'application/json'}) + # the document might have been updated in between, so we check for + # the return message + msg = response[2].read() + if msg == 'ok': + return + elif msg == 'revision conflict': + raise RevisionConflict() + except ResourceNotFound as e: + raise_missing_design_doc_error(e, ddoc_path) def put_doc(self, doc): """ @@ -522,26 +684,26 @@ class CouchDatabase(CommonBackend): :return: new_doc_rev - The new revision identifier for the document. The Document object will also be updated. - :raise errors.InvalidDocId: Raised if the document's id is invalid. - :raise errors.DocumentTooBig: Raised if the document size is too big. - :raise errors.ConflictedDoc: Raised if the document has conflicts. + :raise InvalidDocId: Raised if the document's id is invalid. + :raise DocumentTooBig: Raised if the document size is too big. + :raise ConflictedDoc: Raised if the document has conflicts. """ if doc.doc_id is None: - raise errors.InvalidDocId() + raise InvalidDocId() self._check_doc_id(doc.doc_id) self._check_doc_size(doc) old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) if old_doc and old_doc.has_conflicts: - raise errors.ConflictedDoc() + raise ConflictedDoc() if old_doc and doc.rev is None and old_doc.is_tombstone(): new_rev = self._allocate_doc_rev(old_doc.rev) else: if old_doc is not None: if old_doc.rev != doc.rev: - raise errors.RevisionConflict() + raise RevisionConflict() else: if doc.rev is not None: - raise errors.RevisionConflict() + raise RevisionConflict() new_rev = self._allocate_doc_rev(doc.rev) doc.rev = new_rev self._put_doc(old_doc, doc) @@ -563,32 +725,53 @@ class CouchDatabase(CommonBackend): to the last intervening change and sorted by generation (old changes first) :rtype: (int, str, [(str, int, str)]) + + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. """ # query a couch list function - res = self._database.resource( - '_design', 'transactions', '_list', 'whats_changed', 'log') - response = res.get_json(old_gen=old_generation) - results = map( - lambda row: - (row['generation'], row['doc_id'], row['transaction_id']), - response[2]['transactions']) - results.reverse() - cur_gen = old_generation - seen = set() - changes = [] - newest_trans_id = '' - for generation, doc_id, trans_id in results: - if doc_id not in seen: - changes.append((doc_id, generation, trans_id)) - seen.add(doc_id) - if changes: - cur_gen = changes[0][1] # max generation - newest_trans_id = changes[0][2] - changes.reverse() - else: - cur_gen, newest_trans_id = self._get_generation_info() + ddoc_path = [ + '_design', 'transactions', '_list', 'whats_changed', 'log' + ] + res = self._database.resource(*ddoc_path) + try: + response = res.get_json(old_gen=old_generation) + results = map( + lambda row: + (row['generation'], row['doc_id'], row['transaction_id']), + response[2]['transactions']) + results.reverse() + cur_gen = old_generation + seen = set() + changes = [] + newest_trans_id = '' + for generation, doc_id, trans_id in results: + if doc_id not in seen: + changes.append((doc_id, generation, trans_id)) + seen.add(doc_id) + if changes: + cur_gen = changes[0][1] # max generation + newest_trans_id = changes[0][2] + changes.reverse() + else: + cur_gen, newest_trans_id = self._get_generation_info() - return cur_gen, newest_trans_id, changes + return cur_gen, newest_trans_id, changes + except ResourceNotFound as e: + raise_missing_design_doc_error(e, ddoc_path) + except ServerError as e: + raise_server_error(e, ddoc_path) def delete_doc(self, doc): """ @@ -600,22 +783,22 @@ class CouchDatabase(CommonBackend): :param doc: The document to mark as deleted. :type doc: CouchDocument. - :raise errors.DocumentDoesNotExist: Raised if the document does not + :raise DocumentDoesNotExist: Raised if the document does not exist. - :raise errors.RevisionConflict: Raised if the revisions do not match. - :raise errors.DocumentAlreadyDeleted: Raised if the document is + :raise RevisionConflict: Raised if the revisions do not match. + :raise DocumentAlreadyDeleted: Raised if the document is already deleted. - :raise errors.ConflictedDoc: Raised if the doc has conflicts. + :raise ConflictedDoc: Raised if the doc has conflicts. """ old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) if old_doc is None: - raise errors.DocumentDoesNotExist + raise DocumentDoesNotExist if old_doc.rev != doc.rev: - raise errors.RevisionConflict() + raise RevisionConflict() if old_doc.is_tombstone(): - raise errors.DocumentAlreadyDeleted + raise DocumentAlreadyDeleted if old_doc.has_conflicts: - raise errors.ConflictedDoc() + raise ConflictedDoc() new_rev = self._allocate_doc_rev(doc.rev) doc.rev = new_rev doc.make_tombstone() @@ -737,17 +920,34 @@ class CouchDatabase(CommonBackend): :param other_transaction_id: The transaction id associated with the generation. :type other_transaction_id: str + + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. """ # query a couch update function - res = self._database.resource( - '_design', 'syncs', '_update', 'put', 'u1db_sync_log') - res.put_json( - body={ - 'other_replica_uid': other_replica_uid, - 'other_generation': other_generation, - 'other_transaction_id': other_transaction_id, - }, - headers={'content-type': 'application/json'}) + ddoc_path = ['_design', 'syncs', '_update', 'put', 'u1db_sync_log'] + res = self._database.resource(*ddoc_path) + try: + res.put_json( + body={ + 'other_replica_uid': other_replica_uid, + 'other_generation': other_generation, + 'other_transaction_id': other_transaction_id, + }, + headers={'content-type': 'application/json'}) + except ResourceNotFound as e: + raise_missing_design_doc_error(e, ddoc_path) def _add_conflict(self, doc, my_doc_rev, my_content): """ @@ -774,7 +974,7 @@ class CouchDatabase(CommonBackend): """ Delete the conflicted revisions from the list of conflicts of C{doc}. - Note that thie method does not actually update the backed; rather, it + Note that this method does not actually update the backend; rather, it updates the CouchDocument object which will provide the conflict data when the atomic document update is made. @@ -842,6 +1042,20 @@ class CouchDatabase(CommonBackend): :param conflicted_doc_revs: A list of revisions that the new content supersedes. :type conflicted_doc_revs: [str] + + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. """ cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) new_rev = self._ensure_maximal_rev(cur_doc.rev, @@ -855,8 +1069,10 @@ class CouchDatabase(CommonBackend): self._add_conflict(doc, new_rev, doc.get_json()) self._delete_conflicts(doc, superseded_revs) # perform request to resolve document in server - resource = self._database.resource( - '_design', 'docs', '_update', 'resolve_doc', doc.doc_id) + ddoc_path = [ + '_design', 'docs', '_update', 'resolve_doc', doc.doc_id + ] + resource = self._database.resource(*ddoc_path) conflicts = None if doc.has_conflicts: conflicts = binascii.b2a_base64( @@ -864,12 +1080,15 @@ class CouchDatabase(CommonBackend): map(lambda cdoc: (cdoc.rev, cdoc.content), doc.get_conflicts())) )[:-1] # exclude \n - response = resource.put_json( - body={ - 'couch_rev': cur_doc.couch_rev, - 'conflicts': conflicts, - }, - headers={'content-type': 'application/json'}) + try: + response = resource.put_json( + body={ + 'couch_rev': cur_doc.couch_rev, + 'conflicts': conflicts, + }, + headers={'content-type': 'application/json'}) + except ResourceNotFound as e: + raise_missing_design_doc_error(e, ddoc_path) def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, replica_trans_id=''): diff --git a/common/src/leap/soledad/common/errors.py b/common/src/leap/soledad/common/errors.py index c41e0b0f..f241ee06 100644 --- a/common/src/leap/soledad/common/errors.py +++ b/common/src/leap/soledad/common/errors.py @@ -25,11 +25,17 @@ from u1db import errors from u1db.remote import http_errors +class SoledadError(errors.U1DBError): + """ + Base Soledad HTTP errors. + """ + pass + # -# LockResource: a lock based on a document in the shared database. +# LockResource errors # -class InvalidTokenError(errors.U1DBError): +class InvalidTokenError(SoledadError): """ Exception raised when trying to unlock shared database with invalid token. """ @@ -38,7 +44,7 @@ class InvalidTokenError(errors.U1DBError): status = 401 -class NotLockedError(errors.U1DBError): +class NotLockedError(SoledadError): """ Exception raised when trying to unlock shared database when it is not locked. @@ -48,7 +54,7 @@ class NotLockedError(errors.U1DBError): status = 404 -class AlreadyLockedError(errors.U1DBError): +class AlreadyLockedError(SoledadError): """ Exception raised when trying to lock shared database but it is already locked. @@ -58,7 +64,7 @@ class AlreadyLockedError(errors.U1DBError): status = 403 -class LockTimedOutError(errors.U1DBError): +class LockTimedOutError(SoledadError): """ Exception raised when timing out while trying to lock the shared database. """ @@ -67,7 +73,7 @@ class LockTimedOutError(errors.U1DBError): status = 408 -class CouldNotObtainLockError(errors.U1DBError): +class CouldNotObtainLockError(SoledadError): """ Exception raised when timing out while trying to lock the shared database. """ @@ -76,10 +82,64 @@ class CouldNotObtainLockError(errors.U1DBError): status = 500 +# +# CouchDatabase errors +# + +class MissingDesignDocError(SoledadError): + """ + Raised when trying to access a missing couch design document. + """ + + wire_description = "missing design document" + status = 500 + + +class MissingDesignDocNamedViewError(SoledadError): + """ + Raised when trying to access a missing named view on a couch design + document. + """ + + wire_description = "missing design document named function" + status = 500 + + +class MissingDesignDocListFunctionError(SoledadError): + """ + Raised when trying to access a missing list function on a couch design + document. + """ + + wire_description = "missing design document list function" + status = 500 + + +class MissingDesignDocDeletedError(SoledadError): + """ + Raised when trying to access a deleted couch design document. + """ + + wire_description = "design document was deleted" + status = 500 + + +class DesignDocUnknownError(SoledadError): + """ + Raised when trying to access a couch design document and getting an + unknown error. + """ + + wire_description = "missing design document unknown error" + status = 500 + + # update u1db "wire description to status" and "wire description to exception" # maps. for e in [InvalidTokenError, NotLockedError, AlreadyLockedError, - LockTimedOutError, CouldNotObtainLockError]: + LockTimedOutError, CouldNotObtainLockError, MissingDesignDocError, + MissingDesignDocListFunctionError, MissingDesignDocNamedViewError, + MissingDesignDocDeletedError, DesignDocUnknownError]: http_errors.wire_description_to_status.update({ e.wire_description: e.status}) errors.wire_description_to_exc.update({ diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index 72346333..0e07575d 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -24,16 +24,17 @@ import re import copy import shutil from base64 import b64decode +from mock import Mock from couchdb.client import Server -from u1db import errors +from u1db import errors as u1db_errors from leap.common.files import mkdir_p from leap.soledad.common.tests import u1db_tests as tests from leap.soledad.common.tests.u1db_tests import test_backends from leap.soledad.common.tests.u1db_tests import test_sync -from leap.soledad.common import couch +from leap.soledad.common import couch, errors import simplejson as json @@ -356,7 +357,7 @@ class IndexedCouchDatabase(couch.CouchDatabase): def __init__(self, url, dbname, replica_uid=None, full_commit=True, session=None, ensure_ddocs=True): old_class.__init__(self, url, dbname, replica_uid, full_commit, - session, ensure_ddocs=True) + session, ensure_ddocs=ensure_ddocs) self._indexes = {} def _put_doc(self, old_doc, doc): @@ -372,7 +373,7 @@ class IndexedCouchDatabase(couch.CouchDatabase): if self._indexes[index_name]._definition == list( index_expressions): return - raise errors.IndexNameTakenError + raise u1db_errors.IndexNameTakenError index = InMemoryIndex(index_name, list(index_expressions)) _, all_docs = self.get_all_docs() for doc in all_docs: @@ -392,7 +393,7 @@ class IndexedCouchDatabase(couch.CouchDatabase): try: index = self._indexes[index_name] except KeyError: - raise errors.IndexDoesNotExist + raise u1db_errors.IndexDoesNotExist doc_ids = index.lookup(key_values) result = [] for doc_id in doc_ids: @@ -405,7 +406,7 @@ class IndexedCouchDatabase(couch.CouchDatabase): try: index = self._indexes[index_name] except KeyError: - raise errors.IndexDoesNotExist + raise u1db_errors.IndexDoesNotExist if isinstance(start_value, basestring): start_value = (start_value,) if isinstance(end_value, basestring): @@ -420,7 +421,7 @@ class IndexedCouchDatabase(couch.CouchDatabase): try: index = self._indexes[index_name] except KeyError: - raise errors.IndexDoesNotExist + raise u1db_errors.IndexDoesNotExist keys = index.keys() # XXX inefficiency warning return list(set([tuple(key.split('\x01')) for key in keys])) @@ -461,4 +462,208 @@ class CouchDatabaseSyncTests(test_sync.DatabaseSyncTests, CouchDBTestCase): test_sync.DatabaseSyncTests.tearDown(self) +class CouchDatabaseExceptionsTests(CouchDBTestCase): + + def setUp(self): + CouchDBTestCase.setUp(self) + self.db = couch.CouchDatabase( + 'http://127.0.0.1:%d' % self.wrapper.port, 'test', + ensure_ddocs=False) # note that we don't enforce ddocs here + + def tearDown(self): + self.db.delete_database() + + def test_missing_design_doc_raises(self): + """ + Test that all methods that access design documents will raise if the + design docs are not present. + """ + # _get_generation() + self.assertRaises( + errors.MissingDesignDocError, + self.db._get_generation) + # _get_generation_info() + self.assertRaises( + errors.MissingDesignDocError, + self.db._get_generation_info) + # _get_trans_id_for_gen() + self.assertRaises( + errors.MissingDesignDocError, + self.db._get_trans_id_for_gen, 1) + # _get_transaction_log() + self.assertRaises( + errors.MissingDesignDocError, + self.db._get_transaction_log) + # create_doc() + self.assertRaises( + errors.MissingDesignDocError, + self.db.create_doc, {}) + # whats_changed() + self.assertRaises( + errors.MissingDesignDocError, + self.db.whats_changed) + # _do_set_replica_gen_and_trans_id() + self.assertRaises( + errors.MissingDesignDocError, + self.db._do_set_replica_gen_and_trans_id, 1, 2, 3) + # fake a conflict so we can test resolve_doc() + first_rev = self.db._allocate_doc_rev(None) + doc = couch.CouchDocument( + doc_id='mydoc', rev=self.db._allocate_doc_rev(first_rev)) + self.db._get_doc = Mock(return_value=doc) + self.assertRaises( + errors.MissingDesignDocError, + self.db.resolve_doc, doc, [first_rev]) + + def test_missing_design_doc_functions_raises(self): + """ + Test that all methods that access design documents list functions + will raise if the functions are not present. + """ + self.db = couch.CouchDatabase( + 'http://127.0.0.1:%d' % self.wrapper.port, 'test', + ensure_ddocs=True) + # erase views from _design/transactions + transactions = self.db._database['_design/transactions'] + transactions['lists'] = {} + self.db._database.save(transactions) + # _get_generation() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db._get_generation) + # _get_generation_info() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db._get_generation_info) + # _get_trans_id_for_gen() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db._get_trans_id_for_gen, 1) + # whats_changed() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db.whats_changed) + + def test_absent_design_doc_functions_raises(self): + """ + Test that all methods that access design documents list functions + will raise if the functions are not present. + """ + self.db = couch.CouchDatabase( + 'http://127.0.0.1:%d' % self.wrapper.port, 'test', + ensure_ddocs=True) + # erase views from _design/transactions + transactions = self.db._database['_design/transactions'] + del transactions['lists'] + self.db._database.save(transactions) + # _get_generation() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db._get_generation) + # _get_generation_info() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db._get_generation_info) + # _get_trans_id_for_gen() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db._get_trans_id_for_gen, 1) + # whats_changed() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db.whats_changed) + + def test_missing_design_doc_named_views_raises(self): + """ + Test that all methods that access design documents' named views will + raise if the views are not present. + """ + self.db = couch.CouchDatabase( + 'http://127.0.0.1:%d' % self.wrapper.port, 'test', + ensure_ddocs=True) + # erase views from _design/docs + docs = self.db._database['_design/docs'] + del docs['views'] + self.db._database.save(docs) + # erase views from _design/syncs + syncs = self.db._database['_design/syncs'] + del syncs['views'] + self.db._database.save(syncs) + # erase views from _design/transactions + transactions = self.db._database['_design/transactions'] + del transactions['views'] + self.db._database.save(transactions) + # _get_generation() + self.assertRaises( + errors.MissingDesignDocNamedViewError, + self.db._get_generation) + # _get_generation_info() + self.assertRaises( + errors.MissingDesignDocNamedViewError, + self.db._get_generation_info) + # _get_trans_id_for_gen() + self.assertRaises( + errors.MissingDesignDocNamedViewError, + self.db._get_trans_id_for_gen, 1) + # _get_transaction_log() + self.assertRaises( + errors.MissingDesignDocNamedViewError, + self.db._get_transaction_log) + # whats_changed() + self.assertRaises( + errors.MissingDesignDocNamedViewError, + self.db.whats_changed) + + def test_deleted_design_doc_raises(self): + """ + Test that all methods that access design documents will raise if the + design docs are not present. + """ + self.db = couch.CouchDatabase( + 'http://127.0.0.1:%d' % self.wrapper.port, 'test', + ensure_ddocs=True) + # delete _design/docs + del self.db._database['_design/docs'] + # delete _design/syncs + del self.db._database['_design/syncs'] + # delete _design/transactions + del self.db._database['_design/transactions'] + # _get_generation() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db._get_generation) + # _get_generation_info() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db._get_generation_info) + # _get_trans_id_for_gen() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db._get_trans_id_for_gen, 1) + # _get_transaction_log() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db._get_transaction_log) + # create_doc() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db.create_doc, {}) + # whats_changed() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db.whats_changed) + # _do_set_replica_gen_and_trans_id() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db._do_set_replica_gen_and_trans_id, 1, 2, 3) + # fake a conflict so we can test resolve_doc() + first_rev = self.db._allocate_doc_rev(None) + doc = couch.CouchDocument( + doc_id='mydoc', rev=self.db._allocate_doc_rev(first_rev)) + self.db._get_doc = Mock(return_value=doc) + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db.resolve_doc, doc, [first_rev]) + + load_tests = tests.load_with_scenarios -- cgit v1.2.3 From f125180609c3e97f56eedfe534f7f1c6f985f8f6 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 25 Dec 2013 21:02:58 -0200 Subject: Allow sync of large files (~100MB) (#4836). --- common/src/leap/soledad/common/couch.py | 7 +++- .../leap/soledad/common/tests/couchdb.ini.template | 2 +- .../tests/test_couch_operations_atomicity.py | 1 + .../src/leap/soledad/common/tests/test_server.py | 37 ++++++++++++++++++++++ 4 files changed, 45 insertions(+), 2 deletions(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 45ca4282..3ca3f408 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -27,7 +27,7 @@ import socket from couchdb.client import Server -from couchdb.http import ResourceNotFound, Unauthorized, ServerError +from couchdb.http import ResourceNotFound, Unauthorized, ServerError, Session from u1db import query_parser, vectorclock from u1db.errors import ( DatabaseDoesNotExist, @@ -50,6 +50,9 @@ from leap.soledad.common.document import SoledadDocument logger = logging.getLogger(__name__) +COUCH_TIMEOUT = 120 # timeout for transfers between Soledad server and Couch + + class InvalidURLError(Exception): """ Exception raised when Soledad encounters a malformed URL. @@ -275,6 +278,8 @@ class CouchDatabase(CommonBackend): # save params self._url = url self._full_commit = full_commit + if session is None: + session = Session(timeout=COUCH_TIMEOUT) self._session = session self._factory = CouchDocument self._real_replica_uid = None diff --git a/common/src/leap/soledad/common/tests/couchdb.ini.template b/common/src/leap/soledad/common/tests/couchdb.ini.template index 217ae201..1fc2205b 100644 --- a/common/src/leap/soledad/common/tests/couchdb.ini.template +++ b/common/src/leap/soledad/common/tests/couchdb.ini.template @@ -6,7 +6,7 @@ database_dir = %(tempdir)s/lib view_index_dir = %(tempdir)s/lib max_document_size = 4294967296 ; 4 GB -os_process_timeout = 5000 ; 5 seconds. for view and external servers. +os_process_timeout = 120000 ; 120 seconds. for view and external servers. max_dbs_open = 100 delayed_commits = true ; set this to false to ensure an fsync before 201 Created is returned uri_file = %(tempdir)s/lib/couch.uri diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py index 8b001859..5384d465 100644 --- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py +++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py @@ -100,6 +100,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") def tearDown(self): + self.db.delete_database() CouchDBTestCase.tearDown(self) TestCaseWithServer.tearDown(self) diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py index 83df192b..ff699486 100644 --- a/common/src/leap/soledad/common/tests/test_server.py +++ b/common/src/leap/soledad/common/tests/test_server.py @@ -25,6 +25,7 @@ import tempfile import simplejson as json import mock import time +import binascii from leap.common.testing.basetest import BaseLeapTest @@ -436,6 +437,42 @@ class EncryptedSyncTestCase( self.assertEqual(doc1, doc2) + def test_sync_very_large_files(self): + """ + Test if Soledad can sync very large files. + """ + # define the size of the "very large file" + length = 100*(10**6) # 100 MB + self.startServer() + # instantiate soledad and create a document + sol1 = self._soledad_instance( + # token is verified in test_target.make_token_soledad_app + auth_token='auth-token' + ) + _, doclist = sol1.get_all_docs() + self.assertEqual([], doclist) + content = binascii.hexlify(os.urandom(length/2)) # len() == length + doc1 = sol1.create_doc({'data': content}) + # sync with server + sol1._server_url = self.getURL() + sol1.sync() + # instantiate soledad with empty db, but with same secrets path + sol2 = self._soledad_instance(prefix='x', auth_token='auth-token') + _, doclist = sol2.get_all_docs() + self.assertEqual([], doclist) + sol2._secrets_path = sol1.secrets_path + sol2._load_secrets() + sol2._set_secret_id(sol1._secret_id) + # sync the new instance + sol2._server_url = self.getURL() + sol2.sync() + _, doclist = sol2.get_all_docs() + self.assertEqual(1, len(doclist)) + doc2 = doclist[0] + # assert incoming doc is equal to the first sent doc + self.assertEqual(doc1, doc2) + + class LockResourceTestCase( CouchDBTestCase, TestCaseWithServer): """ -- cgit v1.2.3 From 8f755c7cfcad8da2e6446f878c3df1f8e9cbae5b Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 26 Dec 2013 17:30:58 -0200 Subject: Add test for syncing many small documents (#4836). --- common/src/leap/soledad/common/tests/test_couch.py | 5 ++-- .../src/leap/soledad/common/tests/test_server.py | 35 ++++++++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index 0e07575d..c833680e 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -81,9 +81,10 @@ class CouchDBWrapper(object): mkdir_p(os.path.join(self.tempdir, 'lib')) mkdir_p(os.path.join(self.tempdir, 'log')) args = ['couchdb', '-n', '-a', confPath] - #null = open('/dev/null', 'w') + null = open('/dev/null', 'w') + self.process = subprocess.Popen( - args, env=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + args, env=None, stdout=null.fileno(), stderr=null.fileno(), close_fds=True) # find port logPath = os.path.join(self.tempdir, 'log', 'couch.log') diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py index ff699486..922f16a2 100644 --- a/common/src/leap/soledad/common/tests/test_server.py +++ b/common/src/leap/soledad/common/tests/test_server.py @@ -473,6 +473,41 @@ class EncryptedSyncTestCase( self.assertEqual(doc1, doc2) + def test_sync_many_small_files(self): + """ + Test if Soledad can sync many smallfiles. + """ + number_of_docs = 100 + self.startServer() + # instantiate soledad and create a document + sol1 = self._soledad_instance( + # token is verified in test_target.make_token_soledad_app + auth_token='auth-token' + ) + _, doclist = sol1.get_all_docs() + self.assertEqual([], doclist) + # create many small files + for i in range(0, number_of_docs): + sol1.create_doc(json.loads(simple_doc)) + # sync with server + sol1._server_url = self.getURL() + sol1.sync() + # instantiate soledad with empty db, but with same secrets path + sol2 = self._soledad_instance(prefix='x', auth_token='auth-token') + _, doclist = sol2.get_all_docs() + self.assertEqual([], doclist) + sol2._secrets_path = sol1.secrets_path + sol2._load_secrets() + sol2._set_secret_id(sol1._secret_id) + # sync the new instance + sol2._server_url = self.getURL() + sol2.sync() + _, doclist = sol2.get_all_docs() + self.assertEqual(number_of_docs, len(doclist)) + # assert incoming docs are equal to sent docs + for doc in doclist: + self.assertEqual(sol1.get_doc(doc.doc_id), doc) + class LockResourceTestCase( CouchDBTestCase, TestCaseWithServer): """ -- cgit v1.2.3 From dc35d67834edde4e0f927c35e0459c27b575f08d Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 26 Dec 2013 17:41:41 -0200 Subject: Make couch tests use only one couch instance. --- common/src/leap/soledad/common/tests/test_couch.py | 14 ++++++------- .../src/leap/soledad/common/tests/test_server.py | 23 +++++++++++++++++++++- 2 files changed, 29 insertions(+), 8 deletions(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index c833680e..dc0ea906 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -127,21 +127,21 @@ class CouchDBTestCase(unittest.TestCase): TestCase base class for tests against a real CouchDB server. """ - def setUp(self): + @classmethod + def setUpClass(cls): """ Make sure we have a CouchDB instance for a test. """ - self.wrapper = CouchDBWrapper() - self.wrapper.start() + cls.wrapper = CouchDBWrapper() + cls.wrapper.start() #self.db = self.wrapper.db - unittest.TestCase.setUp(self) - def tearDown(self): + @classmethod + def tearDownClass(cls): """ Stop CouchDB instance for test. """ - self.wrapper.stop() - unittest.TestCase.tearDown(self) + cls.wrapper.stop() #----------------------------------------------------------------------------- diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py index 922f16a2..06595ed2 100644 --- a/common/src/leap/soledad/common/tests/test_server.py +++ b/common/src/leap/soledad/common/tests/test_server.py @@ -377,6 +377,7 @@ class EncryptedSyncTestCase( doc2 = doclist[0] # assert incoming doc is equal to the first sent doc self.assertEqual(doc1, doc2) + db.delete_database() def test_encrypted_sym_sync_with_unicode_passphrase(self): """ @@ -435,7 +436,7 @@ class EncryptedSyncTestCase( doc2 = doclist[0] # assert incoming doc is equal to the first sent doc self.assertEqual(doc1, doc2) - + db.delete_database() def test_sync_very_large_files(self): """ @@ -471,6 +472,13 @@ class EncryptedSyncTestCase( doc2 = doclist[0] # assert incoming doc is equal to the first sent doc self.assertEqual(doc1, doc2) + # delete remote database + db = CouchDatabase( + self._couch_url, + # the name of the user database is "user-". + 'user-user-uuid', + ) + db.delete_database() def test_sync_many_small_files(self): @@ -507,6 +515,13 @@ class EncryptedSyncTestCase( # assert incoming docs are equal to sent docs for doc in doclist: self.assertEqual(sol1.get_doc(doc.doc_id), doc) + # delete remote database + db = CouchDatabase( + self._couch_url, + # the name of the user database is "user-". + 'user-user-uuid', + ) + db.delete_database() class LockResourceTestCase( CouchDBTestCase, TestCaseWithServer): @@ -533,6 +548,12 @@ class LockResourceTestCase( def tearDown(self): CouchDBTestCase.tearDown(self) TestCaseWithServer.tearDown(self) + # delete remote database + db = CouchDatabase( + self._couch_url, + 'shared', + ) + db.delete_database() def test__try_obtain_filesystem_lock(self): responder = mock.Mock() -- cgit v1.2.3 From 4a2c8a111470247d15a5a1771c5ddb18980f37b7 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 22 Jan 2014 15:11:43 -0200 Subject: Improve DesignDocUnknownError error reporting. --- common/src/leap/soledad/common/couch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 3ca3f408..40d64370 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -199,7 +199,7 @@ def raise_missing_design_doc_error(exc, ddoc_path): elif exc.message[1] == 'deleted': raise errors.MissingDesignDocDeletedError(path) # other errors are unknown for now - raise errors.DesignDocUnknownError(path) + raise errors.DesignDocUnknownError("%s: %s" % (path, str(exc.message))) def raise_server_error(exc, ddoc_path): -- cgit v1.2.3 From 49e7366e3b6dcad7fde6b78ed3c2bffd3b1430e4 Mon Sep 17 00:00:00 2001 From: drebs Date: Fri, 24 Jan 2014 18:24:55 -0200 Subject: Add decorator to register exceptions. --- common/src/leap/soledad/common/errors.py | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/errors.py b/common/src/leap/soledad/common/errors.py index f241ee06..fc4d106a 100644 --- a/common/src/leap/soledad/common/errors.py +++ b/common/src/leap/soledad/common/errors.py @@ -25,16 +25,29 @@ from u1db import errors from u1db.remote import http_errors +exceptions = [] + + +def register_exception(cls): + """ + A small decorator to make it easier to register exceptions. + """ + exceptions.append(cls) + return cls + + class SoledadError(errors.U1DBError): """ Base Soledad HTTP errors. """ pass + # # LockResource errors # +@register_exception class InvalidTokenError(SoledadError): """ Exception raised when trying to unlock shared database with invalid token. @@ -44,6 +57,7 @@ class InvalidTokenError(SoledadError): status = 401 +@register_exception class NotLockedError(SoledadError): """ Exception raised when trying to unlock shared database when it is not @@ -54,6 +68,7 @@ class NotLockedError(SoledadError): status = 404 +@register_exception class AlreadyLockedError(SoledadError): """ Exception raised when trying to lock shared database but it is already @@ -64,6 +79,7 @@ class AlreadyLockedError(SoledadError): status = 403 +@register_exception class LockTimedOutError(SoledadError): """ Exception raised when timing out while trying to lock the shared database. @@ -73,6 +89,7 @@ class LockTimedOutError(SoledadError): status = 408 +@register_exception class CouldNotObtainLockError(SoledadError): """ Exception raised when timing out while trying to lock the shared database. @@ -86,6 +103,7 @@ class CouldNotObtainLockError(SoledadError): # CouchDatabase errors # +@register_exception class MissingDesignDocError(SoledadError): """ Raised when trying to access a missing couch design document. @@ -95,6 +113,7 @@ class MissingDesignDocError(SoledadError): status = 500 +@register_exception class MissingDesignDocNamedViewError(SoledadError): """ Raised when trying to access a missing named view on a couch design @@ -105,6 +124,7 @@ class MissingDesignDocNamedViewError(SoledadError): status = 500 +@register_exception class MissingDesignDocListFunctionError(SoledadError): """ Raised when trying to access a missing list function on a couch design @@ -115,6 +135,7 @@ class MissingDesignDocListFunctionError(SoledadError): status = 500 +@register_exception class MissingDesignDocDeletedError(SoledadError): """ Raised when trying to access a deleted couch design document. @@ -124,6 +145,7 @@ class MissingDesignDocDeletedError(SoledadError): status = 500 +@register_exception class DesignDocUnknownError(SoledadError): """ Raised when trying to access a couch design document and getting an @@ -136,10 +158,7 @@ class DesignDocUnknownError(SoledadError): # update u1db "wire description to status" and "wire description to exception" # maps. -for e in [InvalidTokenError, NotLockedError, AlreadyLockedError, - LockTimedOutError, CouldNotObtainLockError, MissingDesignDocError, - MissingDesignDocListFunctionError, MissingDesignDocNamedViewError, - MissingDesignDocDeletedError, DesignDocUnknownError]: +for e in exceptions: http_errors.wire_description_to_status.update({ e.wire_description: e.status}) errors.wire_description_to_exc.update({ -- cgit v1.2.3 From c97f48660a1aaad96f7356ac1a5fce6265241e0f Mon Sep 17 00:00:00 2001 From: drebs Date: Fri, 24 Jan 2014 19:09:46 -0200 Subject: Improve unauthorized error messages. --- common/src/leap/soledad/common/errors.py | 46 ++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 14 deletions(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/errors.py b/common/src/leap/soledad/common/errors.py index fc4d106a..446c4c75 100644 --- a/common/src/leap/soledad/common/errors.py +++ b/common/src/leap/soledad/common/errors.py @@ -25,14 +25,17 @@ from u1db import errors from u1db.remote import http_errors -exceptions = [] - - def register_exception(cls): """ - A small decorator to make it easier to register exceptions. + A small decorator that registers exceptions in u1db maps. """ - exceptions.append(cls) + # update u1db "wire description to status" and "wire description to + # exception" maps. + http_errors.wire_description_to_status.update({ + cls.wire_description: cls.status}) + errors.wire_description_to_exc.update({ + cls.wire_description: cls}) + # do not modify the exception return cls @@ -43,6 +46,30 @@ class SoledadError(errors.U1DBError): pass +# +# Authorization errors +# + +@register_exception +class MissingAuthTokenError(errors.Unauthorized): + """ + Exception raised when failing to get authorization for some action because + the auth token is missing in the tokens db. + """ + + wire_description = "missing token" + status = 401 + +@register_exception +class InvalidAuthTokenError(errors.Unauthorized): + """ + Exception raised when failing to get authorization for some action because + the provided token is different from the one in the tokens db. + """ + + wire_descrition = "token mismatch" + status = 401 + # # LockResource errors # @@ -156,15 +183,6 @@ class DesignDocUnknownError(SoledadError): status = 500 -# update u1db "wire description to status" and "wire description to exception" -# maps. -for e in exceptions: - http_errors.wire_description_to_status.update({ - e.wire_description: e.status}) - errors.wire_description_to_exc.update({ - e.wire_description: e}) - - # u1db error statuses also have to be updated http_errors.ERROR_STATUSES = set( http_errors.wire_description_to_status.values()) -- cgit v1.2.3 From 7de7b4653b1815818611d2a6302bd38d1e7a7ccc Mon Sep 17 00:00:00 2001 From: drebs Date: Fri, 21 Feb 2014 17:49:28 -0300 Subject: Use less memory when putting docs on couch. Closes #5011. --- common/src/leap/soledad/common/couch.py | 267 +++++++++++++++------ .../leap/soledad/common/ddocs/docs/updates/put.js | 64 ----- common/src/leap/soledad/common/tests/test_couch.py | 8 - 3 files changed, 198 insertions(+), 141 deletions(-) delete mode 100644 common/src/leap/soledad/common/ddocs/docs/updates/put.js (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 40d64370..09fe1ca3 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -24,10 +24,21 @@ import uuid import logging import binascii import socket +import time +import sys + + +from StringIO import StringIO from couchdb.client import Server -from couchdb.http import ResourceNotFound, Unauthorized, ServerError, Session +from couchdb.http import ( + ResourceConflict, + ResourceNotFound, + Unauthorized, + ServerError, + Session, +) from u1db import query_parser, vectorclock from u1db.errors import ( DatabaseDoesNotExist, @@ -87,9 +98,9 @@ class CouchDocument(SoledadDocument): SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts) self._couch_rev = None self._conflicts = None - self._modified_conflicts = False + self._transactions = None - def ensure_fetch_conflicts(self, get_conflicts_fun): + def _ensure_fetch_conflicts(self, get_conflicts_fun): """ Ensure conflict data has been fetched from the server. @@ -120,8 +131,7 @@ class CouchDocument(SoledadDocument): :type doc: CouchDocument """ if self._conflicts is None: - raise Exception("Run self.ensure_fetch_conflicts first!") - self._modified_conflicts = True + raise Exception("Run self._ensure_fetch_conflicts first!") self._conflicts.append(doc) self.has_conflicts = len(self._conflicts) > 0 @@ -133,25 +143,13 @@ class CouchDocument(SoledadDocument): :type conflict_revs: [str] """ if self._conflicts is None: - raise Exception("Run self.ensure_fetch_conflicts first!") + raise Exception("Run self._ensure_fetch_conflicts first!") conflicts_len = len(self._conflicts) self._conflicts = filter( lambda doc: doc.rev not in conflict_revs, self._conflicts) - if len(self._conflicts) < conflicts_len: - self._modified_conflicts = True self.has_conflicts = len(self._conflicts) > 0 - def modified_conflicts(self): - """ - Return whether this document's conflicts have been modified. - - :return: Whether this document's conflicts have been modified. - :rtype: bool - """ - return self._conflicts is not None and \ - self._modified_conflicts is True - def _get_couch_rev(self): return self._couch_rev @@ -160,6 +158,14 @@ class CouchDocument(SoledadDocument): couch_rev = property(_get_couch_rev, _set_couch_rev) + def _get_transactions(self): + return self._transactions + + def _set_transactions(self, rev): + self._transactions = rev + + transactions = property(_get_transactions, _set_transactions) + # monkey-patch the u1db http app to use CouchDocument http_app.Document = CouchDocument @@ -225,6 +231,94 @@ def raise_server_error(exc, ddoc_path): raise errors.DesignDocUnknownError(path) +class MultipartWriter(object): + """ + A multipart writer adapted from python-couchdb's one so we can PUT + documents using couch's multipart PUT. + + This stripped down version does not allow for nested structures, and + contains only the essential things we need to PUT SoledadDocuments to the + couch backend. + """ + + CRLF = '\r\n' + + def __init__(self, fileobj, headers=None, boundary=None): + """ + Initialize the multipart writer. + """ + self.fileobj = fileobj + if boundary is None: + boundary = self._make_boundary() + self._boundary = boundary + self._build_headers('related', headers) + + def add(self, mimetype, content, headers={}): + """ + Add a part to the multipart stream. + """ + self.fileobj.write('--') + self.fileobj.write(self._boundary) + self.fileobj.write(self.CRLF) + headers['Content-Type'] = mimetype + self._write_headers(headers) + if content: + # XXX: throw an exception if a boundary appears in the content?? + self.fileobj.write(content) + self.fileobj.write(self.CRLF) + + def close(self): + """ + Close the multipart stream. + """ + self.fileobj.write('--') + self.fileobj.write(self._boundary) + # be careful not to have anything after '--', otherwise old couch + # versions (including bigcouch) will fail. + self.fileobj.write('--') + + def _make_boundary(self): + """ + Create a boundary to discern multi parts. + """ + try: + from uuid import uuid4 + return '==' + uuid4().hex + '==' + except ImportError: + from random import randrange + token = randrange(sys.maxint) + format = '%%0%dd' % len(repr(sys.maxint - 1)) + return '===============' + (format % token) + '==' + + def _write_headers(self, headers): + """ + Write a part header in the buffer stream. + """ + if headers: + for name in sorted(headers.keys()): + value = headers[name] + self.fileobj.write(name) + self.fileobj.write(': ') + self.fileobj.write(value) + self.fileobj.write(self.CRLF) + self.fileobj.write(self.CRLF) + + def _build_headers(self, subtype, headers): + """ + Build the main headers of the multipart stream. + + This is here so we can send headers separete from content using + python-couchdb API. + """ + self.headers = {} + self.headers['Content-Type'] = 'multipart/%s; boundary="%s"' % \ + (subtype, self._boundary) + if headers: + for name in sorted(headers.keys()): + value = headers[name] + self.headers[name] = value + + class CouchDatabase(CommonBackend): """ A U1DB implementation that uses CouchDB as its persistence layer. @@ -564,8 +658,12 @@ class CouchDatabase(CommonBackend): and '_attachments' in result \ and 'u1db_conflicts' in result['_attachments']: doc.has_conflicts = True + #doc.conflicts = self._build_conflicts( + # json.loads(result['_attachments']['u1db_conflicts'])) # store couch revision doc.couch_rev = result['_rev'] + # store transactions + doc.transactions = result['u1db_transactions'] return doc def get_doc(self, doc_id, include_deleted=False): @@ -616,6 +714,10 @@ class CouchDatabase(CommonBackend): """ Put the document in the Couch backend database. + Note that C{old_doc} must have been fetched with the parameter + C{check_for_conflicts} equal to True, so we can properly update the + new document using the conflict information from the old one. + :param old_doc: The old document version. :type old_doc: CouchDocument :param doc: The document to be put. @@ -637,45 +739,61 @@ class CouchDatabase(CommonBackend): design document for an yet unknown reason. """ - trans_id = self._allocate_transaction_id() - # encode content - content = doc.get_json() - if content is not None: - content = binascii.b2a_base64(content)[:-1] # exclude trailing \n - # encode conflicts - conflicts = None - update_conflicts = doc.modified_conflicts() - if update_conflicts is True: - if doc.has_conflicts: - conflicts = binascii.b2a_base64( - json.dumps( - map(lambda cdoc: (cdoc.rev, cdoc.content), - doc.get_conflicts())) - )[:-1] # exclude \n - # perform the request - ddoc_path = ['_design', 'docs', '_update', 'put', doc.doc_id] - resource = self._database.resource(*ddoc_path) + attachments = {} # we save content and conflicts as attachments + parts = [] # and we put it using couch's multipart PUT + # save content as attachment + if doc.is_tombstone() is False: + content = doc.get_json() + attachments['u1db_content'] = { + 'follows': True, + 'content_type': 'application/octet-stream', + 'length': len(content), + } + parts.append(content) + # save conflicts as attachment + if doc.has_conflicts is True: + conflicts = json.dumps( + map(lambda cdoc: (cdoc.rev, cdoc.content), + doc.get_conflicts())) + attachments['u1db_conflicts'] = { + 'follows': True, + 'content_type': 'application/octet-stream', + 'length': len(conflicts), + } + parts.append(conflicts) + # store old transactions, if any + transactions = old_doc.transactions[:] if old_doc is not None else [] + # create a new transaction id and timestamp it so the transaction log + # is consistent when querying the database. + transactions.append( + # here we store milliseconds to keep consistent with javascript + # Date.prototype.getTime() which was used before inside a couchdb + # update handler. + (int(time.time() * 1000), + self._allocate_transaction_id())) + # build the couch document + couch_doc = { + '_id': doc.doc_id, + 'u1db_rev': doc.rev, + 'u1db_transactions': transactions, + '_attachments': attachments, + } + # if we are updating a doc we have to add the couch doc revision + if old_doc is not None: + couch_doc['_rev'] = old_doc.couch_rev + # prepare the multipart PUT + buf = StringIO() + envelope = MultipartWriter(buf) + envelope.add('application/json', json.dumps(couch_doc)) + for part in parts: + envelope.add('application/octet-stream', part) + envelope.close() + # try to save and fail if there's a revision conflict try: - response = resource.put_json( - body={ - 'couch_rev': old_doc.couch_rev - if old_doc is not None else None, - 'u1db_rev': doc.rev, - 'content': content, - 'trans_id': trans_id, - 'conflicts': conflicts, - 'update_conflicts': update_conflicts, - }, - headers={'content-type': 'application/json'}) - # the document might have been updated in between, so we check for - # the return message - msg = response[2].read() - if msg == 'ok': - return - elif msg == 'revision conflict': - raise RevisionConflict() - except ResourceNotFound as e: - raise_missing_design_doc_error(e, ddoc_path) + self._database.resource.put_json( + doc.doc_id, body=buf.getvalue(), headers=envelope.headers) + except ResourceConflict: + raise RevisionConflict() def put_doc(self, doc): """ @@ -810,6 +928,25 @@ class CouchDatabase(CommonBackend): self._put_doc(old_doc, doc) return new_rev + def _build_conflicts(self, doc_id, attached_conflicts): + """ + Build the conflicted documents list from the conflicts attachment + fetched from a couch document. + + :param attached_conflicts: The document's conflicts as fetched from a + couch document attachment. + :type attached_conflicts: dict + """ + conflicts = [] + for doc_rev, content in attached_conflicts: + doc = self._factory(doc_id, doc_rev) + if content is None: + doc.make_tombstone() + else: + doc.content = content + conflicts.append(doc) + return conflicts + def _get_conflicts(self, doc_id, couch_rev=None): """ Get the conflicted versions of a document. @@ -830,16 +967,8 @@ class CouchDatabase(CommonBackend): resource = self._database.resource(doc_id, 'u1db_conflicts') try: response = resource.get_json(**params) - conflicts = [] - # build the conflicted versions - for doc_rev, content in json.loads(response[2].read()): - doc = self._factory(doc_id, doc_rev) - if content is None: - doc.make_tombstone() - else: - doc.content = content - conflicts.append(doc) - return conflicts + return self._build_conflicts( + doc_id, json.loads(response[2].read())) except ResourceNotFound: return [] @@ -970,7 +1099,7 @@ class CouchDatabase(CommonBackend): serialized string. :type my_content: str """ - doc.ensure_fetch_conflicts(self._get_conflicts) + doc._ensure_fetch_conflicts(self._get_conflicts) doc.add_conflict( self._factory(doc_id=doc.doc_id, rev=my_doc_rev, json=my_content)) @@ -988,7 +1117,7 @@ class CouchDatabase(CommonBackend): :param conflict_revs: A list of the revisions to be deleted. :param conflict_revs: [str] """ - doc.ensure_fetch_conflicts(self._get_conflicts) + doc._ensure_fetch_conflicts(self._get_conflicts) doc.delete_conflicts(conflict_revs) def _prune_conflicts(self, doc, doc_vcr): @@ -1150,7 +1279,7 @@ class CouchDatabase(CommonBackend): if cur_doc is not None: doc.couch_rev = cur_doc.couch_rev # fetch conflicts because we will eventually manipulate them - doc.ensure_fetch_conflicts(self._get_conflicts) + doc._ensure_fetch_conflicts(self._get_conflicts) # from now on, it works just like u1db sqlite backend doc_vcr = vectorclock.VectorClockRev(doc.rev) if cur_doc is None: diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/put.js b/common/src/leap/soledad/common/ddocs/docs/updates/put.js deleted file mode 100644 index 5a4647de..00000000 --- a/common/src/leap/soledad/common/ddocs/docs/updates/put.js +++ /dev/null @@ -1,64 +0,0 @@ -function(doc, req){ - /* we expect to receive the following in `req.body`: - * { - * 'couch_rev': '', - * 'u1db_rev': '', - * 'content': '', - * 'trans_id': '' - * 'conflicts': '', - * 'update_conflicts': - * } - */ - var body = JSON.parse(req.body); - - // create a new document document - if (!doc) { - doc = {} - doc['_id'] = req['id']; - } - // or fail if couch revisions do not match - else if (doc['_rev'] != body['couch_rev']) { - // of fail if revisions do not match - return [null, 'revision conflict'] - } - - // store u1db rev - doc.u1db_rev = body['u1db_rev']; - - // save content as attachment - if (body['content'] != null) { - // save u1db content as attachment - if (!doc._attachments) - doc._attachments = {}; - doc._attachments.u1db_content = { - content_type: "application/octet-stream", - data: body['content'] // should be base64 encoded - }; - } - // or delete the attachment if document is tombstone - else if (doc._attachments && - doc._attachments.u1db_content) - delete doc._attachments.u1db_content; - - // store the transaction id - if (!doc.u1db_transactions) - doc.u1db_transactions = []; - var d = new Date(); - doc.u1db_transactions.push([d.getTime(), body['trans_id']]); - - // save conflicts as attachment if they were sent - if (body['update_conflicts']) - if (body['conflicts'] != null) { - if (!doc._attachments) - doc._attachments = {}; - doc._attachments.u1db_conflicts = { - content_type: "application/octet-stream", - data: body['conflicts'] // should be base64 encoded - } - } else { - if(doc._attachments && doc._attachments.u1db_conflicts) - delete doc._attachments.u1db_conflicts - } - - return [doc, 'ok']; -} diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index dc0ea906..97d744e4 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -495,10 +495,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): self.assertRaises( errors.MissingDesignDocError, self.db._get_transaction_log) - # create_doc() - self.assertRaises( - errors.MissingDesignDocError, - self.db.create_doc, {}) # whats_changed() self.assertRaises( errors.MissingDesignDocError, @@ -645,10 +641,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): self.assertRaises( errors.MissingDesignDocDeletedError, self.db._get_transaction_log) - # create_doc() - self.assertRaises( - errors.MissingDesignDocDeletedError, - self.db.create_doc, {}) # whats_changed() self.assertRaises( errors.MissingDesignDocDeletedError, -- cgit v1.2.3 From 981eca6e00555e00c2f04621d6caad328ccc0363 Mon Sep 17 00:00:00 2001 From: drebs Date: Mon, 10 Mar 2014 12:20:52 -0300 Subject: Make resolve_doc() not use a design document. --- common/src/leap/soledad/common/couch.py | 54 ++++++++++++---------- .../common/ddocs/docs/updates/resolve_doc.js | 39 ---------------- common/src/leap/soledad/common/tests/test_couch.py | 16 ------- 3 files changed, 29 insertions(+), 80 deletions(-) delete mode 100644 common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 09fe1ca3..456d4fdf 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -123,6 +123,16 @@ class CouchDocument(SoledadDocument): """ return self._conflicts + def set_conflicts(self, conflicts): + """ + Set the conflicted versions of the document. + + :param conflicts: The conflicted versions of the document. + :type conflicts: list + """ + self._conflicts = conflicts + self.has_conflicts = len(self._conflicts) > 0 + def add_conflict(self, doc): """ Add a conflict to this document. @@ -658,8 +668,11 @@ class CouchDatabase(CommonBackend): and '_attachments' in result \ and 'u1db_conflicts' in result['_attachments']: doc.has_conflicts = True - #doc.conflicts = self._build_conflicts( - # json.loads(result['_attachments']['u1db_conflicts'])) + doc.set_conflicts( + self._build_conflicts( + doc.doc_id, + json.loads(binascii.a2b_base64( + result['_attachments']['u1db_conflicts']['data'])))) # store couch revision doc.couch_rev = result['_rev'] # store transactions @@ -1196,33 +1209,24 @@ class CouchDatabase(CommonBackend): conflicted_doc_revs) superseded_revs = set(conflicted_doc_revs) doc.rev = new_rev + # this backend stores conflicts as properties of the documents, so we + # have to copy these conflicts over to the document being updated. if cur_doc.rev in superseded_revs: + # the newer doc version will supersede the one in the database, so + # we copy conflicts before updating the backend. + doc.set_conflicts(cur_doc.get_conflicts()) # copy conflicts over. self._delete_conflicts(doc, superseded_revs) self._put_doc(cur_doc, doc) else: - self._add_conflict(doc, new_rev, doc.get_json()) - self._delete_conflicts(doc, superseded_revs) - # perform request to resolve document in server - ddoc_path = [ - '_design', 'docs', '_update', 'resolve_doc', doc.doc_id - ] - resource = self._database.resource(*ddoc_path) - conflicts = None - if doc.has_conflicts: - conflicts = binascii.b2a_base64( - json.dumps( - map(lambda cdoc: (cdoc.rev, cdoc.content), - doc.get_conflicts())) - )[:-1] # exclude \n - try: - response = resource.put_json( - body={ - 'couch_rev': cur_doc.couch_rev, - 'conflicts': conflicts, - }, - headers={'content-type': 'application/json'}) - except ResourceNotFound as e: - raise_missing_design_doc_error(e, ddoc_path) + # the newer doc version does not supersede the one in the + # database, so we will add a conflict to the database and copy + # those over to the document the user has in her hands. + self._add_conflict(cur_doc, new_rev, doc.get_json()) + self._delete_conflicts(cur_doc, superseded_revs) + self._put_doc(cur_doc, cur_doc) # just update conflicts + # backend has been updated with current conflicts, now copy them + # to the current document. + doc.set_conflicts(cur_doc.get_conflicts()) def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, replica_trans_id=''): diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js b/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js deleted file mode 100644 index 7ba66cf8..00000000 --- a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js +++ /dev/null @@ -1,39 +0,0 @@ -function(doc, req){ - /* we expect to receive the following in `req.body`: - * { - * 'couch_rev': '', - * 'conflicts': '', - * } - */ - var body = JSON.parse(req.body); - - // fail if no document was given - if (!doc) { - return [null, 'document does not exist'] - } - - // fail if couch revisions do not match - if (body['couch_rev'] != null - && doc['_rev'] != body['couch_rev']) { - return [null, 'revision conflict'] - } - - // fail if conflicts were not sent - if (body['conflicts'] == null) - return [null, 'missing conflicts'] - - // save conflicts as attachment if they were sent - if (body['conflicts'] != null) { - if (!doc._attachments) - doc._attachments = {}; - doc._attachments.u1db_conflicts = { - content_type: "application/octet-stream", - data: body['conflicts'] // should be base64 encoded - } - } - // or delete attachment if there are no conflicts - else if (doc._attachments && doc._attachments.u1db_conflicts) - delete doc._attachments.u1db_conflicts; - - return [doc, 'ok']; -} diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index 97d744e4..86bb4b93 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -503,14 +503,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): self.assertRaises( errors.MissingDesignDocError, self.db._do_set_replica_gen_and_trans_id, 1, 2, 3) - # fake a conflict so we can test resolve_doc() - first_rev = self.db._allocate_doc_rev(None) - doc = couch.CouchDocument( - doc_id='mydoc', rev=self.db._allocate_doc_rev(first_rev)) - self.db._get_doc = Mock(return_value=doc) - self.assertRaises( - errors.MissingDesignDocError, - self.db.resolve_doc, doc, [first_rev]) def test_missing_design_doc_functions_raises(self): """ @@ -649,14 +641,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase): self.assertRaises( errors.MissingDesignDocDeletedError, self.db._do_set_replica_gen_and_trans_id, 1, 2, 3) - # fake a conflict so we can test resolve_doc() - first_rev = self.db._allocate_doc_rev(None) - doc = couch.CouchDocument( - doc_id='mydoc', rev=self.db._allocate_doc_rev(first_rev)) - self.db._get_doc = Mock(return_value=doc) - self.assertRaises( - errors.MissingDesignDocDeletedError, - self.db.resolve_doc, doc, [first_rev]) load_tests = tests.load_with_scenarios -- cgit v1.2.3 From c332b78995334a8c49c788e2fd3fa15eb8936ca8 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 12 Mar 2014 19:31:58 -0300 Subject: Prevent Soledad server from creating or deleting couch databases (#3502). --- common/src/leap/soledad/common/couch.py | 17 ++++--- .../tests/test_couch_operations_atomicity.py | 6 +++ .../src/leap/soledad/common/tests/test_server.py | 54 ++++++++++++++-------- 3 files changed, 52 insertions(+), 25 deletions(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 456d4fdf..11b77938 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -35,7 +35,6 @@ from couchdb.client import Server from couchdb.http import ( ResourceConflict, ResourceNotFound, - Unauthorized, ServerError, Session, ) @@ -48,6 +47,7 @@ from u1db.errors import ( ConflictedDoc, DocumentDoesNotExist, DocumentAlreadyDeleted, + Unauthorized, ) from u1db.backends import CommonBackend, CommonSyncTarget from u1db.remote import http_app @@ -1451,7 +1451,6 @@ class CouchServerState(ServerState): :return: The CouchDatabase object. :rtype: CouchDatabase """ - # TODO: open couch return CouchDatabase.open_database( self._couch_url + '/' + dbname, create=False) @@ -1460,16 +1459,20 @@ class CouchServerState(ServerState): """ Ensure couch database exists. + Usually, this method is used by the server to ensure the existence of + a database. In our setup, the Soledad user that accesses the underlying + couch server should never have permission to create (or delete) + databases. But, in case it ever does, by raising an exception here we + have one more guarantee that no modified client will be able to + enforce creation of a database when syncing. + :param dbname: The name of the database to ensure. :type dbname: str :return: The CouchDatabase object and the replica uid. :rtype: (CouchDatabase, str) """ - db = CouchDatabase.open_database( - self._couch_url + '/' + dbname, - create=True) - return db, db._replica_uid + raise Unauthorized() def delete_database(self, dbname): """ @@ -1478,7 +1481,7 @@ class CouchServerState(ServerState): :param dbname: The name of the database to delete. :type dbname: str """ - CouchDatabase.delete_database(self._couch_url + '/' + dbname) + raise Unauthorized() def _set_couch_url(self, url): """ diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py index 5384d465..3c457cc5 100644 --- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py +++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py @@ -33,11 +33,17 @@ from leap.soledad.common.tests.test_target import ( make_leap_document_for_test, token_leap_sync_target, ) +from leap.soledad.common.tests.test_server import _couch_ensure_database REPEAT_TIMES = 20 +# monkey path CouchServerState so it can ensure databases. + +CouchServerState.ensure_database = _couch_ensure_database + + class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): @staticmethod diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py index 06595ed2..f8d2a64f 100644 --- a/common/src/leap/soledad/common/tests/test_server.py +++ b/common/src/leap/soledad/common/tests/test_server.py @@ -51,6 +51,17 @@ from leap.soledad.server import SoledadApp, LockResource from leap.soledad.server.auth import URLToAuthorization +# monkey path CouchServerState so it can ensure databases. + +def _couch_ensure_database(self, dbname): + db = CouchDatabase.open_database( + self._couch_url + '/' + dbname, + create=True) + return db, db._replica_uid + +CouchServerState.ensure_database = _couch_ensure_database + + class ServerAuthorizationTestCase(BaseLeapTest): """ Tests related to Soledad server authorization. @@ -340,15 +351,16 @@ class EncryptedSyncTestCase( _, doclist = sol1.get_all_docs() self.assertEqual([], doclist) doc1 = sol1.create_doc(json.loads(simple_doc)) - # sync with server - sol1._server_url = self.getURL() - sol1.sync() - # assert doc was sent to couch db + # ensure remote db exists before syncing db = CouchDatabase( self._couch_url, # the name of the user database is "user-". 'user-user-uuid', ) + # sync with server + sol1._server_url = self.getURL() + sol1.sync() + # assert doc was sent to couch db _, doclist = db.get_all_docs() self.assertEqual(1, len(doclist)) couchdoc = doclist[0] @@ -395,15 +407,16 @@ class EncryptedSyncTestCase( _, doclist = sol1.get_all_docs() self.assertEqual([], doclist) doc1 = sol1.create_doc(json.loads(simple_doc)) - # sync with server - sol1._server_url = self.getURL() - sol1.sync() - # assert doc was sent to couch db + # ensure remote db exists before syncing db = CouchDatabase( self._couch_url, # the name of the user database is "user-". 'user-user-uuid', ) + # sync with server + sol1._server_url = self.getURL() + sol1.sync() + # assert doc was sent to couch db _, doclist = db.get_all_docs() self.assertEqual(1, len(doclist)) couchdoc = doclist[0] @@ -454,6 +467,12 @@ class EncryptedSyncTestCase( self.assertEqual([], doclist) content = binascii.hexlify(os.urandom(length/2)) # len() == length doc1 = sol1.create_doc({'data': content}) + # ensure remote db exists before syncing + db = CouchDatabase( + self._couch_url, + # the name of the user database is "user-". + 'user-user-uuid', + ) # sync with server sol1._server_url = self.getURL() sol1.sync() @@ -473,11 +492,6 @@ class EncryptedSyncTestCase( # assert incoming doc is equal to the first sent doc self.assertEqual(doc1, doc2) # delete remote database - db = CouchDatabase( - self._couch_url, - # the name of the user database is "user-". - 'user-user-uuid', - ) db.delete_database() @@ -497,6 +511,12 @@ class EncryptedSyncTestCase( # create many small files for i in range(0, number_of_docs): sol1.create_doc(json.loads(simple_doc)) + # ensure remote db exists before syncing + db = CouchDatabase( + self._couch_url, + # the name of the user database is "user-". + 'user-user-uuid', + ) # sync with server sol1._server_url = self.getURL() sol1.sync() @@ -516,11 +536,6 @@ class EncryptedSyncTestCase( for doc in doclist: self.assertEqual(sol1.get_doc(doc.doc_id), doc) # delete remote database - db = CouchDatabase( - self._couch_url, - # the name of the user database is "user-". - 'user-user-uuid', - ) db.delete_database() class LockResourceTestCase( @@ -542,6 +557,9 @@ class LockResourceTestCase( CouchDBTestCase.setUp(self) self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") self._couch_url = 'http://localhost:' + str(self.wrapper.port) + # create the databases + CouchDatabase(self._couch_url, 'shared') + CouchDatabase(self._couch_url, 'tokens') self._state = CouchServerState( self._couch_url, 'shared', 'tokens') -- cgit v1.2.3 From b85c9797827731b86798adc958cee0c114667918 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 12 Mar 2014 17:33:11 -0300 Subject: Remove check for couch permissions. --- common/src/leap/soledad/common/couch.py | 67 --------------------------------- 1 file changed, 67 deletions(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 11b77938..d16563d3 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -1354,14 +1354,6 @@ class CouchSyncTarget(CommonSyncTarget): source_replica_transaction_id) -class NotEnoughCouchPermissions(Exception): - """ - Raised when failing to assert for enough permissions on underlying Couch - Database. - """ - pass - - class CouchServerState(ServerState): """ Inteface of the WSGI server with the CouchDB backend. @@ -1381,65 +1373,6 @@ class CouchServerState(ServerState): self._couch_url = couch_url self._shared_db_name = shared_db_name self._tokens_db_name = tokens_db_name - try: - self._check_couch_permissions() - except NotEnoughCouchPermissions: - logger.error("Not enough permissions on underlying couch " - "database (%s)." % self._couch_url) - except (socket.error, socket.gaierror, socket.herror, - socket.timeout), e: - logger.error("Socket problem while trying to reach underlying " - "couch database: (%s, %s)." % - (self._couch_url, e)) - - def _check_couch_permissions(self): - """ - Assert that Soledad Server has enough permissions on the underlying - couch database. - - Soledad Server has to be able to do the following in the couch server: - - * Create, read and write from/to 'shared' db. - * Create, read and write from/to 'user-' dbs. - * Read from 'tokens' db. - - This function tries to perform the actions above using the "low level" - couch library to ensure that Soledad Server can do everything it needs - on the underlying couch database. - - :param couch_url: The URL of the couch database. - :type couch_url: str - - @raise NotEnoughCouchPermissions: Raised in case there are not enough - permissions to read/write/create the needed couch databases. - :rtype: bool - """ - - def _open_couch_db(dbname): - server = Server(url=self._couch_url) - try: - server[dbname] - except ResourceNotFound: - server.create(dbname) - return server[dbname] - - def _create_delete_test_doc(db): - doc_id, _ = db.save({'test': 'document'}) - doc = db[doc_id] - db.delete(doc) - - try: - # test read/write auth for shared db - _create_delete_test_doc( - _open_couch_db(self._shared_db_name)) - # test read/write auth for user- db - _create_delete_test_doc( - _open_couch_db('%stest-db' % USER_DB_PREFIX)) - # test read auth for tokens db - tokensdb = _open_couch_db(self._tokens_db_name) - tokensdb.info() - except Unauthorized: - raise NotEnoughCouchPermissions(self._couch_url) def open_database(self, dbname): """ -- cgit v1.2.3 From 35bf9f390db15e74ad4e37b6fc7fac9d6d7ca658 Mon Sep 17 00:00:00 2001 From: drebs Date: Fri, 14 Mar 2014 15:13:42 -0300 Subject: Parallelize get_docs on couch backend (#5008). --- common/src/leap/soledad/common/couch.py | 78 +++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index d16563d3..a4f30cf3 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -26,6 +26,7 @@ import binascii import socket import time import sys +import threading from StringIO import StringIO @@ -334,6 +335,47 @@ class CouchDatabase(CommonBackend): A U1DB implementation that uses CouchDB as its persistence layer. """ + # We spawn threads to parallelize the CouchDatabase.get_docs() method + MAX_GET_DOCS_THREADS = 20 + + class _GetDocThread(threading.Thread): + """ + A thread that gets a document from a database. + + TODO: switch this for a twisted deferred to thread. This depends on + replacing python-couchdb for paisley in this module. + """ + + def __init__(self, db, doc_id, check_for_conflicts, + release_fun): + """ + :param db: The database from where to get the document. + :type db: u1db.Database + :param doc_id: The doc_id of the document to be retrieved. + :type doc_id: str + :param check_for_conflicts: Whether the get_doc() method should + check for existing conflicts. + :type check_for_conflicts: bool + :param release_fun: A function that releases a semaphore, to be + called after the document is fetched. + :type release_fun: function + """ + threading.Thread.__init__(self) + self._db = db + self._doc_id = doc_id + self._check_for_conflicts = check_for_conflicts + self._release_fun = release_fun + self._doc = None + + def run(self): + """ + Fetch the document, store it as a property, and call the release + function. + """ + self._doc = self._db._get_doc( + self._doc_id, self._check_for_conflicts) + self._release_fun() + @classmethod def open_database(cls, url, create): """ @@ -401,6 +443,9 @@ class CouchDatabase(CommonBackend): self._set_replica_uid(replica_uid) if ensure_ddocs: self.ensure_ddocs_on_db() + # initialize a thread pool for parallelizing get_docs() + self._sem_pool = threading.BoundedSemaphore( + value=self.MAX_GET_DOCS_THREADS) def ensure_ddocs_on_db(self): """ @@ -1331,6 +1376,39 @@ class CouchDatabase(CommonBackend): old_doc.has_conflicts = doc.has_conflicts return state, self._get_generation() + def get_docs(self, doc_ids, check_for_conflicts=True, + include_deleted=False): + """ + Get the JSON content for many documents. + + :param doc_ids: A list of document identifiers. + :type doc_ids: list + :param check_for_conflicts: If set to False, then the conflict check + will be skipped, and 'None' will be + returned instead of True/False. + :type check_for_conflictsa: bool + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise deleted + documents will not be included in the results. + :return: iterable giving the Document object for each document id + in matching doc_ids order. + :rtype: iterable + """ + # spawn threads to retrieve docs + threads = [] + for doc_id in doc_ids: + self._sem_pool.acquire() + t = self._GetDocThread(self, doc_id, check_for_conflicts, + self._sem_pool.release) + t.start() + threads.append(t) + # join threads and yield docs + for t in threads: + t.join() + if t._doc.is_tombstone() and not include_deleted: + continue + yield t._doc + class CouchSyncTarget(CommonSyncTarget): """ -- cgit v1.2.3 From 288f506daed66e4acb08617dc1db127da4d36241 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 13 Mar 2014 15:08:55 -0300 Subject: Fix raising of auth token errors (#5191). --- common/src/leap/soledad/common/errors.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/errors.py b/common/src/leap/soledad/common/errors.py index 446c4c75..3a7eadd2 100644 --- a/common/src/leap/soledad/common/errors.py +++ b/common/src/leap/soledad/common/errors.py @@ -50,24 +50,16 @@ class SoledadError(errors.U1DBError): # Authorization errors # -@register_exception -class MissingAuthTokenError(errors.Unauthorized): - """ - Exception raised when failing to get authorization for some action because - the auth token is missing in the tokens db. - """ - - wire_description = "missing token" - status = 401 - @register_exception class InvalidAuthTokenError(errors.Unauthorized): """ Exception raised when failing to get authorization for some action because - the provided token is different from the one in the tokens db. + the provided token either does not exist in the tokens database, has a + distinct structure from the expected one, or is associated with a user + with a distinct uuid than the one provided by the client. """ - wire_descrition = "token mismatch" + wire_descrition = "invalid auth token" status = 401 # -- cgit v1.2.3 From f5b30eb5ae4ac5cc61163e0167e92827310d0c13 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 26 Mar 2014 12:09:10 -0300 Subject: Remove check for design docs from couch server state init (#5387). --- common/src/leap/soledad/common/couch.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index a4f30cf3..c0ebc425 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -377,7 +377,7 @@ class CouchDatabase(CommonBackend): self._release_fun() @classmethod - def open_database(cls, url, create): + def open_database(cls, url, create, ensure_ddocs=False): """ Open a U1DB database using CouchDB as backend. @@ -385,6 +385,8 @@ class CouchDatabase(CommonBackend): :type url: str :param create: should the replica be created if it does not exist? :type create: bool + :param ensure_ddocs: Ensure that the design docs exist on server. + :type ensure_ddocs: bool :return: the database instance :rtype: CouchDatabase @@ -401,7 +403,7 @@ class CouchDatabase(CommonBackend): except ResourceNotFound: if not create: raise DatabaseDoesNotExist() - return cls(url, dbname) + return cls(url, dbname, ensure_ddocs=ensure_ddocs) def __init__(self, url, dbname, replica_uid=None, full_commit=True, session=None, ensure_ddocs=True): @@ -1464,7 +1466,8 @@ class CouchServerState(ServerState): """ return CouchDatabase.open_database( self._couch_url + '/' + dbname, - create=False) + create=False, + ensure_ddocs=False) def ensure_database(self, dbname): """ -- cgit v1.2.3 From e2a3431c13773af43b50797566feedc15fdf94ce Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 27 Mar 2014 10:42:26 -0300 Subject: Retry recording sync info on couch backend (#5388). --- common/src/leap/soledad/common/couch.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index c0ebc425..ebe8477f 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -30,6 +30,7 @@ import threading from StringIO import StringIO +from collections import defaultdict from couchdb.client import Server @@ -338,6 +339,8 @@ class CouchDatabase(CommonBackend): # We spawn threads to parallelize the CouchDatabase.get_docs() method MAX_GET_DOCS_THREADS = 20 + update_handler_lock = defaultdict(threading.Lock) + class _GetDocThread(threading.Thread): """ A thread that gets a document from a database. @@ -1133,13 +1136,14 @@ class CouchDatabase(CommonBackend): ddoc_path = ['_design', 'syncs', '_update', 'put', 'u1db_sync_log'] res = self._database.resource(*ddoc_path) try: - res.put_json( - body={ - 'other_replica_uid': other_replica_uid, - 'other_generation': other_generation, - 'other_transaction_id': other_transaction_id, - }, - headers={'content-type': 'application/json'}) + with CouchDatabase.update_handler_lock[self._get_replica_uid()]: + res.put_json( + body={ + 'other_replica_uid': other_replica_uid, + 'other_generation': other_generation, + 'other_transaction_id': other_transaction_id, + }, + headers={'content-type': 'application/json'}) except ResourceNotFound as e: raise_missing_design_doc_error(e, ddoc_path) @@ -1367,7 +1371,7 @@ class CouchDatabase(CommonBackend): if save_conflict: self._force_doc_sync_conflict(doc) if replica_uid is not None and replica_gen is not None: - self._do_set_replica_gen_and_trans_id( + self._set_replica_gen_and_trans_id( replica_uid, replica_gen, replica_trans_id) # update info old_doc.rev = doc.rev -- cgit v1.2.3 From c3f5e9a6b340db969844a662c27fcb2b3f7596b9 Mon Sep 17 00:00:00 2001 From: drebs Date: Fri, 4 Apr 2014 11:55:35 -0300 Subject: Renew couch connection session after multipart PUT (#5448). --- common/src/leap/soledad/common/couch.py | 10 ++++++++++ 1 file changed, 10 insertions(+) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index ebe8477f..1bb84985 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -855,6 +855,7 @@ class CouchDatabase(CommonBackend): try: self._database.resource.put_json( doc.doc_id, body=buf.getvalue(), headers=envelope.headers) + self._renew_couch_session() except ResourceConflict: raise RevisionConflict() @@ -1415,6 +1416,15 @@ class CouchDatabase(CommonBackend): continue yield t._doc + def _renew_couch_session(self): + """ + Create a new couch connection session. + + This is a workaround for #5448. Will not be needed once bigcouch is + merged with couchdb. + """ + self._database.resource.session = Session(timeout=COUCH_TIMEOUT) + class CouchSyncTarget(CommonSyncTarget): """ -- cgit v1.2.3 From 86890a496ba05951573dad7f69fa2840087994d2 Mon Sep 17 00:00:00 2001 From: drebs Date: Fri, 4 Apr 2014 15:35:20 -0300 Subject: Preload time.strptime() to avoid multi thread problem on couch backend get_docs() method (#5449). --- common/src/leap/soledad/common/couch.py | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'common/src/leap/soledad') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 1bb84985..8e8613a1 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -1401,6 +1401,17 @@ class CouchDatabase(CommonBackend): in matching doc_ids order. :rtype: iterable """ + # Workaround for: + # + # http://bugs.python.org/issue7980 + # https://leap.se/code/issues/5449 + # + # python-couchdb uses time.strptime, which is not thread safe. In + # order to avoid the problem described on the issues above, we preload + # strptime here by evaluating the conversion of an arbitrary date. + # This will not be needed when/if we switch from python-couchdb to + # paisley. + time.strptime('Mar 4 1917', '%b %d %Y') # spawn threads to retrieve docs threads = [] for doc_id in doc_ids: -- cgit v1.2.3