diff options
author | Tomás Touceda <chiiph@leap.se> | 2014-04-04 16:34:57 -0300 |
---|---|---|
committer | Tomás Touceda <chiiph@leap.se> | 2014-04-04 16:34:57 -0300 |
commit | ce22976cc0e203e53799e771aa5e3717d498cc5c (patch) | |
tree | 8c69733f1f8a83ac83e40bf7e522a5fe8eae9b50 /common | |
parent | deb78a5f3502ece98ec3e0b70f93025c4a1b3da5 (diff) | |
parent | a3fed4d42ab4a7be7bc7ebe86b35805ac73d62de (diff) |
Diffstat (limited to 'common')
25 files changed, 2756 insertions, 927 deletions
diff --git a/common/setup.py b/common/setup.py index bcc2b4b3..e142d958 100644 --- a/common/setup.py +++ b/common/setup.py @@ -103,7 +103,163 @@ def get_versions(default={}, verbose=False): f.write(subst_template) +# +# Couch backend design docs file generation. +# + +from os import listdir +from os.path import realpath, dirname, isdir, join, isfile, basename +import json +import binascii + + +old_cmd_sdist = cmdclass["sdist"] + + +def build_ddocs_py(basedir=None, with_src=True): + """ + Build `ddocs.py` file. + + For ease of development, couch backend design documents are stored as + `.js` files in subdirectories of `src/leap/soledad/common/ddocs`. This + function scans that directory for javascript files, builds the design + documents structure, and encode those structures in the `ddocs.py` file. + + This function is used when installing in develop mode, building or + generating source distributions (see the next classes and the `cmdclass` + setuptools parameter. + + This funciton uses the following conventions to generate design documents: + + - Design documents are represented by directories in the form + `<prefix>/<ddoc>`, there prefix is the `src/leap/soledad/common/ddocs` + directory. + - Design document directories might contain `views`, `lists` and + `updates` subdirectories. + - Views subdirectories must contain a `map.js` file and may contain a + `reduce.js` file. + - List and updates subdirectories may contain any number of javascript + files (i.e. ending in `.js`) whose names will be mapped to the + corresponding list or update function name. + """ + cur_pwd = dirname(realpath(__file__)) + common_path = ('src', 'leap', 'soledad', 'common') + dest_common_path = common_path + if not with_src: + dest_common_path = common_path[1:] + prefix = join(cur_pwd, *common_path) + + dest_prefix = prefix + if basedir is not None: + # we're bulding a sdist + dest_prefix = join(basedir, *dest_common_path) + + ddocs_prefix = join(prefix, 'ddocs') + ddocs = {} + + # design docs are represented by subdirectories of `ddocs_prefix` + for ddoc in [f for f in listdir(ddocs_prefix) + if isdir(join(ddocs_prefix, f))]: + + ddocs[ddoc] = {'_id': '_design/%s' % ddoc} + + for t in ['views', 'lists', 'updates']: + tdir = join(ddocs_prefix, ddoc, t) + if isdir(tdir): + + ddocs[ddoc][t] = {} + + if t == 'views': # handle views (with map/reduce functions) + for view in [f for f in listdir(tdir) + if isdir(join(tdir, f))]: + # look for map.js and reduce.js + mapfile = join(tdir, view, 'map.js') + reducefile = join(tdir, view, 'reduce.js') + mapfun = None + reducefun = None + try: + with open(mapfile) as f: + mapfun = f.read() + except IOError: + pass + try: + with open(reducefile) as f: + reducefun = f.read() + except IOError: + pass + ddocs[ddoc]['views'][view] = {} + + if mapfun is not None: + ddocs[ddoc]['views'][view]['map'] = mapfun + if reducefun is not None: + ddocs[ddoc]['views'][view]['reduce'] = reducefun + + else: # handle lists, updates, etc + for fun in [f for f in listdir(tdir) + if isfile(join(tdir, f))]: + funfile = join(tdir, fun) + funname = basename(funfile).replace('.js', '') + try: + with open(funfile) as f: + ddocs[ddoc][t][funname] = f.read() + except IOError: + pass + # write file containing design docs strings + ddoc_filename = "ddocs.py" + with open(join(dest_prefix, ddoc_filename), 'w') as f: + for ddoc in ddocs: + f.write( + "%s = '%s'\n" % + (ddoc, binascii.b2a_base64(json.dumps(ddocs[ddoc]))[:-1])) + print "Wrote design docs in %s" % (dest_prefix + '/' + ddoc_filename,) + + +from setuptools.command.develop import develop as _cmd_develop + + +class cmd_develop(_cmd_develop): + def run(self): + # versioneer: + versions = versioneer.get_versions(verbose=True) + self._versioneer_generated_versions = versions + # unless we update this, the command will keep using the old version + self.distribution.metadata.version = versions["version"] + _cmd_develop.run(self) + build_ddocs_py() + + +# versioneer powered +old_cmd_sdist = cmdclass["sdist"] + + +class cmd_sdist(old_cmd_sdist): + """ + Generate 'src/leap/soledad/common/ddocs.py' which contains couch design + documents scripts. + """ + def run(self): + old_cmd_sdist.run(self) + + def make_release_tree(self, base_dir, files): + old_cmd_sdist.make_release_tree(self, base_dir, files) + build_ddocs_py(basedir=base_dir) + + +# versioneer powered +old_cmd_build = cmdclass["build"] + + +class cmd_build(old_cmd_build): + def run(self): + old_cmd_build.run(self) + build_ddocs_py(basedir=self.build_lib, with_src=False) + + cmdclass["freeze_debianver"] = freeze_debianver +cmdclass["build"] = cmd_build +cmdclass["sdist"] = cmd_sdist +cmdclass["develop"] = cmd_develop + # XXX add ref to docs diff --git a/common/src/leap/soledad/common/.gitignore b/common/src/leap/soledad/common/.gitignore new file mode 100644 index 00000000..3378c78a --- /dev/null +++ b/common/src/leap/soledad/common/.gitignore @@ -0,0 +1 @@ +ddocs.py diff --git a/common/src/leap/soledad/common/README.txt b/common/src/leap/soledad/common/README.txt new file mode 100644 index 00000000..106efb5e --- /dev/null +++ b/common/src/leap/soledad/common/README.txt @@ -0,0 +1,79 @@ +Soledad common package +====================== + +This package contains Soledad bits used by both server and client. + +Couch U1DB Backend +------------------ + +U1DB backends rely on some atomic operations that modify documents contents +and metadata (conflicts, transaction ids and indexes). The only atomic +operation in Couch is a document put, so every u1db atomic operation has to be +mapped to a couch document put. + +The atomic operations in the U1DB SQLite reference backend implementation may +be identified by the use of a context manager to access the underlying +database. A listing of the methods involved in each atomic operation are +depiced below. The top-level elements correpond to the atomic operations that +have to be mapped, and items on deeper levels of the list have to be +implemented in a way that all changes will be pushed with just one operation. + + * _set_replica_uid + * put_doc: + * _get_doc + * _put_and_update_indexes + * insert/update the document + * insert into transaction log + * delete_doc + * _get_doc + * _put_and_update_indexes + * get_doc_conflicts + * _get_conflicts + * _set_replica_gen_and_trans_id + * _do_set_replica_gen_and_trans_id + * _put_doc_if_newer + * _get_doc + * _validate_source (**) + * _get_replica_gen_and_trans_id + * cases: + * is newer: + * _prune_conflicts (**) + * _has_conflicts + * _delete_conflicts + * _put_and_update_indexes + * same content as: + * _put_and_update_indexes + * conflicted: + * _force_doc_sync_conflict + * _prune_conflicts + * _add_conflict + * _put_and_update_indexes + * _do_set_replica_gen_and_trans_id + * resolve_doc + * _get_doc + * cases: + * doc is superseded + * _put_and_update_indexes + * else + * _add_conflict + * _delete_conflicts + * delete_index + * create_index + +Couch views and update functions are used in order to achieve atomicity on the +Couch backend. Transactions are stored in the `u1db_transactions` field of the +couch document. Document's content and conflicted versions are stored as couch +document attachments with names, respectivelly, `u1db_content` and +`u1db_conflicts`. + +A map of methods and couch query URI can be found on the `./ddocs/README.txt` +document. + +Notes: + + * Currently, the couch backend does not implement indexing, so what is + depicted as `_put_and_update_indexes` above will be found as `_put_doc` in + the backend. + + * Conflict updates are part of document put using couch update functions, + and as such are part of the same atomic operation as document put. diff --git a/common/src/leap/soledad/common/_version.py b/common/src/leap/soledad/common/_version.py index 1d020a14..7d4262b5 100644 --- a/common/src/leap/soledad/common/_version.py +++ b/common/src/leap/soledad/common/_version.py @@ -17,6 +17,7 @@ git_full = "$Format:%H$" import subprocess import sys + def run_command(args, cwd=None, verbose=False): try: # remember shell=False, so use git.cmd on windows, not just git @@ -36,11 +37,10 @@ def run_command(args, cwd=None, verbose=False): return None return stdout - -import sys import re import os.path + def get_expanded_variables(versionfile_source): # the code embedded in _version.py can just fetch the value of these # variables. When used from setup.py, we don't want to import @@ -48,7 +48,7 @@ def get_expanded_variables(versionfile_source): # used from _version.py. variables = {} try: - f = open(versionfile_source,"r") + f = open(versionfile_source, "r") for line in f.readlines(): if line.strip().startswith("git_refnames ="): mo = re.search(r'=\s*"(.*)"', line) @@ -63,12 +63,13 @@ def get_expanded_variables(versionfile_source): pass return variables + def versions_from_expanded_variables(variables, tag_prefix, verbose=False): refnames = variables["refnames"].strip() if refnames.startswith("$Format"): if verbose: print("variables are unexpanded, not using") - return {} # unexpanded, so not in an unpacked git-archive tarball + return {} # unexpanded, so not in an unpacked git-archive tarball refs = set([r.strip() for r in refnames.strip("()").split(",")]) # starting in git-1.8.3, tags are listed as "tag: foo-1.0" instead of # just "foo-1.0". If we see a "tag: " prefix, prefer those. @@ -93,13 +94,14 @@ def versions_from_expanded_variables(variables, tag_prefix, verbose=False): r = ref[len(tag_prefix):] if verbose: print("picking %s" % r) - return { "version": r, - "full": variables["full"].strip() } + return {"version": r, + "full": variables["full"].strip()} # no suitable tags, so we use the full revision id if verbose: print("no suitable tags, using full revision id") - return { "version": variables["full"].strip(), - "full": variables["full"].strip() } + return {"version": variables["full"].strip(), + "full": variables["full"].strip()} + def versions_from_vcs(tag_prefix, versionfile_source, verbose=False): # this runs 'git' from the root of the source tree. That either means @@ -116,7 +118,7 @@ def versions_from_vcs(tag_prefix, versionfile_source, verbose=False): here = os.path.abspath(__file__) except NameError: # some py2exe/bbfreeze/non-CPython implementations don't do __file__ - return {} # not always correct + return {} # not always correct # versionfile_source is the relative path from the top of the source tree # (where the .git directory might live) to this file. Invert this to find @@ -126,7 +128,16 @@ def versions_from_vcs(tag_prefix, versionfile_source, verbose=False): for i in range(len(versionfile_source.split("/"))): root = os.path.dirname(root) else: - root = os.path.dirname(here) + root = os.path.dirname( + os.path.join('..', here)) + + ###################################################### + # XXX patch for our specific configuration with + # the three projects leap.soledad.{common, client, server} + # inside the same repo. + ###################################################### + root = os.path.dirname(os.path.join('..', root)) + if not os.path.exists(os.path.join(root, ".git")): if verbose: print("no .git in %s" % root) @@ -141,7 +152,8 @@ def versions_from_vcs(tag_prefix, versionfile_source, verbose=False): return {} if not stdout.startswith(tag_prefix): if verbose: - print("tag '%s' doesn't start with prefix '%s'" % (stdout, tag_prefix)) + print("tag '%s' doesn't start with prefix '%s'" % ( + stdout, tag_prefix)) return {} tag = stdout[len(tag_prefix):] stdout = run_command([GIT, "rev-parse", "HEAD"], cwd=root) @@ -153,7 +165,8 @@ def versions_from_vcs(tag_prefix, versionfile_source, verbose=False): return {"version": tag, "full": full} -def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False): +def versions_from_parentdir(parentdir_prefix, versionfile_source, + verbose=False): if IN_LONG_VERSION_PY: # We're running from _version.py. If it's from a source tree # (execute-in-place), we can work upwards to find the root of the @@ -163,7 +176,7 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False) here = os.path.abspath(__file__) except NameError: # py2exe/bbfreeze/non-CPython don't have __file__ - return {} # without __file__, we have no hope + return {} # without __file__, we have no hope # versionfile_source is the relative path from the top of the source # tree to _version.py. Invert this to find the root from __file__. root = here @@ -180,7 +193,8 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False) dirname = os.path.basename(root) if not dirname.startswith(parentdir_prefix): if verbose: - print("guessing rootdir is '%s', but '%s' doesn't start with prefix '%s'" % + print("guessing rootdir is '%s', but '%s' doesn't start with " + "prefix '%s'" % (root, dirname, parentdir_prefix)) return None return {"version": dirname[len(parentdir_prefix):], "full": ""} @@ -189,8 +203,9 @@ tag_prefix = "" parentdir_prefix = "leap.soledad.common-" versionfile_source = "src/leap/soledad/common/_version.py" + def get_versions(default={"version": "unknown", "full": ""}, verbose=False): - variables = { "refnames": git_refnames, "full": git_full } + variables = {"refnames": git_refnames, "full": git_full} ver = versions_from_expanded_variables(variables, tag_prefix, verbose) if not ver: ver = versions_from_vcs(tag_prefix, versionfile_source, verbose) @@ -200,4 +215,3 @@ def get_versions(default={"version": "unknown", "full": ""}, verbose=False): if not ver: ver = default return ver - diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 1396f4d7..8e8613a1 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -18,189 +18,381 @@ """A U1DB backend that uses CouchDB as its persistence layer.""" -import re import simplejson as json -import socket +import re +import uuid import logging +import binascii +import socket +import time +import sys +import threading -from u1db import errors -from u1db.sync import Synchronizer -from u1db.backends.inmemory import InMemoryIndex -from u1db.remote.server_state import ServerState -from u1db.errors import DatabaseDoesNotExist -from couchdb.client import Server, Document as CouchDocument -from couchdb.http import ResourceNotFound, Unauthorized +from StringIO import StringIO +from collections import defaultdict -from leap.soledad.common import USER_DB_PREFIX -from leap.soledad.common.objectstore import ( - ObjectStoreDatabase, - ObjectStoreSyncTarget, +from couchdb.client import Server +from couchdb.http import ( + ResourceConflict, + ResourceNotFound, + ServerError, + Session, +) +from u1db import query_parser, vectorclock +from u1db.errors import ( + DatabaseDoesNotExist, + InvalidGeneration, + RevisionConflict, + InvalidDocId, + ConflictedDoc, + DocumentDoesNotExist, + DocumentAlreadyDeleted, + Unauthorized, ) +from u1db.backends import CommonBackend, CommonSyncTarget +from u1db.remote import http_app +from u1db.remote.server_state import ServerState + + +from leap.soledad.common import USER_DB_PREFIX, ddocs, errors +from leap.soledad.common.document import SoledadDocument logger = logging.getLogger(__name__) +COUCH_TIMEOUT = 120 # timeout for transfers between Soledad server and Couch + + class InvalidURLError(Exception): """ Exception raised when Soledad encounters a malformed URL. """ -def persistent_class(cls): +class CouchDocument(SoledadDocument): """ - Decorator that modifies a class to ensure u1db metadata persists on - underlying storage. + This is the document used for maintaining the Couch backend. + + A CouchDocument can fetch and manipulate conflicts and also holds a + reference to the couch document revision. This data is used to ensure an + atomic and consistent update of the database. + """ + + def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False, + syncable=True): + """ + Container for handling a document that is stored in couch backend. + + :param doc_id: The unique document identifier. + :type doc_id: str + :param rev: The revision identifier of the document. + :type rev: str + :param json: The JSON string for this document. + :type json: str + :param has_conflicts: Boolean indicating if this document has conflicts + :type has_conflicts: bool + :param syncable: Should this document be synced with remote replicas? + :type syncable: bool + """ + SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts) + self._couch_rev = None + self._conflicts = None + self._transactions = None + + def _ensure_fetch_conflicts(self, get_conflicts_fun): + """ + Ensure conflict data has been fetched from the server. + + :param get_conflicts_fun: A function which, given the document id and + the couch revision, return the conflicted + versions of the current document. + :type get_conflicts_fun: function + """ + if self._conflicts is None: + self._conflicts = get_conflicts_fun(self.doc_id, + couch_rev=self.couch_rev) + self.has_conflicts = len(self._conflicts) > 0 + + def get_conflicts(self): + """ + Get the conflicted versions of the document. + + :return: The conflicted versions of the document. + :rtype: [CouchDocument] + """ + return self._conflicts + + def set_conflicts(self, conflicts): + """ + Set the conflicted versions of the document. + + :param conflicts: The conflicted versions of the document. + :type conflicts: list + """ + self._conflicts = conflicts + self.has_conflicts = len(self._conflicts) > 0 - @param cls: The class that will be modified. - @type cls: type + def add_conflict(self, doc): + """ + Add a conflict to this document. + + :param doc: The conflicted version to be added. + :type doc: CouchDocument + """ + if self._conflicts is None: + raise Exception("Run self._ensure_fetch_conflicts first!") + self._conflicts.append(doc) + self.has_conflicts = len(self._conflicts) > 0 + + def delete_conflicts(self, conflict_revs): + """ + Delete conflicted versions of this document. + + :param conflict_revs: The conflicted revisions to be deleted. + :type conflict_revs: [str] + """ + if self._conflicts is None: + raise Exception("Run self._ensure_fetch_conflicts first!") + conflicts_len = len(self._conflicts) + self._conflicts = filter( + lambda doc: doc.rev not in conflict_revs, + self._conflicts) + self.has_conflicts = len(self._conflicts) > 0 + + def _get_couch_rev(self): + return self._couch_rev + + def _set_couch_rev(self, rev): + self._couch_rev = rev + + couch_rev = property(_get_couch_rev, _set_couch_rev) + + def _get_transactions(self): + return self._transactions + + def _set_transactions(self, rev): + self._transactions = rev + + transactions = property(_get_transactions, _set_transactions) + + +# monkey-patch the u1db http app to use CouchDocument +http_app.Document = CouchDocument + + +def raise_missing_design_doc_error(exc, ddoc_path): + """ + Raise an appropriate exception when catching a ResourceNotFound when + accessing a design document. + + :param exc: The exception cought. + :type exc: ResourceNotFound + :param ddoc_path: A list representing the requested path. + :type ddoc_path: list + + :raise MissingDesignDocError: Raised when tried to access a missing design + document. + :raise MissingDesignDocListFunctionError: Raised when trying to access a + missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a design + document for an yet unknown reason. """ + path = "".join(ddoc_path) + if exc.message[1] == 'missing': + raise errors.MissingDesignDocError(path) + elif exc.message[1] == 'missing function' or \ + exc.message[1].startswith('missing lists function'): + raise errors.MissingDesignDocListFunctionError(path) + elif exc.message[1] == 'missing_named_view': + raise errors.MissingDesignDocNamedViewError(path) + elif exc.message[1] == 'deleted': + raise errors.MissingDesignDocDeletedError(path) + # other errors are unknown for now + raise errors.DesignDocUnknownError("%s: %s" % (path, str(exc.message))) + + +def raise_server_error(exc, ddoc_path): + """ + Raise an appropriate exception when catching a ServerError when + accessing a design document. + + :param exc: The exception cought. + :type exc: ResourceNotFound + :param ddoc_path: A list representing the requested path. + :type ddoc_path: list + + :raise MissingDesignDocListFunctionError: Raised when trying to access a + missing list function on a + design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a design + document for an yet unknown reason. + """ + path = "".join(ddoc_path) + if exc.message[1][0] == 'unnamed_error': + raise errors.MissingDesignDocListFunctionError(path) + # other errors are unknown for now + raise errors.DesignDocUnknownError(path) + - def _create_persistent_method(old_method_name, key, load_method_name, - dump_method_name, store): - """ - Create a persistent method to replace C{old_method_name}. - - The new method will load C{key} using C{load_method_name} and stores - it using C{dump_method_name} depending on the value of C{store}. - """ - # get methods - old_method = getattr(cls, old_method_name) - load_method = getattr(cls, load_method_name) \ - if load_method_name is not None \ - else lambda self, data: setattr(self, key, data) - dump_method = getattr(cls, dump_method_name) \ - if dump_method_name is not None \ - else lambda self: getattr(self, key) - - def _new_method(self, *args, **kwargs): - # get u1db data from couch db - doc = self._get_doc('%s%s' % - (self.U1DB_DATA_DOC_ID_PREFIX, key)) - load_method(self, doc.content['content']) - # run old method - retval = old_method(self, *args, **kwargs) - # store u1db data on couch - if store: - doc.content = {'content': dump_method(self)} - self._put_doc(doc) - return retval - - return _new_method - - # ensure the class has a persistency map - if not hasattr(cls, 'PERSISTENCY_MAP'): - logger.error('Class %s has no PERSISTENCY_MAP attribute, skipping ' - 'persistent methods substitution.' % cls) - return cls - # replace old methods with new persistent ones - for key, ((load_method_name, dump_method_name), - persistent_methods) in cls.PERSISTENCY_MAP.iteritems(): - for (method_name, store) in persistent_methods: - setattr(cls, method_name, - _create_persistent_method( - method_name, - key, - load_method_name, - dump_method_name, - store)) - return cls - - -@persistent_class -class CouchDatabase(ObjectStoreDatabase): +class MultipartWriter(object): """ - A U1DB backend that uses Couch as its persistence layer. + A multipart writer adapted from python-couchdb's one so we can PUT + documents using couch's multipart PUT. + + This stripped down version does not allow for nested structures, and + contains only the essential things we need to PUT SoledadDocuments to the + couch backend. """ - U1DB_TRANSACTION_LOG_KEY = '_transaction_log' - U1DB_CONFLICTS_KEY = '_conflicts' - U1DB_OTHER_GENERATIONS_KEY = '_other_generations' - U1DB_INDEXES_KEY = '_indexes' - U1DB_REPLICA_UID_KEY = '_replica_uid' - - U1DB_DATA_KEYS = [ - U1DB_TRANSACTION_LOG_KEY, - U1DB_CONFLICTS_KEY, - U1DB_OTHER_GENERATIONS_KEY, - U1DB_INDEXES_KEY, - U1DB_REPLICA_UID_KEY, - ] - - COUCH_ID_KEY = '_id' - COUCH_REV_KEY = '_rev' - COUCH_U1DB_ATTACHMENT_KEY = 'u1db_json' - COUCH_U1DB_REV_KEY = 'u1db_rev' - - # the following map describes information about methods usage of - # properties that have to persist on the underlying database. The format - # of the map is assumed to be: - # - # { - # 'property_name': [ - # ('property_load_method_name', 'property_dump_method_name'), - # [('method_1_name', bool), - # ... - # ('method_N_name', bool)]], - # ... - # } - # - # where the booleans indicate if the property should be stored after - # each method execution (i.e. if the method alters the property). Property - # load/dump methods will be run after/before properties are read/written - # to the underlying db. - PERSISTENCY_MAP = { - U1DB_TRANSACTION_LOG_KEY: [ - ('_load_transaction_log_from_json', None), - [('_get_transaction_log', False), - ('_get_generation', False), - ('_get_generation_info', False), - ('_get_trans_id_for_gen', False), - ('whats_changed', False), - ('_put_and_update_indexes', True)]], - U1DB_CONFLICTS_KEY: [ - (None, None), - [('_has_conflicts', False), - ('get_doc_conflicts', False), - ('_prune_conflicts', False), - ('resolve_doc', False), - ('_replace_conflicts', True), - ('_force_doc_sync_conflict', True)]], - U1DB_OTHER_GENERATIONS_KEY: [ - ('_load_other_generations_from_json', None), - [('_get_replica_gen_and_trans_id', False), - ('_do_set_replica_gen_and_trans_id', True)]], - U1DB_INDEXES_KEY: [ - ('_load_indexes_from_json', '_dump_indexes_as_json'), - [('list_indexes', False), - ('get_from_index', False), - ('get_range_from_index', False), - ('get_index_keys', False), - ('_put_and_update_indexes', True), - ('create_index', True), - ('delete_index', True)]], - U1DB_REPLICA_UID_KEY: [ - (None, None), - [('_allocate_doc_rev', False), - ('_put_doc_if_newer', False), - ('_ensure_maximal_rev', False), - ('_prune_conflicts', False), - ('_set_replica_uid', True)]]} + CRLF = '\r\n' + + def __init__(self, fileobj, headers=None, boundary=None): + """ + Initialize the multipart writer. + """ + self.fileobj = fileobj + if boundary is None: + boundary = self._make_boundary() + self._boundary = boundary + self._build_headers('related', headers) + + def add(self, mimetype, content, headers={}): + """ + Add a part to the multipart stream. + """ + self.fileobj.write('--') + self.fileobj.write(self._boundary) + self.fileobj.write(self.CRLF) + headers['Content-Type'] = mimetype + self._write_headers(headers) + if content: + # XXX: throw an exception if a boundary appears in the content?? + self.fileobj.write(content) + self.fileobj.write(self.CRLF) + + def close(self): + """ + Close the multipart stream. + """ + self.fileobj.write('--') + self.fileobj.write(self._boundary) + # be careful not to have anything after '--', otherwise old couch + # versions (including bigcouch) will fail. + self.fileobj.write('--') + + def _make_boundary(self): + """ + Create a boundary to discern multi parts. + """ + try: + from uuid import uuid4 + return '==' + uuid4().hex + '==' + except ImportError: + from random import randrange + token = randrange(sys.maxint) + format = '%%0%dd' % len(repr(sys.maxint - 1)) + return '===============' + (format % token) + '==' + + def _write_headers(self, headers): + """ + Write a part header in the buffer stream. + """ + if headers: + for name in sorted(headers.keys()): + value = headers[name] + self.fileobj.write(name) + self.fileobj.write(': ') + self.fileobj.write(value) + self.fileobj.write(self.CRLF) + self.fileobj.write(self.CRLF) + + def _build_headers(self, subtype, headers): + """ + Build the main headers of the multipart stream. + + This is here so we can send headers separete from content using + python-couchdb API. + """ + self.headers = {} + self.headers['Content-Type'] = 'multipart/%s; boundary="%s"' % \ + (subtype, self._boundary) + if headers: + for name in sorted(headers.keys()): + value = headers[name] + self.headers[name] = value + + +class CouchDatabase(CommonBackend): + """ + A U1DB implementation that uses CouchDB as its persistence layer. + """ + + # We spawn threads to parallelize the CouchDatabase.get_docs() method + MAX_GET_DOCS_THREADS = 20 + + update_handler_lock = defaultdict(threading.Lock) + + class _GetDocThread(threading.Thread): + """ + A thread that gets a document from a database. + + TODO: switch this for a twisted deferred to thread. This depends on + replacing python-couchdb for paisley in this module. + """ + + def __init__(self, db, doc_id, check_for_conflicts, + release_fun): + """ + :param db: The database from where to get the document. + :type db: u1db.Database + :param doc_id: The doc_id of the document to be retrieved. + :type doc_id: str + :param check_for_conflicts: Whether the get_doc() method should + check for existing conflicts. + :type check_for_conflicts: bool + :param release_fun: A function that releases a semaphore, to be + called after the document is fetched. + :type release_fun: function + """ + threading.Thread.__init__(self) + self._db = db + self._doc_id = doc_id + self._check_for_conflicts = check_for_conflicts + self._release_fun = release_fun + self._doc = None + + def run(self): + """ + Fetch the document, store it as a property, and call the release + function. + """ + self._doc = self._db._get_doc( + self._doc_id, self._check_for_conflicts) + self._release_fun() @classmethod - def open_database(cls, url, create): + def open_database(cls, url, create, ensure_ddocs=False): """ Open a U1DB database using CouchDB as backend. - @param url: the url of the database replica - @type url: str - @param create: should the replica be created if it does not exist? - @type create: bool + :param url: the url of the database replica + :type url: str + :param create: should the replica be created if it does not exist? + :type create: bool + :param ensure_ddocs: Ensure that the design docs exist on server. + :type ensure_ddocs: bool - @return: the database instance - @rtype: CouchDatabase + :return: the database instance + :rtype: CouchDatabase """ # get database from url m = re.match('(^https?://[^/]+)/(.+)$', url) @@ -214,308 +406,1057 @@ class CouchDatabase(ObjectStoreDatabase): except ResourceNotFound: if not create: raise DatabaseDoesNotExist() - return cls(url, dbname) + return cls(url, dbname, ensure_ddocs=ensure_ddocs) def __init__(self, url, dbname, replica_uid=None, full_commit=True, - session=None): + session=None, ensure_ddocs=True): """ Create a new Couch data container. - @param url: the url of the couch database - @type url: str - @param dbname: the database name - @type dbname: str - @param replica_uid: an optional unique replica identifier - @type replica_uid: str - @param full_commit: turn on the X-Couch-Full-Commit header - @type full_commit: bool - @param session: an http.Session instance or None for a default session - @type session: http.Session + :param url: the url of the couch database + :type url: str + :param dbname: the database name + :type dbname: str + :param replica_uid: an optional unique replica identifier + :type replica_uid: str + :param full_commit: turn on the X-Couch-Full-Commit header + :type full_commit: bool + :param session: an http.Session instance or None for a default session + :type session: http.Session + :param ensure_ddocs: Ensure that the design docs exist on server. + :type ensure_ddocs: bool """ # save params self._url = url self._full_commit = full_commit + if session is None: + session = Session(timeout=COUCH_TIMEOUT) self._session = session + self._factory = CouchDocument + self._real_replica_uid = None # configure couch self._server = Server(url=self._url, full_commit=self._full_commit, session=self._session) self._dbname = dbname - # this will ensure that transaction and sync logs exist and are - # up-to-date. try: self._database = self._server[self._dbname] except ResourceNotFound: self._server.create(self._dbname) self._database = self._server[self._dbname] - ObjectStoreDatabase.__init__(self, replica_uid=replica_uid) + if replica_uid is not None: + self._set_replica_uid(replica_uid) + if ensure_ddocs: + self.ensure_ddocs_on_db() + # initialize a thread pool for parallelizing get_docs() + self._sem_pool = threading.BoundedSemaphore( + value=self.MAX_GET_DOCS_THREADS) + + def ensure_ddocs_on_db(self): + """ + Ensure that the design documents used by the backend exist on the + couch database. + """ + # we check for existence of one of the files, and put all of them if + # that one does not exist + try: + self._database['_design/docs'] + return + except ResourceNotFound: + for ddoc_name in ['docs', 'syncs', 'transactions']: + ddoc = json.loads( + binascii.a2b_base64( + getattr(ddocs, ddoc_name))) + self._database.save(ddoc) + + def get_sync_target(self): + """ + Return a SyncTarget object, for another u1db to synchronize with. + + :return: The sync target. + :rtype: CouchSyncTarget + """ + return CouchSyncTarget(self) - #------------------------------------------------------------------------- - # methods from Database - #------------------------------------------------------------------------- + def delete_database(self): + """ + Delete a U1DB CouchDB database. + """ + del(self._server[self._dbname]) + + def close(self): + """ + Release any resources associated with this database. + + :return: True if db was succesfully closed. + :rtype: bool + """ + self._url = None + self._full_commit = None + self._session = None + self._server = None + self._database = None + return True + + def _set_replica_uid(self, replica_uid): + """ + Force the replica uid to be set. + + :param replica_uid: The new replica uid. + :type replica_uid: str + """ + try: + # set on existent config document + doc = self._database['u1db_config'] + doc['replica_uid'] = replica_uid + except ResourceNotFound: + # or create the config document + doc = { + '_id': 'u1db_config', + 'replica_uid': replica_uid, + } + self._database.save(doc) + self._real_replica_uid = replica_uid + + def _get_replica_uid(self): + """ + Get the replica uid. + + :return: The replica uid. + :rtype: str + """ + if self._real_replica_uid is not None: + return self._real_replica_uid + try: + # grab replica_uid from server + doc = self._database['u1db_config'] + self._real_replica_uid = doc['replica_uid'] + return self._real_replica_uid + except ResourceNotFound: + # create a unique replica_uid + self._real_replica_uid = uuid.uuid4().hex + self._set_replica_uid(self._real_replica_uid) + return self._real_replica_uid + + _replica_uid = property(_get_replica_uid, _set_replica_uid) + + def _get_generation(self): + """ + Return the current generation. + + :return: The current generation. + :rtype: int + + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. + """ + # query a couch list function + ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log'] + res = self._database.resource(*ddoc_path) + try: + response = res.get_json() + return response[2]['generation'] + except ResourceNotFound as e: + raise_missing_design_doc_error(e, ddoc_path) + except ServerError as e: + raise_server_error(e, ddoc_path) + + def _get_generation_info(self): + """ + Return the current generation. + + :return: A tuple containing the current generation and transaction id. + :rtype: (int, str) + + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. + """ + # query a couch list function + ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log'] + res = self._database.resource(*ddoc_path) + try: + response = res.get_json() + return (response[2]['generation'], response[2]['transaction_id']) + except ResourceNotFound as e: + raise_missing_design_doc_error(e, ddoc_path) + except ServerError as e: + raise_server_error(e, ddoc_path) + + def _get_trans_id_for_gen(self, generation): + """ + Get the transaction id corresponding to a particular generation. + + :param generation: The generation for which to get the transaction id. + :type generation: int + + :return: The transaction id for C{generation}. + :rtype: str + + :raise InvalidGeneration: Raised when the generation does not exist. + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. + """ + if generation == 0: + return '' + # query a couch list function + ddoc_path = [ + '_design', 'transactions', '_list', 'trans_id_for_gen', 'log' + ] + res = self._database.resource(*ddoc_path) + try: + response = res.get_json(gen=generation) + if response[2] == {}: + raise InvalidGeneration + return response[2]['transaction_id'] + except ResourceNotFound as e: + raise_missing_design_doc_error(e, ddoc_path) + except ServerError as e: + raise_server_error(e, ddoc_path) + + def _get_transaction_log(self): + """ + This is only for the test suite, it is not part of the api. + + :return: The complete transaction log. + :rtype: [(str, str)] + + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. + """ + # query a couch view + ddoc_path = ['_design', 'transactions', '_view', 'log'] + res = self._database.resource(*ddoc_path) + try: + response = res.get_json() + return map( + lambda row: (row['id'], row['value']), + response[2]['rows']) + except ResourceNotFound as e: + raise_missing_design_doc_error(e, ddoc_path) def _get_doc(self, doc_id, check_for_conflicts=False): """ - Get just the document content, without fancy handling. + Extract the document from storage. + + This can return None if the document doesn't exist. - @param doc_id: The unique document identifier - @type doc_id: str - @param include_deleted: If set to True, deleted documents will be + :param doc_id: The unique document identifier + :type doc_id: str + :param check_for_conflicts: If set to False, then the conflict check + will be skipped. + :type check_for_conflicts: bool + + :return: The document. + :rtype: CouchDocument + """ + # get document with all attachments (u1db content and eventual + # conflicts) + try: + result = \ + self._database.resource(doc_id).get_json( + attachments=True)[2] + except ResourceNotFound: + return None + # restrict to u1db documents + if 'u1db_rev' not in result: + return None + doc = self._factory(doc_id, result['u1db_rev']) + # set contents or make tombstone + if '_attachments' not in result \ + or 'u1db_content' not in result['_attachments']: + doc.make_tombstone() + else: + doc.content = json.loads( + binascii.a2b_base64( + result['_attachments']['u1db_content']['data'])) + # determine if there are conflicts + if check_for_conflicts \ + and '_attachments' in result \ + and 'u1db_conflicts' in result['_attachments']: + doc.has_conflicts = True + doc.set_conflicts( + self._build_conflicts( + doc.doc_id, + json.loads(binascii.a2b_base64( + result['_attachments']['u1db_conflicts']['data'])))) + # store couch revision + doc.couch_rev = result['_rev'] + # store transactions + doc.transactions = result['u1db_transactions'] + return doc + + def get_doc(self, doc_id, include_deleted=False): + """ + Get the JSON string for the given document. + + :param doc_id: The unique document identifier + :type doc_id: str + :param include_deleted: If set to True, deleted documents will be returned with empty content. Otherwise asking for a deleted document will return None. - @type include_deleted: bool + :type include_deleted: bool - @return: a Document object. - @type: u1db.Document + :return: A document object. + :rtype: CouchDocument. """ - cdoc = self._database.get(doc_id) - if cdoc is None: + doc = self._get_doc(doc_id, check_for_conflicts=True) + if doc is None: + return None + if doc.is_tombstone() and not include_deleted: return None - has_conflicts = False - if check_for_conflicts: - has_conflicts = self._has_conflicts(doc_id) - doc = self._factory( - doc_id=doc_id, - rev=cdoc[self.COUCH_U1DB_REV_KEY], - has_conflicts=has_conflicts) - contents = self._database.get_attachment( - cdoc, - self.COUCH_U1DB_ATTACHMENT_KEY) - if contents: - doc.content = json.loads(contents.read()) - else: - doc.make_tombstone() return doc def get_all_docs(self, include_deleted=False): """ Get the JSON content for all documents in the database. - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise deleted documents will not - be included in the results. - @type include_deleted: bool + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise deleted + documents will not be included in the results. + :type include_deleted: bool - @return: (generation, [Document]) + :return: (generation, [CouchDocument]) The current generation of the database, followed by a list of all the documents in the database. - @rtype: tuple + :rtype: (int, [CouchDocument]) """ + generation = self._get_generation() results = [] - for doc_id in self._database: - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - continue - doc = self._get_doc(doc_id, check_for_conflicts=True) - if doc.content is None and not include_deleted: - continue - results.append(doc) + for row in self._database.view('_all_docs'): + doc = self.get_doc(row.id, include_deleted=include_deleted) + if doc is not None: + results.append(doc) return (generation, results) - def _put_doc(self, doc): + def _put_doc(self, old_doc, doc): + """ + Put the document in the Couch backend database. + + Note that C{old_doc} must have been fetched with the parameter + C{check_for_conflicts} equal to True, so we can properly update the + new document using the conflict information from the old one. + + :param old_doc: The old document version. + :type old_doc: CouchDocument + :param doc: The document to be put. + :type doc: CouchDocument + + :raise RevisionConflict: Raised when trying to update a document but + couch revisions mismatch. + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. + """ + attachments = {} # we save content and conflicts as attachments + parts = [] # and we put it using couch's multipart PUT + # save content as attachment + if doc.is_tombstone() is False: + content = doc.get_json() + attachments['u1db_content'] = { + 'follows': True, + 'content_type': 'application/octet-stream', + 'length': len(content), + } + parts.append(content) + # save conflicts as attachment + if doc.has_conflicts is True: + conflicts = json.dumps( + map(lambda cdoc: (cdoc.rev, cdoc.content), + doc.get_conflicts())) + attachments['u1db_conflicts'] = { + 'follows': True, + 'content_type': 'application/octet-stream', + 'length': len(conflicts), + } + parts.append(conflicts) + # store old transactions, if any + transactions = old_doc.transactions[:] if old_doc is not None else [] + # create a new transaction id and timestamp it so the transaction log + # is consistent when querying the database. + transactions.append( + # here we store milliseconds to keep consistent with javascript + # Date.prototype.getTime() which was used before inside a couchdb + # update handler. + (int(time.time() * 1000), + self._allocate_transaction_id())) + # build the couch document + couch_doc = { + '_id': doc.doc_id, + 'u1db_rev': doc.rev, + 'u1db_transactions': transactions, + '_attachments': attachments, + } + # if we are updating a doc we have to add the couch doc revision + if old_doc is not None: + couch_doc['_rev'] = old_doc.couch_rev + # prepare the multipart PUT + buf = StringIO() + envelope = MultipartWriter(buf) + envelope.add('application/json', json.dumps(couch_doc)) + for part in parts: + envelope.add('application/octet-stream', part) + envelope.close() + # try to save and fail if there's a revision conflict + try: + self._database.resource.put_json( + doc.doc_id, body=buf.getvalue(), headers=envelope.headers) + self._renew_couch_session() + except ResourceConflict: + raise RevisionConflict() + + def put_doc(self, doc): """ Update a document. - This is called everytime we just want to do a raw put on the db (i.e. - without index updates, document constraint checks, and conflict - checks). - - @param doc: The document to update. - @type doc: u1db.Document - - @return: The new revision identifier for the document. - @rtype: str - """ - # prepare couch's Document - cdoc = CouchDocument() - cdoc[self.COUCH_ID_KEY] = doc.doc_id - # we have to guarantee that couch's _rev is consistent - old_cdoc = self._database.get(doc.doc_id) - if old_cdoc is not None: - cdoc[self.COUCH_REV_KEY] = old_cdoc[self.COUCH_REV_KEY] - # store u1db's rev - cdoc[self.COUCH_U1DB_REV_KEY] = doc.rev - # save doc in db - self._database.save(cdoc) - # store u1db's content as json string - if not doc.is_tombstone(): - self._database.put_attachment( - cdoc, doc.get_json(), - filename=self.COUCH_U1DB_ATTACHMENT_KEY) - else: - self._database.delete_attachment( - cdoc, - self.COUCH_U1DB_ATTACHMENT_KEY) + If the document currently has conflicts, put will fail. + If the database specifies a maximum document size and the document + exceeds it, put will fail and raise a DocumentTooBig exception. - def get_sync_target(self): - """ - Return a SyncTarget object, for another u1db to synchronize with. + :param doc: A Document with new content. + :return: new_doc_rev - The new revision identifier for the document. + The Document object will also be updated. - @return: The sync target. - @rtype: CouchSyncTarget + :raise InvalidDocId: Raised if the document's id is invalid. + :raise DocumentTooBig: Raised if the document size is too big. + :raise ConflictedDoc: Raised if the document has conflicts. """ - return CouchSyncTarget(self) - - def create_index(self, index_name, *index_expressions): + if doc.doc_id is None: + raise InvalidDocId() + self._check_doc_id(doc.doc_id) + self._check_doc_size(doc) + old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if old_doc and old_doc.has_conflicts: + raise ConflictedDoc() + if old_doc and doc.rev is None and old_doc.is_tombstone(): + new_rev = self._allocate_doc_rev(old_doc.rev) + else: + if old_doc is not None: + if old_doc.rev != doc.rev: + raise RevisionConflict() + else: + if doc.rev is not None: + raise RevisionConflict() + new_rev = self._allocate_doc_rev(doc.rev) + doc.rev = new_rev + self._put_doc(old_doc, doc) + return new_rev + + def whats_changed(self, old_generation=0): """ - Create a named index, which can then be queried for future lookups. - - @param index_name: A unique name which can be used as a key prefix. - @param index_expressions: Index expressions defining the index - information. + Return a list of documents that have changed since old_generation. + + :param old_generation: The generation of the database in the old + state. + :type old_generation: int + + :return: (generation, trans_id, [(doc_id, generation, trans_id),...]) + The current generation of the database, its associated + transaction id, and a list of of changed documents since + old_generation, represented by tuples with for each document + its doc_id and the generation and transaction id corresponding + to the last intervening change and sorted by generation (old + changes first) + :rtype: (int, str, [(str, int, str)]) + + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. """ - if index_name in self._indexes: - if self._indexes[index_name]._definition == list( - index_expressions): - return - raise errors.IndexNameTakenError - index = InMemoryIndex(index_name, list(index_expressions)) - for doc_id in self._database: - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - continue # skip special files - doc = self._get_doc(doc_id) - if doc.content is not None: - index.add_json(doc_id, doc.get_json()) - self._indexes[index_name] = index - - def close(self): + # query a couch list function + ddoc_path = [ + '_design', 'transactions', '_list', 'whats_changed', 'log' + ] + res = self._database.resource(*ddoc_path) + try: + response = res.get_json(old_gen=old_generation) + results = map( + lambda row: + (row['generation'], row['doc_id'], row['transaction_id']), + response[2]['transactions']) + results.reverse() + cur_gen = old_generation + seen = set() + changes = [] + newest_trans_id = '' + for generation, doc_id, trans_id in results: + if doc_id not in seen: + changes.append((doc_id, generation, trans_id)) + seen.add(doc_id) + if changes: + cur_gen = changes[0][1] # max generation + newest_trans_id = changes[0][2] + changes.reverse() + else: + cur_gen, newest_trans_id = self._get_generation_info() + + return cur_gen, newest_trans_id, changes + except ResourceNotFound as e: + raise_missing_design_doc_error(e, ddoc_path) + except ServerError as e: + raise_server_error(e, ddoc_path) + + def delete_doc(self, doc): """ - Release any resources associated with this database. + Mark a document as deleted. + + Will abort if the current revision doesn't match doc.rev. + This will also set doc.content to None. + + :param doc: The document to mark as deleted. + :type doc: CouchDocument. - @return: True if db was succesfully closed. - @rtype: bool + :raise DocumentDoesNotExist: Raised if the document does not + exist. + :raise RevisionConflict: Raised if the revisions do not match. + :raise DocumentAlreadyDeleted: Raised if the document is + already deleted. + :raise ConflictedDoc: Raised if the doc has conflicts. """ - # TODO: fix this method so the connection is properly closed and - # test_close (+tearDown, which deletes the db) works without problems. - self._url = None - self._full_commit = None - self._session = None - #self._server = None - self._database = None - return True + old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if old_doc is None: + raise DocumentDoesNotExist + if old_doc.rev != doc.rev: + raise RevisionConflict() + if old_doc.is_tombstone(): + raise DocumentAlreadyDeleted + if old_doc.has_conflicts: + raise ConflictedDoc() + new_rev = self._allocate_doc_rev(doc.rev) + doc.rev = new_rev + doc.make_tombstone() + self._put_doc(old_doc, doc) + return new_rev + + def _build_conflicts(self, doc_id, attached_conflicts): + """ + Build the conflicted documents list from the conflicts attachment + fetched from a couch document. - def sync(self, url, creds=None, autocreate=True): + :param attached_conflicts: The document's conflicts as fetched from a + couch document attachment. + :type attached_conflicts: dict + """ + conflicts = [] + for doc_rev, content in attached_conflicts: + doc = self._factory(doc_id, doc_rev) + if content is None: + doc.make_tombstone() + else: + doc.content = content + conflicts.append(doc) + return conflicts + + def _get_conflicts(self, doc_id, couch_rev=None): """ - Synchronize documents with remote replica exposed at url. + Get the conflicted versions of a document. - @param url: The url of the target replica to sync with. - @type url: str - @param creds: optional dictionary giving credentials. - to authorize the operation with the server. - @type creds: dict - @param autocreate: Ask the target to create the db if non-existent. - @type autocreate: bool + If the C{couch_rev} parameter is not None, conflicts for a specific + document's couch revision are returned. - @return: The local generation before the synchronisation was performed. - @rtype: int - """ - return Synchronizer(self, CouchSyncTarget(url, creds=creds)).sync( - autocreate=autocreate) + :param couch_rev: The couch document revision. + :type couch_rev: str - #------------------------------------------------------------------------- - # methods from ObjectStoreDatabase - #------------------------------------------------------------------------- + :return: A list of conflicted versions of the document. + :rtype: list + """ + # request conflicts attachment from server + params = {} + if couch_rev is not None: + params['rev'] = couch_rev # restric document's couch revision + resource = self._database.resource(doc_id, 'u1db_conflicts') + try: + response = resource.get_json(**params) + return self._build_conflicts( + doc_id, json.loads(response[2].read())) + except ResourceNotFound: + return [] - def _init_u1db_data(self): + def get_doc_conflicts(self, doc_id): """ - Initialize u1db configuration data on backend storage. + Get the list of conflicts for the given document. - A U1DB database needs to keep track of all database transactions, - document conflicts, the generation of other replicas it has seen, - indexes created by users and so on. + The order of the conflicts is such that the first entry is the value + that would be returned by "get_doc". - In this implementation, all this information is stored in special - documents stored in the underlying with doc_id prefix equal to - U1DB_DATA_DOC_ID_PREFIX. Those documents ids are reserved: put_doc(), - get_doc() and delete_doc() will not allow documents with a doc_id with - that prefix to be accessed or modified. + :return: A list of the document entries that are conflicted. + :rtype: [CouchDocument] """ - for key in self.U1DB_DATA_KEYS: - doc_id = '%s%s' % (self.U1DB_DATA_DOC_ID_PREFIX, key) - doc = self._get_doc(doc_id) - if doc is None: - doc = self._factory(doc_id) - doc.content = {'content': getattr(self, key)} - self._put_doc(doc) - - #------------------------------------------------------------------------- - # Couch specific methods - #------------------------------------------------------------------------- + conflict_docs = self._get_conflicts(doc_id) + if len(conflict_docs) == 0: + return [] + this_doc = self._get_doc(doc_id, check_for_conflicts=True) + return [this_doc] + conflict_docs - INDEX_NAME_KEY = 'name' - INDEX_DEFINITION_KEY = 'definition' - INDEX_VALUES_KEY = 'values' + def _get_replica_gen_and_trans_id(self, other_replica_uid): + """ + Return the last known generation and transaction id for the other db + replica. + + When you do a synchronization with another replica, the Database keeps + track of what generation the other database replica was at, and what + the associated transaction id was. This is used to determine what data + needs to be sent, and if two databases are claiming to be the same + replica. + + :param other_replica_uid: The identifier for the other replica. + :type other_replica_uid: str + + :return: A tuple containing the generation and transaction id we + encountered during synchronization. If we've never + synchronized with the replica, this is (0, ''). + :rtype: (int, str) + """ + # query a couch view + result = self._database.view('syncs/log') + if len(result[other_replica_uid].rows) == 0: + return (0, '') + return ( + result[other_replica_uid].rows[0]['value']['known_generation'], + result[other_replica_uid].rows[0]['value']['known_transaction_id'] + ) + + def _set_replica_gen_and_trans_id(self, other_replica_uid, + other_generation, other_transaction_id): + """ + Set the last-known generation and transaction id for the other + database replica. + + We have just performed some synchronization, and we want to track what + generation the other replica was at. See also + _get_replica_gen_and_trans_id. + + :param other_replica_uid: The U1DB identifier for the other replica. + :type other_replica_uid: str + :param other_generation: The generation number for the other replica. + :type other_generation: int + :param other_transaction_id: The transaction id associated with the + generation. + :type other_transaction_id: str + """ + self._do_set_replica_gen_and_trans_id( + other_replica_uid, other_generation, other_transaction_id) - def delete_database(self): + def _do_set_replica_gen_and_trans_id( + self, other_replica_uid, other_generation, other_transaction_id): """ - Delete a U1DB CouchDB database. + Set the last-known generation and transaction id for the other + database replica. + + We have just performed some synchronization, and we want to track what + generation the other replica was at. See also + _get_replica_gen_and_trans_id. + + :param other_replica_uid: The U1DB identifier for the other replica. + :type other_replica_uid: str + :param other_generation: The generation number for the other replica. + :type other_generation: int + :param other_transaction_id: The transaction id associated with the + generation. + :type other_transaction_id: str + + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. """ - del(self._server[self._dbname]) + # query a couch update function + ddoc_path = ['_design', 'syncs', '_update', 'put', 'u1db_sync_log'] + res = self._database.resource(*ddoc_path) + try: + with CouchDatabase.update_handler_lock[self._get_replica_uid()]: + res.put_json( + body={ + 'other_replica_uid': other_replica_uid, + 'other_generation': other_generation, + 'other_transaction_id': other_transaction_id, + }, + headers={'content-type': 'application/json'}) + except ResourceNotFound as e: + raise_missing_design_doc_error(e, ddoc_path) + + def _add_conflict(self, doc, my_doc_rev, my_content): + """ + Add a conflict to the document. + + Note that this method does not actually update the backend; rather, it + updates the CouchDocument object which will provide the conflict data + when the atomic document update is made. + + :param doc: The document to have conflicts added to. + :type doc: CouchDocument + :param my_doc_rev: The revision of the conflicted document. + :type my_doc_rev: str + :param my_content: The content of the conflicted document as a JSON + serialized string. + :type my_content: str + """ + doc._ensure_fetch_conflicts(self._get_conflicts) + doc.add_conflict( + self._factory(doc_id=doc.doc_id, rev=my_doc_rev, + json=my_content)) - def _dump_indexes_as_json(self): + def _delete_conflicts(self, doc, conflict_revs): """ - Dump index definitions as JSON. + Delete the conflicted revisions from the list of conflicts of C{doc}. + + Note that this method does not actually update the backend; rather, it + updates the CouchDocument object which will provide the conflict data + when the atomic document update is made. + + :param doc: The document to have conflicts deleted. + :type doc: CouchDocument + :param conflict_revs: A list of the revisions to be deleted. + :param conflict_revs: [str] """ - indexes = {} - for name, idx in self._indexes.iteritems(): - indexes[name] = {} - for attr in [self.INDEX_NAME_KEY, self.INDEX_DEFINITION_KEY, - self.INDEX_VALUES_KEY]: - indexes[name][attr] = getattr(idx, '_' + attr) - return indexes + doc._ensure_fetch_conflicts(self._get_conflicts) + doc.delete_conflicts(conflict_revs) - def _load_indexes_from_json(self, indexes): + def _prune_conflicts(self, doc, doc_vcr): """ - Load index definitions from stored JSON. + Prune conflicts that are older then the current document's revision, or + whose content match to the current document's content. + + :param doc: The document to have conflicts pruned. + :type doc: CouchDocument + :param doc_vcr: A vector clock representing the current document's + revision. + :type doc_vcr: u1db.vectorclock.VectorClock + """ + if doc.has_conflicts is True: + autoresolved = False + c_revs_to_prune = [] + for c_doc in doc.get_conflicts(): + c_vcr = vectorclock.VectorClockRev(c_doc.rev) + if doc_vcr.is_newer(c_vcr): + c_revs_to_prune.append(c_doc.rev) + elif doc.same_content_as(c_doc): + c_revs_to_prune.append(c_doc.rev) + doc_vcr.maximize(c_vcr) + autoresolved = True + if autoresolved: + doc_vcr.increment(self._replica_uid) + doc.rev = doc_vcr.as_str() + self._delete_conflicts(doc, c_revs_to_prune) + + def _force_doc_sync_conflict(self, doc): + """ + Add a conflict and force a document put. - @param indexes: A JSON representation of indexes as - [('index-name', ['field', 'field2', ...]), ...]. - @type indexes: str + :param doc: The document to be put. + :type doc: CouchDocument """ - self._indexes = {} - for name, idx_dict in indexes.iteritems(): - idx = InMemoryIndex(name, idx_dict[self.INDEX_DEFINITION_KEY]) - idx._values = idx_dict[self.INDEX_VALUES_KEY] - self._indexes[name] = idx + my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev)) + self._add_conflict(doc, my_doc.rev, my_doc.get_json()) + doc.has_conflicts = True + self._put_doc(my_doc, doc) - def _load_transaction_log_from_json(self, transaction_log): + def resolve_doc(self, doc, conflicted_doc_revs): """ - Load transaction log from stored JSON. + Mark a document as no longer conflicted. + + We take the list of revisions that the client knows about that it is + superseding. This may be a different list from the actual current + conflicts, in which case only those are removed as conflicted. This + may fail if the conflict list is significantly different from the + supplied information. (sync could have happened in the background from + the time you GET_DOC_CONFLICTS until the point where you RESOLVE) + + :param doc: A Document with the new content to be inserted. + :type doc: CouchDocument + :param conflicted_doc_revs: A list of revisions that the new content + supersedes. + :type conflicted_doc_revs: [str] + + :raise MissingDesignDocError: Raised when tried to access a missing + design document. + :raise MissingDesignDocListFunctionError: Raised when trying to access + a missing list function on a + design document. + :raise MissingDesignDocNamedViewError: Raised when trying to access a + missing named view on a design + document. + :raise MissingDesignDocDeletedError: Raised when trying to access a + deleted design document. + :raise MissingDesignDocUnknownError: Raised when failed to access a + design document for an yet + unknown reason. + """ + cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + new_rev = self._ensure_maximal_rev(cur_doc.rev, + conflicted_doc_revs) + superseded_revs = set(conflicted_doc_revs) + doc.rev = new_rev + # this backend stores conflicts as properties of the documents, so we + # have to copy these conflicts over to the document being updated. + if cur_doc.rev in superseded_revs: + # the newer doc version will supersede the one in the database, so + # we copy conflicts before updating the backend. + doc.set_conflicts(cur_doc.get_conflicts()) # copy conflicts over. + self._delete_conflicts(doc, superseded_revs) + self._put_doc(cur_doc, doc) + else: + # the newer doc version does not supersede the one in the + # database, so we will add a conflict to the database and copy + # those over to the document the user has in her hands. + self._add_conflict(cur_doc, new_rev, doc.get_json()) + self._delete_conflicts(cur_doc, superseded_revs) + self._put_doc(cur_doc, cur_doc) # just update conflicts + # backend has been updated with current conflicts, now copy them + # to the current document. + doc.set_conflicts(cur_doc.get_conflicts()) + + def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, + replica_trans_id=''): + """ + Insert/update document into the database with a given revision. + + This api is used during synchronization operations. + + If a document would conflict and save_conflict is set to True, the + content will be selected as the 'current' content for doc.doc_id, + even though doc.rev doesn't supersede the currently stored revision. + The currently stored document will be added to the list of conflict + alternatives for the given doc_id. + + This forces the new content to be 'current' so that we get convergence + after synchronizing, even if people don't resolve conflicts. Users can + then notice that their content is out of date, update it, and + synchronize again. (The alternative is that users could synchronize and + think the data has propagated, but their local copy looks fine, and the + remote copy is never updated again.) + + :param doc: A document object + :type doc: CouchDocument + :param save_conflict: If this document is a conflict, do you want to + save it as a conflict, or just ignore it. + :type save_conflict: bool + :param replica_uid: A unique replica identifier. + :type replica_uid: str + :param replica_gen: The generation of the replica corresponding to the + this document. The replica arguments are optional, + but are used during synchronization. + :type replica_gen: int + :param replica_trans_id: The transaction_id associated with the + generation. + :type replica_trans_id: str + + :return: (state, at_gen) - If we don't have doc_id already, or if + doc_rev supersedes the existing document revision, then the + content will be inserted, and state is 'inserted'. If + doc_rev is less than or equal to the existing revision, then + the put is ignored and state is respecitvely 'superseded' or + 'converged'. If doc_rev is not strictly superseded or + supersedes, then state is 'conflicted'. The document will not + be inserted if save_conflict is False. For 'inserted' or + 'converged', at_gen is the insertion/current generation. + :rtype: (str, int) + """ + cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + # at this point, `doc` has arrived from the other syncing party, and + # we will decide what to do with it. + # First, we prepare the arriving doc to update couch database. + old_doc = doc + doc = self._factory(doc.doc_id, doc.rev, doc.get_json()) + if cur_doc is not None: + doc.couch_rev = cur_doc.couch_rev + # fetch conflicts because we will eventually manipulate them + doc._ensure_fetch_conflicts(self._get_conflicts) + # from now on, it works just like u1db sqlite backend + doc_vcr = vectorclock.VectorClockRev(doc.rev) + if cur_doc is None: + cur_vcr = vectorclock.VectorClockRev(None) + else: + cur_vcr = vectorclock.VectorClockRev(cur_doc.rev) + self._validate_source(replica_uid, replica_gen, replica_trans_id) + if doc_vcr.is_newer(cur_vcr): + rev = doc.rev + self._prune_conflicts(doc, doc_vcr) + if doc.rev != rev: + # conflicts have been autoresolved + state = 'superseded' + else: + state = 'inserted' + self._put_doc(cur_doc, doc) + elif doc.rev == cur_doc.rev: + # magical convergence + state = 'converged' + elif cur_vcr.is_newer(doc_vcr): + # Don't add this to seen_ids, because we have something newer, + # so we should send it back, and we should not generate a + # conflict + state = 'superseded' + elif cur_doc.same_content_as(doc): + # the documents have been edited to the same thing at both ends + doc_vcr.maximize(cur_vcr) + doc_vcr.increment(self._replica_uid) + doc.rev = doc_vcr.as_str() + self._put_doc(cur_doc, doc) + state = 'superseded' + else: + state = 'conflicted' + if save_conflict: + self._force_doc_sync_conflict(doc) + if replica_uid is not None and replica_gen is not None: + self._set_replica_gen_and_trans_id( + replica_uid, replica_gen, replica_trans_id) + # update info + old_doc.rev = doc.rev + if doc.is_tombstone(): + old_doc.is_tombstone() + else: + old_doc.content = doc.content + old_doc.has_conflicts = doc.has_conflicts + return state, self._get_generation() - @param transaction_log: A JSON representation of transaction_log as - [('generation', 'transaction_id'), ...]. - @type transaction_log: list + def get_docs(self, doc_ids, check_for_conflicts=True, + include_deleted=False): """ - self._transaction_log = [] - for gen, trans_id in transaction_log: - self._transaction_log.append((gen, trans_id)) + Get the JSON content for many documents. + + :param doc_ids: A list of document identifiers. + :type doc_ids: list + :param check_for_conflicts: If set to False, then the conflict check + will be skipped, and 'None' will be + returned instead of True/False. + :type check_for_conflictsa: bool + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise deleted + documents will not be included in the results. + :return: iterable giving the Document object for each document id + in matching doc_ids order. + :rtype: iterable + """ + # Workaround for: + # + # http://bugs.python.org/issue7980 + # https://leap.se/code/issues/5449 + # + # python-couchdb uses time.strptime, which is not thread safe. In + # order to avoid the problem described on the issues above, we preload + # strptime here by evaluating the conversion of an arbitrary date. + # This will not be needed when/if we switch from python-couchdb to + # paisley. + time.strptime('Mar 4 1917', '%b %d %Y') + # spawn threads to retrieve docs + threads = [] + for doc_id in doc_ids: + self._sem_pool.acquire() + t = self._GetDocThread(self, doc_id, check_for_conflicts, + self._sem_pool.release) + t.start() + threads.append(t) + # join threads and yield docs + for t in threads: + t.join() + if t._doc.is_tombstone() and not include_deleted: + continue + yield t._doc - def _load_other_generations_from_json(self, other_generations): + def _renew_couch_session(self): """ - Load other generations from stored JSON. + Create a new couch connection session. - @param other_generations: A JSON representation of other_generations - as {'replica_uid': ('generation', 'transaction_id'), ...}. - @type other_generations: dict + This is a workaround for #5448. Will not be needed once bigcouch is + merged with couchdb. """ - self._other_generations = {} - for replica_uid, [gen, trans_id] in other_generations.iteritems(): - self._other_generations[replica_uid] = (gen, trans_id) + self._database.resource.session = Session(timeout=COUCH_TIMEOUT) -class CouchSyncTarget(ObjectStoreSyncTarget): +class CouchSyncTarget(CommonSyncTarget): """ Functionality for using a CouchDatabase as a synchronization target. """ - pass + def get_sync_info(self, source_replica_uid): + source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id( + source_replica_uid) + my_gen, my_trans_id = self._db._get_generation_info() + return ( + self._db._replica_uid, my_gen, my_trans_id, source_gen, + source_trans_id) -class NotEnoughCouchPermissions(Exception): - """ - Raised when failing to assert for enough permissions on underlying Couch - Database. - """ - pass + def record_sync_info(self, source_replica_uid, source_replica_generation, + source_replica_transaction_id): + if self._trace_hook: + self._trace_hook('record_sync_info') + self._db._set_replica_gen_and_trans_id( + source_replica_uid, source_replica_generation, + source_replica_transaction_id) class CouchServerState(ServerState): @@ -527,121 +1468,66 @@ class CouchServerState(ServerState): """ Initialize the couch server state. - @param couch_url: The URL for the couch database. - @type couch_url: str - @param shared_db_name: The name of the shared database. - @type shared_db_name: str - @param tokens_db_name: The name of the tokens database. - @type tokens_db_name: str + :param couch_url: The URL for the couch database. + :type couch_url: str + :param shared_db_name: The name of the shared database. + :type shared_db_name: str + :param tokens_db_name: The name of the tokens database. + :type tokens_db_name: str """ self._couch_url = couch_url self._shared_db_name = shared_db_name self._tokens_db_name = tokens_db_name - try: - self._check_couch_permissions() - except NotEnoughCouchPermissions: - logger.error("Not enough permissions on underlying couch " - "database (%s)." % self._couch_url) - except (socket.error, socket.gaierror, socket.herror, - socket.timeout), e: - logger.error("Socket problem while trying to reach underlying " - "couch database: (%s, %s)." % - (self._couch_url, e)) - - def _check_couch_permissions(self): - """ - Assert that Soledad Server has enough permissions on the underlying - couch database. - - Soledad Server has to be able to do the following in the couch server: - - * Create, read and write from/to 'shared' db. - * Create, read and write from/to 'user-<anything>' dbs. - * Read from 'tokens' db. - - This function tries to perform the actions above using the "low level" - couch library to ensure that Soledad Server can do everything it needs - on the underlying couch database. - - @param couch_url: The URL of the couch database. - @type couch_url: str - - @raise NotEnoughCouchPermissions: Raised in case there are not enough - permissions to read/write/create the needed couch databases. - @rtype: bool - """ - - def _open_couch_db(dbname): - server = Server(url=self._couch_url) - try: - server[dbname] - except ResourceNotFound: - server.create(dbname) - return server[dbname] - - def _create_delete_test_doc(db): - doc_id, _ = db.save({'test': 'document'}) - doc = db[doc_id] - db.delete(doc) - - try: - # test read/write auth for shared db - _create_delete_test_doc( - _open_couch_db(self._shared_db_name)) - # test read/write auth for user-<something> db - _create_delete_test_doc( - _open_couch_db('%stest-db' % USER_DB_PREFIX)) - # test read auth for tokens db - tokensdb = _open_couch_db(self._tokens_db_name) - tokensdb.info() - except Unauthorized: - raise NotEnoughCouchPermissions(self._couch_url) def open_database(self, dbname): """ Open a couch database. - @param dbname: The name of the database to open. - @type dbname: str + :param dbname: The name of the database to open. + :type dbname: str - @return: The CouchDatabase object. - @rtype: CouchDatabase + :return: The CouchDatabase object. + :rtype: CouchDatabase """ - # TODO: open couch return CouchDatabase.open_database( self._couch_url + '/' + dbname, - create=False) + create=False, + ensure_ddocs=False) def ensure_database(self, dbname): """ Ensure couch database exists. - @param dbname: The name of the database to ensure. - @type dbname: str + Usually, this method is used by the server to ensure the existence of + a database. In our setup, the Soledad user that accesses the underlying + couch server should never have permission to create (or delete) + databases. But, in case it ever does, by raising an exception here we + have one more guarantee that no modified client will be able to + enforce creation of a database when syncing. + + :param dbname: The name of the database to ensure. + :type dbname: str - @return: The CouchDatabase object and the replica uid. - @rtype: (CouchDatabase, str) + :return: The CouchDatabase object and the replica uid. + :rtype: (CouchDatabase, str) """ - db = CouchDatabase.open_database( - self._couch_url + '/' + dbname, - create=True) - return db, db._replica_uid + raise Unauthorized() def delete_database(self, dbname): """ Delete couch database. - @param dbname: The name of the database to delete. - @type dbname: str + :param dbname: The name of the database to delete. + :type dbname: str """ - CouchDatabase.delete_database(self._couch_url + '/' + dbname) + raise Unauthorized() def _set_couch_url(self, url): """ Set the couchdb URL - @param url: CouchDB URL - @type url: str + :param url: CouchDB URL + :type url: str """ self._couch_url = url @@ -649,7 +1535,7 @@ class CouchServerState(ServerState): """ Return CouchDB URL - @rtype: str + :rtype: str """ return self._couch_url diff --git a/common/src/leap/soledad/common/ddocs/README.txt b/common/src/leap/soledad/common/ddocs/README.txt new file mode 100644 index 00000000..5569d929 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/README.txt @@ -0,0 +1,34 @@ +This directory holds a folder structure containing javascript files that +represent the design documents needed by the CouchDB U1DB backend. These files +are compiled into the `../ddocs.py` file by setuptools when creating the +source distribution. + +The following table depicts the U1DB CouchDB backend method and the URI that +is queried to obtain/update data from/to the server. + + +----------------------------------+------------------------------------------------------------------+ + | u1db backend method | URI | + |----------------------------------+------------------------------------------------------------------| + | _get_generation | _design/transactions/_list/generation/log | + | _get_generation_info | _design/transactions/_list/generation/log | + | _get_trans_id_for_gen | _design/transactions/_list/trans_id_for_gen/log | + | _get_transaction_log | _design/transactions/_view/log | + | _get_doc (*) | _design/docs/_view/get?key=<doc_id> | + | _has_conflicts | _design/docs/_view/get?key=<doc_id> | + | get_all_docs | _design/docs/_view/get | + | _put_doc | _design/docs/_update/put/<doc_id> | + | _whats_changed | _design/transactions/_list/whats_changed/log?old_gen=<gen> | + | _get_conflicts (*) | _design/docs/_view/conflicts?key=<doc_id> | + | _get_replica_gen_and_trans_id | _design/syncs/_view/log?other_replica_uid=<uid> | + | _do_set_replica_gen_and_trans_id | _design/syncs/_update/put/u1db_sync_log | + | _add_conflict | _design/docs/_update/add_conflict/<doc_id> | + | _delete_conflicts | _design/docs/_update/delete_conflicts/<doc_id>?doc_rev=<doc_rev> | + | list_indexes | not implemented | + | _get_index_definition | not implemented | + | delete_index | not implemented | + | _get_indexed_fields | not implemented | + | _put_and_update_indexes | not implemented | + +----------------------------------+------------------------------------------------------------------+ + +(*) These methods also request CouchDB document attachments that store U1DB + document contents. diff --git a/common/src/leap/soledad/common/ddocs/docs/views/get/map.js b/common/src/leap/soledad/common/ddocs/docs/views/get/map.js new file mode 100644 index 00000000..ae08d9e9 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/docs/views/get/map.js @@ -0,0 +1,20 @@ +function(doc) { + if (doc.u1db_rev) { + var is_tombstone = true; + var has_conflicts = false; + if (doc._attachments) { + if (doc._attachments.u1db_content) + is_tombstone = false; + if (doc._attachments.u1db_conflicts) + has_conflicts = true; + } + emit(doc._id, + { + "couch_rev": doc._rev, + "u1db_rev": doc.u1db_rev, + "is_tombstone": is_tombstone, + "has_conflicts": has_conflicts, + } + ); + } +} diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js new file mode 100644 index 00000000..722f695a --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js @@ -0,0 +1,22 @@ +function(doc, req){ + if (!doc) { + doc = {} + doc['_id'] = 'u1db_sync_log'; + doc['syncs'] = []; + } + body = JSON.parse(req.body); + // remove outdated info + doc['syncs'] = doc['syncs'].filter( + function (entry) { + return entry[0] != body['other_replica_uid']; + } + ); + // store u1db rev + doc['syncs'].push([ + body['other_replica_uid'], + body['other_generation'], + body['other_transaction_id'] + ]); + return [doc, 'ok']; +} + diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js new file mode 100644 index 00000000..a63c7cf4 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js @@ -0,0 +1,12 @@ +function(doc) { + if (doc._id == 'u1db_sync_log') { + if (doc.syncs) + doc.syncs.forEach(function (entry) { + emit(entry[0], + { + 'known_generation': entry[1], + 'known_transaction_id': entry[2] + }); + }); + } +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js b/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js new file mode 100644 index 00000000..dbdfff0d --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js @@ -0,0 +1,20 @@ +function(head, req) { + var row; + var rows=[]; + // fetch all rows + while(row = getRow()) { + rows.push(row); + } + if (rows.length > 0) + send(JSON.stringify({ + "generation": rows.length, + "doc_id": rows[rows.length-1]['id'], + "transaction_id": rows[rows.length-1]['value'] + })); + else + send(JSON.stringify({ + "generation": 0, + "doc_id": "", + "transaction_id": "", + })); +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js b/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js new file mode 100644 index 00000000..2ec91794 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js @@ -0,0 +1,19 @@ +function(head, req) { + var row; + var rows=[]; + var i = 1; + var gen = 1; + if (req.query.gen) + gen = parseInt(req.query['gen']); + // fetch all rows + while(row = getRow()) + rows.push(row); + if (gen <= rows.length) + send(JSON.stringify({ + "generation": gen, + "doc_id": rows[gen-1]['id'], + "transaction_id": rows[gen-1]['value'], + })); + else + send('{}'); +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js b/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js new file mode 100644 index 00000000..b35cdf51 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js @@ -0,0 +1,22 @@ +function(head, req) { + var row; + var gen = 1; + var old_gen = 0; + if (req.query.old_gen) + old_gen = parseInt(req.query['old_gen']); + send('{"transactions":[\n'); + // fetch all rows + while(row = getRow()) { + if (gen > old_gen) { + if (gen > old_gen+1) + send(',\n'); + send(JSON.stringify({ + "generation": gen, + "doc_id": row["id"], + "transaction_id": row["value"] + })); + } + gen++; + } + send('\n]}'); +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js b/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js new file mode 100644 index 00000000..94ef63ca --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js @@ -0,0 +1,7 @@ +function(doc) { + if (doc.u1db_transactions) + doc.u1db_transactions.forEach(function(t) { + emit(t[0], // use timestamp as key so the results are ordered + t[1]); // value is the transaction_id + }); +} diff --git a/common/src/leap/soledad/common/document.py b/common/src/leap/soledad/common/document.py index cc24b53a..919ade12 100644 --- a/common/src/leap/soledad/common/document.py +++ b/common/src/leap/soledad/common/document.py @@ -29,6 +29,7 @@ from u1db import Document # class SoledadDocument(Document): + """ Encryptable and syncable document. @@ -107,5 +108,3 @@ class SoledadDocument(Document): _get_rev, _set_rev, doc="Wrapper to ensure `doc.rev` is always returned as bytes.") - - diff --git a/common/src/leap/soledad/common/errors.py b/common/src/leap/soledad/common/errors.py index 7c2d7296..3a7eadd2 100644 --- a/common/src/leap/soledad/common/errors.py +++ b/common/src/leap/soledad/common/errors.py @@ -25,11 +25,49 @@ from u1db import errors from u1db.remote import http_errors +def register_exception(cls): + """ + A small decorator that registers exceptions in u1db maps. + """ + # update u1db "wire description to status" and "wire description to + # exception" maps. + http_errors.wire_description_to_status.update({ + cls.wire_description: cls.status}) + errors.wire_description_to_exc.update({ + cls.wire_description: cls}) + # do not modify the exception + return cls + + +class SoledadError(errors.U1DBError): + """ + Base Soledad HTTP errors. + """ + pass + + # -# LockResource: a lock based on a document in the shared database. +# Authorization errors # -class InvalidTokenError(errors.U1DBError): +@register_exception +class InvalidAuthTokenError(errors.Unauthorized): + """ + Exception raised when failing to get authorization for some action because + the provided token either does not exist in the tokens database, has a + distinct structure from the expected one, or is associated with a user + with a distinct uuid than the one provided by the client. + """ + + wire_descrition = "invalid auth token" + status = 401 + +# +# LockResource errors +# + +@register_exception +class InvalidTokenError(SoledadError): """ Exception raised when trying to unlock shared database with invalid token. """ @@ -38,7 +76,8 @@ class InvalidTokenError(errors.U1DBError): status = 401 -class NotLockedError(errors.U1DBError): +@register_exception +class NotLockedError(SoledadError): """ Exception raised when trying to unlock shared database when it is not locked. @@ -48,7 +87,8 @@ class NotLockedError(errors.U1DBError): status = 404 -class AlreadyLockedError(errors.U1DBError): +@register_exception +class AlreadyLockedError(SoledadError): """ Exception raised when trying to lock shared database but it is already locked. @@ -57,13 +97,83 @@ class AlreadyLockedError(errors.U1DBError): wire_description = "lock is locked" status = 403 -# update u1db "wire description to status" and "wire description to exception" -# maps. -for e in [InvalidTokenError, NotLockedError, AlreadyLockedError]: - http_errors.wire_description_to_status.update({ - e.wire_description: e.status}) - errors.wire_description_to_exc.update({ - e.wire_description: e}) + +@register_exception +class LockTimedOutError(SoledadError): + """ + Exception raised when timing out while trying to lock the shared database. + """ + + wire_description = "lock timed out" + status = 408 + + +@register_exception +class CouldNotObtainLockError(SoledadError): + """ + Exception raised when timing out while trying to lock the shared database. + """ + + wire_description = "error obtaining lock" + status = 500 + + +# +# CouchDatabase errors +# + +@register_exception +class MissingDesignDocError(SoledadError): + """ + Raised when trying to access a missing couch design document. + """ + + wire_description = "missing design document" + status = 500 + + +@register_exception +class MissingDesignDocNamedViewError(SoledadError): + """ + Raised when trying to access a missing named view on a couch design + document. + """ + + wire_description = "missing design document named function" + status = 500 + + +@register_exception +class MissingDesignDocListFunctionError(SoledadError): + """ + Raised when trying to access a missing list function on a couch design + document. + """ + + wire_description = "missing design document list function" + status = 500 + + +@register_exception +class MissingDesignDocDeletedError(SoledadError): + """ + Raised when trying to access a deleted couch design document. + """ + + wire_description = "design document was deleted" + status = 500 + + +@register_exception +class DesignDocUnknownError(SoledadError): + """ + Raised when trying to access a couch design document and getting an + unknown error. + """ + + wire_description = "missing design document unknown error" + status = 500 + # u1db error statuses also have to be updated http_errors.ERROR_STATUSES = set( diff --git a/common/src/leap/soledad/common/objectstore.py b/common/src/leap/soledad/common/objectstore.py deleted file mode 100644 index 7aff3e32..00000000 --- a/common/src/leap/soledad/common/objectstore.py +++ /dev/null @@ -1,282 +0,0 @@ -# -*- coding: utf-8 -*- -# objectstore.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -""" -Abstract U1DB backend to handle storage using object stores (like CouchDB, for -example). - -This backend uses special documents to store all U1DB data (replica uid, -indexes, transaction logs and info about other dbs). The id of these documents -are reserved and have prefix equal to ObjectStore.U1DB_DATA_DOC_ID_PREFIX. - -Right now, this is only used by CouchDatabase backend, but can also be -extended to implement OpenStack or Amazon S3 storage, for example. - -See U1DB documentation for more information on how to use databases. -""" - - -from base64 import b64encode, b64decode - - -import uuid -import simplejson as json - - -from u1db import errors -from u1db.backends.inmemory import ( - InMemoryDatabase, - InMemorySyncTarget, -) - - -class ObjectStoreDatabase(InMemoryDatabase): - """ - A backend for storing u1db data in an object store. - """ - - U1DB_DATA_DOC_ID_PREFIX = 'u1db/' - - @classmethod - def open_database(cls, url, create, document_factory=None): - """ - Open a U1DB database using an object store as backend. - - @param url: the url of the database replica - @type url: str - @param create: should the replica be created if it does not exist? - @type create: bool - @param document_factory: A function that will be called with the same - parameters as Document.__init__. - @type document_factory: callable - - @return: the database instance - @rtype: CouchDatabase - """ - raise NotImplementedError(cls.open_database) - - def __init__(self, replica_uid=None, document_factory=None): - """ - Initialize the object store database. - - @param replica_uid: an optional unique replica identifier - @type replica_uid: str - @param document_factory: A function that will be called with the same - parameters as Document.__init__. - @type document_factory: callable - """ - InMemoryDatabase.__init__( - self, - replica_uid, - document_factory=document_factory) - if self._replica_uid is None: - self._replica_uid = uuid.uuid4().hex - self._init_u1db_data() - - def _init_u1db_data(self): - """ - Initialize u1db configuration data on backend storage. - - A U1DB database needs to keep track of all database transactions, - document conflicts, the generation of other replicas it has seen, - indexes created by users and so on. - - In this implementation, all this information is stored in special - documents stored in the couch db with id prefix equal to - U1DB_DATA_DOC_ID_PREFIX. Those documents ids are reserved: - put_doc(), get_doc() and delete_doc() will not allow documents with - a doc_id with that prefix to be accessed or modified. - """ - raise NotImplementedError(self._init_u1db_data) - - #------------------------------------------------------------------------- - # methods from Database - #------------------------------------------------------------------------- - - def put_doc(self, doc): - """ - Update a document. - - If the document currently has conflicts, put will fail. - If the database specifies a maximum document size and the document - exceeds it, put will fail and raise a DocumentTooBig exception. - - This method prevents from updating the document with doc_id equals to - self.U1DB_DATA_DOC_ID, which contains U1DB data. - - @param doc: A Document with new content. - @type doc: Document - - @return: new_doc_rev - The new revision identifier for the document. - The Document object will also be updated. - @rtype: str - """ - if doc.doc_id is not None and \ - doc.doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - raise errors.InvalidDocId() - return InMemoryDatabase.put_doc(self, doc) - - def _put_doc(self, doc): - """ - Update a document. - - This is called everytime we just want to do a raw put on the db (i.e. - without index updates, document constraint checks, and conflict - checks). - - @param doc: The document to update. - @type doc: u1db.Document - - @return: The new revision identifier for the document. - @rtype: str - """ - raise NotImplementedError(self._put_doc) - - def get_doc(self, doc_id, include_deleted=False): - """ - Get the JSON string for the given document. - - This method prevents from getting the document with doc_id equals to - self.U1DB_DATA_DOC_ID, which contains U1DB data. - - @param doc_id: The unique document identifier - @type doc_id: str - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise asking for a deleted - document will return None. - @type include_deleted: bool - - @return: a Document object. - @rtype: Document - """ - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - raise errors.InvalidDocId() - return InMemoryDatabase.get_doc(self, doc_id, include_deleted) - - def _get_doc(self, doc_id): - """ - Get just the document content, without fancy handling. - - @param doc_id: The unique document identifier - @type doc_id: str - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise asking for a deleted - document will return None. - @type include_deleted: bool - - @return: a Document object. - @type: u1db.Document - """ - raise NotImplementedError(self._get_doc) - - def get_all_docs(self, include_deleted=False): - """ - Get the JSON content for all documents in the database. - - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise deleted documents will not - be included in the results. - @type include_deleted: bool - - @return: (generation, [Document]) - The current generation of the database, followed by a list of all - the documents in the database. - @rtype: tuple - """ - generation = self._get_generation() - results = [] - for doc_id in self._database: - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - continue - doc = self._get_doc(doc_id, check_for_conflicts=True) - if doc.content is None and not include_deleted: - continue - results.append(doc) - return (generation, results) - - def delete_doc(self, doc): - """ - Mark a document as deleted. - - This method prevents from deleting the document with doc_id equals to - self.U1DB_DATA_DOC_ID, which contains U1DB data. - - @param doc: The document to mark as deleted. - @type doc: u1db.Document - - @return: The new revision id of the document. - @type: str - """ - if doc.doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - raise errors.InvalidDocId() - old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) - if old_doc is None: - raise errors.DocumentDoesNotExist - if old_doc.rev != doc.rev: - raise errors.RevisionConflict() - if old_doc.is_tombstone(): - raise errors.DocumentAlreadyDeleted - if old_doc.has_conflicts: - raise errors.ConflictedDoc() - new_rev = self._allocate_doc_rev(doc.rev) - doc.rev = new_rev - doc.make_tombstone() - self._put_and_update_indexes(old_doc, doc) - return new_rev - - # index-related methods - - def create_index(self, index_name, *index_expressions): - """ - Create a named index, which can then be queried for future lookups. - - See U1DB documentation for more information. - - @param index_name: A unique name which can be used as a key prefix. - @param index_expressions: Index expressions defining the index - information. - """ - raise NotImplementedError(self.create_index) - - #------------------------------------------------------------------------- - # implemented methods from CommonBackend - #------------------------------------------------------------------------- - - def _put_and_update_indexes(self, old_doc, doc): - """ - Update a document and all indexes related to it. - - @param old_doc: The old version of the document. - @type old_doc: u1db.Document - @param doc: The new version of the document. - @type doc: u1db.Document - """ - for index in self._indexes.itervalues(): - if old_doc is not None and not old_doc.is_tombstone(): - index.remove_json(old_doc.doc_id, old_doc.get_json()) - if not doc.is_tombstone(): - index.add_json(doc.doc_id, doc.get_json()) - trans_id = self._allocate_transaction_id() - self._put_doc(doc) - self._transaction_log.append((doc.doc_id, trans_id)) - - -class ObjectStoreSyncTarget(InMemorySyncTarget): - """ - Functionality for using an ObjectStore as a synchronization target. - """ diff --git a/common/src/leap/soledad/common/tests/couchdb.ini.template b/common/src/leap/soledad/common/tests/couchdb.ini.template index 7d0316f0..1fc2205b 100644 --- a/common/src/leap/soledad/common/tests/couchdb.ini.template +++ b/common/src/leap/soledad/common/tests/couchdb.ini.template @@ -6,7 +6,7 @@ database_dir = %(tempdir)s/lib view_index_dir = %(tempdir)s/lib max_document_size = 4294967296 ; 4 GB -os_process_timeout = 5000 ; 5 seconds. for view and external servers. +os_process_timeout = 120000 ; 120 seconds. for view and external servers. max_dbs_open = 100 delayed_commits = true ; set this to false to ensure an fsync before 201 Created is returned uri_file = %(tempdir)s/lib/couch.uri @@ -74,6 +74,8 @@ use_users_db = false [query_servers] ; javascript = %(tempdir)s/server/main.js +javascript = /usr/bin/couchjs /usr/share/couchdb/server/main.js +coffeescript = /usr/bin/couchjs /usr/share/couchdb/server/main-coffee.js ; Changing reduce_limit to false will disable reduce_limit. @@ -219,4 +221,4 @@ min_file_size = 131072 ;[admins] ;testuser = -hashed-f50a252c12615697c5ed24ec5cd56b05d66fe91e,b05471ba260132953930cf9f97f327f5 -; pass for above user is 'testpass'
\ No newline at end of file +; pass for above user is 'testpass' diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index 42edf9fe..86bb4b93 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -24,14 +24,17 @@ import re import copy import shutil from base64 import b64decode +from mock import Mock + +from couchdb.client import Server +from u1db import errors as u1db_errors from leap.common.files import mkdir_p -from leap.soledad.common.document import SoledadDocument from leap.soledad.common.tests import u1db_tests as tests from leap.soledad.common.tests.u1db_tests import test_backends from leap.soledad.common.tests.u1db_tests import test_sync -from leap.soledad.common import couch +from leap.soledad.common import couch, errors import simplejson as json @@ -78,9 +81,10 @@ class CouchDBWrapper(object): mkdir_p(os.path.join(self.tempdir, 'lib')) mkdir_p(os.path.join(self.tempdir, 'log')) args = ['couchdb', '-n', '-a', confPath] - #null = open('/dev/null', 'w') + null = open('/dev/null', 'w') + self.process = subprocess.Popen( - args, env=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + args, env=None, stdout=null.fileno(), stderr=null.fileno(), close_fds=True) # find port logPath = os.path.join(self.tempdir, 'log', 'couch.log') @@ -123,21 +127,21 @@ class CouchDBTestCase(unittest.TestCase): TestCase base class for tests against a real CouchDB server. """ - def setUp(self): + @classmethod + def setUpClass(cls): """ Make sure we have a CouchDB instance for a test. """ - self.wrapper = CouchDBWrapper() - self.wrapper.start() + cls.wrapper = CouchDBWrapper() + cls.wrapper.start() #self.db = self.wrapper.db - unittest.TestCase.setUp(self) - def tearDown(self): + @classmethod + def tearDownClass(cls): """ Stop CouchDB instance for test. """ - self.wrapper.stop() - unittest.TestCase.tearDown(self) + cls.wrapper.stop() #----------------------------------------------------------------------------- @@ -148,7 +152,7 @@ class TestCouchBackendImpl(CouchDBTestCase): def test__allocate_doc_id(self): db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), - 'u1db_tests') + 'u1db_tests', ensure_ddocs=True) doc_id1 = db._allocate_doc_id() self.assertTrue(doc_id1.startswith('D-')) self.assertEqual(34, len(doc_id1)) @@ -163,32 +167,51 @@ class TestCouchBackendImpl(CouchDBTestCase): def make_couch_database_for_test(test, replica_uid): port = str(test.wrapper.port) return couch.CouchDatabase('http://localhost:' + port, replica_uid, - replica_uid=replica_uid or 'test') + replica_uid=replica_uid or 'test', + ensure_ddocs=True) def copy_couch_database_for_test(test, db): port = str(test.wrapper.port) - new_db = couch.CouchDatabase('http://localhost:' + port, - db._replica_uid + '_copy', + couch_url = 'http://localhost:' + port + new_dbname = db._replica_uid + '_copy' + new_db = couch.CouchDatabase(couch_url, + new_dbname, replica_uid=db._replica_uid or 'test') - gen, docs = db.get_all_docs(include_deleted=True) - for doc in docs: - new_db._put_doc(doc) - new_db._transaction_log = copy.deepcopy(db._transaction_log) - new_db._conflicts = copy.deepcopy(db._conflicts) - new_db._other_generations = copy.deepcopy(db._other_generations) - new_db._indexes = copy.deepcopy(db._indexes) - # save u1db data on couch - for key in new_db.U1DB_DATA_KEYS: - doc_id = '%s%s' % (new_db.U1DB_DATA_DOC_ID_PREFIX, key) - doc = new_db._get_doc(doc_id) - doc.content = {'content': getattr(new_db, key)} - new_db._put_doc(doc) + # copy all docs + old_couch_db = Server(couch_url)[db._replica_uid] + new_couch_db = Server(couch_url)[new_dbname] + for doc_id in old_couch_db: + doc = old_couch_db.get(doc_id) + # copy design docs + if ('u1db_rev' not in doc): + new_couch_db.save(doc) + # copy u1db docs + else: + new_doc = { + '_id': doc['_id'], + 'u1db_transactions': doc['u1db_transactions'], + 'u1db_rev': doc['u1db_rev'] + } + attachments = [] + if ('u1db_conflicts' in doc): + new_doc['u1db_conflicts'] = doc['u1db_conflicts'] + for c_rev in doc['u1db_conflicts']: + attachments.append('u1db_conflict_%s' % c_rev) + new_couch_db.save(new_doc) + # save conflict data + attachments.append('u1db_content') + for att_name in attachments: + att = old_couch_db.get_attachment(doc_id, att_name) + if (att is not None): + new_couch_db.put_attachment(new_doc, att, + filename=att_name) return new_db def make_document_for_test(test, doc_id, rev, content, has_conflicts=False): - return SoledadDocument(doc_id, rev, content, has_conflicts=has_conflicts) + return couch.CouchDocument( + doc_id, rev, content, has_conflicts=has_conflicts) COUCH_SCENARIOS = [ @@ -202,8 +225,22 @@ class CouchTests(test_backends.AllDatabaseTests, CouchDBTestCase): scenarios = COUCH_SCENARIOS + def setUp(self): + test_backends.AllDatabaseTests.setUp(self) + # save db info because of test_close + self._server = self.db._server + self._dbname = self.db._dbname + def tearDown(self): - self.db.delete_database() + # if current test is `test_close` we have to use saved objects to + # delete the database because the close() method will have removed the + # references needed to do it using the CouchDatabase. + if self.id() == \ + 'leap.soledad.common.tests.test_couch.CouchTests.' \ + 'test_close(couch)': + del(self._server[self._dbname]) + else: + self.db.delete_database() test_backends.AllDatabaseTests.tearDown(self) @@ -246,17 +283,16 @@ class CouchWithConflictsTests( test_backends.LocalDatabaseWithConflictsTests.tearDown(self) -# Notice: the CouchDB backend is currently used for storing encrypted data in -# the server, so indexing makes no sense. Thus, we ignore index testing for -# now. - -class CouchIndexTests(test_backends.DatabaseIndexTests, CouchDBTestCase): - - scenarios = COUCH_SCENARIOS +# Notice: the CouchDB backend does not have indexing capabilities, so we do +# not test indexing now. - def tearDown(self): - self.db.delete_database() - test_backends.DatabaseIndexTests.tearDown(self) +#class CouchIndexTests(test_backends.DatabaseIndexTests, CouchDBTestCase): +# +# scenarios = COUCH_SCENARIOS +# +# def tearDown(self): +# self.db.delete_database() +# test_backends.DatabaseIndexTests.tearDown(self) #----------------------------------------------------------------------------- @@ -311,6 +347,89 @@ class CouchDatabaseSyncTargetTests(test_sync.DatabaseSyncTargetTests, [(doc.doc_id, doc.rev), (doc2.doc_id, doc2.rev)]}) +# The following tests need that the database have an index, so we fake one. +old_class = couch.CouchDatabase + +from u1db.backends.inmemory import InMemoryIndex + + +class IndexedCouchDatabase(couch.CouchDatabase): + + def __init__(self, url, dbname, replica_uid=None, full_commit=True, + session=None, ensure_ddocs=True): + old_class.__init__(self, url, dbname, replica_uid, full_commit, + session, ensure_ddocs=ensure_ddocs) + self._indexes = {} + + def _put_doc(self, old_doc, doc): + for index in self._indexes.itervalues(): + if old_doc is not None and not old_doc.is_tombstone(): + index.remove_json(old_doc.doc_id, old_doc.get_json()) + if not doc.is_tombstone(): + index.add_json(doc.doc_id, doc.get_json()) + old_class._put_doc(self, old_doc, doc) + + def create_index(self, index_name, *index_expressions): + if index_name in self._indexes: + if self._indexes[index_name]._definition == list( + index_expressions): + return + raise u1db_errors.IndexNameTakenError + index = InMemoryIndex(index_name, list(index_expressions)) + _, all_docs = self.get_all_docs() + for doc in all_docs: + index.add_json(doc.doc_id, doc.get_json()) + self._indexes[index_name] = index + + def delete_index(self, index_name): + del self._indexes[index_name] + + def list_indexes(self): + definitions = [] + for idx in self._indexes.itervalues(): + definitions.append((idx._name, idx._definition)) + return definitions + + def get_from_index(self, index_name, *key_values): + try: + index = self._indexes[index_name] + except KeyError: + raise u1db_errors.IndexDoesNotExist + doc_ids = index.lookup(key_values) + result = [] + for doc_id in doc_ids: + result.append(self._get_doc(doc_id, check_for_conflicts=True)) + return result + + def get_range_from_index(self, index_name, start_value=None, + end_value=None): + """Return all documents with key values in the specified range.""" + try: + index = self._indexes[index_name] + except KeyError: + raise u1db_errors.IndexDoesNotExist + if isinstance(start_value, basestring): + start_value = (start_value,) + if isinstance(end_value, basestring): + end_value = (end_value,) + doc_ids = index.lookup_range(start_value, end_value) + result = [] + for doc_id in doc_ids: + result.append(self._get_doc(doc_id, check_for_conflicts=True)) + return result + + def get_index_keys(self, index_name): + try: + index = self._indexes[index_name] + except KeyError: + raise u1db_errors.IndexDoesNotExist + keys = index.keys() + # XXX inefficiency warning + return list(set([tuple(key.split('\x01')) for key in keys])) + + +couch.CouchDatabase = IndexedCouchDatabase + sync_scenarios = [] for name, scenario in COUCH_SCENARIOS: scenario = dict(scenario) @@ -344,98 +463,184 @@ class CouchDatabaseSyncTests(test_sync.DatabaseSyncTests, CouchDBTestCase): test_sync.DatabaseSyncTests.tearDown(self) -#----------------------------------------------------------------------------- -# The following tests test extra functionality introduced by our backends -#----------------------------------------------------------------------------- - -class CouchDatabaseStorageTests(CouchDBTestCase): - - def _listify(self, l): - if type(l) is dict: - return { - self._listify(a): self._listify(b) for a, b in l.iteritems()} - if hasattr(l, '__iter__'): - return [self._listify(i) for i in l] - return l - - def _fetch_u1db_data(self, db, key): - doc = db._get_doc("%s%s" % (db.U1DB_DATA_DOC_ID_PREFIX, key)) - return doc.content['content'] - - def test_transaction_log_storage_after_put(self): - db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), - 'u1db_tests') - db.create_doc({'simple': 'doc'}) - content = self._fetch_u1db_data(db, db.U1DB_TRANSACTION_LOG_KEY) - self.assertEqual( - self._listify(db._transaction_log), - self._listify(content)) - - def test_conflict_log_storage_after_put_if_newer(self): - db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), - 'u1db_tests') - doc = db.create_doc({'simple': 'doc'}) - doc.set_json(nested_doc) - doc.rev = db._replica_uid + ':2' - db._force_doc_sync_conflict(doc) - content = self._fetch_u1db_data(db, db.U1DB_CONFLICTS_KEY) - self.assertEqual( - self._listify(db._conflicts), - self._listify(content)) - - def test_other_gens_storage_after_set(self): - db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), - 'u1db_tests') - doc = db.create_doc({'simple': 'doc'}) - db._set_replica_gen_and_trans_id('a', 'b', 'c') - content = self._fetch_u1db_data(db, db.U1DB_OTHER_GENERATIONS_KEY) - self.assertEqual( - self._listify(db._other_generations), - self._listify(content)) +class CouchDatabaseExceptionsTests(CouchDBTestCase): - def test_index_storage_after_create(self): - db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), - 'u1db_tests') - doc = db.create_doc({'name': 'john'}) - db.create_index('myindex', 'name') - content = self._fetch_u1db_data(db, db.U1DB_INDEXES_KEY) - myind = db._indexes['myindex'] - index = { - 'myindex': { - 'definition': myind._definition, - 'name': myind._name, - 'values': myind._values, - } - } - self.assertEqual( - self._listify(index), - self._listify(content)) + def setUp(self): + CouchDBTestCase.setUp(self) + self.db = couch.CouchDatabase( + 'http://127.0.0.1:%d' % self.wrapper.port, 'test', + ensure_ddocs=False) # note that we don't enforce ddocs here - def test_index_storage_after_delete(self): - db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), - 'u1db_tests') - doc = db.create_doc({'name': 'john'}) - db.create_index('myindex', 'name') - db.create_index('myindex2', 'name') - db.delete_index('myindex') - content = self._fetch_u1db_data(db, db.U1DB_INDEXES_KEY) - myind = db._indexes['myindex2'] - index = { - 'myindex2': { - 'definition': myind._definition, - 'name': myind._name, - 'values': myind._values, - } - } - self.assertEqual( - self._listify(index), - self._listify(content)) + def tearDown(self): + self.db.delete_database() - def test_replica_uid_storage_after_db_creation(self): - db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), - 'u1db_tests') - content = self._fetch_u1db_data(db, db.U1DB_REPLICA_UID_KEY) - self.assertEqual(db._replica_uid, content) + def test_missing_design_doc_raises(self): + """ + Test that all methods that access design documents will raise if the + design docs are not present. + """ + # _get_generation() + self.assertRaises( + errors.MissingDesignDocError, + self.db._get_generation) + # _get_generation_info() + self.assertRaises( + errors.MissingDesignDocError, + self.db._get_generation_info) + # _get_trans_id_for_gen() + self.assertRaises( + errors.MissingDesignDocError, + self.db._get_trans_id_for_gen, 1) + # _get_transaction_log() + self.assertRaises( + errors.MissingDesignDocError, + self.db._get_transaction_log) + # whats_changed() + self.assertRaises( + errors.MissingDesignDocError, + self.db.whats_changed) + # _do_set_replica_gen_and_trans_id() + self.assertRaises( + errors.MissingDesignDocError, + self.db._do_set_replica_gen_and_trans_id, 1, 2, 3) + + def test_missing_design_doc_functions_raises(self): + """ + Test that all methods that access design documents list functions + will raise if the functions are not present. + """ + self.db = couch.CouchDatabase( + 'http://127.0.0.1:%d' % self.wrapper.port, 'test', + ensure_ddocs=True) + # erase views from _design/transactions + transactions = self.db._database['_design/transactions'] + transactions['lists'] = {} + self.db._database.save(transactions) + # _get_generation() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db._get_generation) + # _get_generation_info() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db._get_generation_info) + # _get_trans_id_for_gen() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db._get_trans_id_for_gen, 1) + # whats_changed() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db.whats_changed) + + def test_absent_design_doc_functions_raises(self): + """ + Test that all methods that access design documents list functions + will raise if the functions are not present. + """ + self.db = couch.CouchDatabase( + 'http://127.0.0.1:%d' % self.wrapper.port, 'test', + ensure_ddocs=True) + # erase views from _design/transactions + transactions = self.db._database['_design/transactions'] + del transactions['lists'] + self.db._database.save(transactions) + # _get_generation() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db._get_generation) + # _get_generation_info() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db._get_generation_info) + # _get_trans_id_for_gen() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db._get_trans_id_for_gen, 1) + # whats_changed() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db.whats_changed) + + def test_missing_design_doc_named_views_raises(self): + """ + Test that all methods that access design documents' named views will + raise if the views are not present. + """ + self.db = couch.CouchDatabase( + 'http://127.0.0.1:%d' % self.wrapper.port, 'test', + ensure_ddocs=True) + # erase views from _design/docs + docs = self.db._database['_design/docs'] + del docs['views'] + self.db._database.save(docs) + # erase views from _design/syncs + syncs = self.db._database['_design/syncs'] + del syncs['views'] + self.db._database.save(syncs) + # erase views from _design/transactions + transactions = self.db._database['_design/transactions'] + del transactions['views'] + self.db._database.save(transactions) + # _get_generation() + self.assertRaises( + errors.MissingDesignDocNamedViewError, + self.db._get_generation) + # _get_generation_info() + self.assertRaises( + errors.MissingDesignDocNamedViewError, + self.db._get_generation_info) + # _get_trans_id_for_gen() + self.assertRaises( + errors.MissingDesignDocNamedViewError, + self.db._get_trans_id_for_gen, 1) + # _get_transaction_log() + self.assertRaises( + errors.MissingDesignDocNamedViewError, + self.db._get_transaction_log) + # whats_changed() + self.assertRaises( + errors.MissingDesignDocNamedViewError, + self.db.whats_changed) + + def test_deleted_design_doc_raises(self): + """ + Test that all methods that access design documents will raise if the + design docs are not present. + """ + self.db = couch.CouchDatabase( + 'http://127.0.0.1:%d' % self.wrapper.port, 'test', + ensure_ddocs=True) + # delete _design/docs + del self.db._database['_design/docs'] + # delete _design/syncs + del self.db._database['_design/syncs'] + # delete _design/transactions + del self.db._database['_design/transactions'] + # _get_generation() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db._get_generation) + # _get_generation_info() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db._get_generation_info) + # _get_trans_id_for_gen() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db._get_trans_id_for_gen, 1) + # _get_transaction_log() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db._get_transaction_log) + # whats_changed() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db.whats_changed) + # _do_set_replica_gen_and_trans_id() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db._do_set_replica_gen_and_trans_id, 1, 2, 3) load_tests = tests.load_with_scenarios diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py new file mode 100644 index 00000000..3c457cc5 --- /dev/null +++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py @@ -0,0 +1,389 @@ +# -*- coding: utf-8 -*- +# test_soledad.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +""" + +import os +import mock +import tempfile +import threading + +from leap.soledad.client import Soledad +from leap.soledad.common.couch import CouchDatabase, CouchServerState +from leap.soledad.common.tests.test_couch import CouchDBTestCase +from leap.soledad.common.tests.u1db_tests import TestCaseWithServer +from leap.soledad.common.tests.test_target import ( + make_token_soledad_app, + make_leap_document_for_test, + token_leap_sync_target, +) +from leap.soledad.common.tests.test_server import _couch_ensure_database + + +REPEAT_TIMES = 20 + + +# monkey path CouchServerState so it can ensure databases. + +CouchServerState.ensure_database = _couch_ensure_database + + +class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): + + @staticmethod + def make_app_after_state(state): + return make_token_soledad_app(state) + + make_document_for_test = make_leap_document_for_test + + sync_target = token_leap_sync_target + + def _soledad_instance(self, user='user-uuid', passphrase=u'123', + prefix='', + secrets_path=Soledad.STORAGE_SECRETS_FILE_NAME, + local_db_path='soledad.u1db', server_url='', + cert_file=None, auth_token=None, secret_id=None): + """ + Instantiate Soledad. + """ + + # this callback ensures we save a document which is sent to the shared + # db. + def _put_doc_side_effect(doc): + self._doc_put = doc + + # we need a mocked shared db or else Soledad will try to access the + # network to find if there are uploaded secrets. + class MockSharedDB(object): + + get_doc = mock.Mock(return_value=None) + put_doc = mock.Mock(side_effect=_put_doc_side_effect) + lock = mock.Mock(return_value=('atoken', 300)) + unlock = mock.Mock() + + def __call__(self): + return self + + Soledad._shared_db = MockSharedDB() + return Soledad( + user, + passphrase, + secrets_path=os.path.join(self.tempdir, prefix, secrets_path), + local_db_path=os.path.join( + self.tempdir, prefix, local_db_path), + server_url=server_url, + cert_file=cert_file, + auth_token=auth_token, + secret_id=secret_id) + + def make_app(self): + self.request_state = CouchServerState(self._couch_url, 'shared', + 'tokens') + return self.make_app_after_state(self.request_state) + + def setUp(self): + TestCaseWithServer.setUp(self) + CouchDBTestCase.setUp(self) + self._couch_url = 'http://localhost:' + str(self.wrapper.port) + self.db = CouchDatabase( + self._couch_url, 'user-user-uuid', replica_uid='replica') + self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") + + def tearDown(self): + self.db.delete_database() + CouchDBTestCase.tearDown(self) + TestCaseWithServer.tearDown(self) + + # + # Sequential tests + # + + def test_correct_transaction_log_after_sequential_puts(self): + """ + Assert that the transaction_log increases accordingly with sequential + puts. + """ + doc = self.db.create_doc({'ops': 0}) + ops = 1 + docs = [doc.doc_id] + for i in range(0, REPEAT_TIMES): + self.assertEqual( + i+1, len(self.db._get_transaction_log())) + doc.content['ops'] += 1 + self.db.put_doc(doc) + docs.append(doc.doc_id) + + # assert length of transaction_log + transaction_log = self.db._get_transaction_log() + self.assertEqual( + REPEAT_TIMES+1, len(transaction_log)) + + # assert that all entries in the log belong to the same doc + self.assertEqual(REPEAT_TIMES+1, len(docs)) + for doc_id in docs: + self.assertEqual( + REPEAT_TIMES+1, + len(filter(lambda t: t[0] == doc_id, transaction_log))) + + def test_correct_transaction_log_after_sequential_deletes(self): + """ + Assert that the transaction_log increases accordingly with sequential + puts and deletes. + """ + docs = [] + for i in range(0, REPEAT_TIMES): + doc = self.db.create_doc({'ops': 0}) + self.assertEqual( + 2*i+1, len(self.db._get_transaction_log())) + docs.append(doc.doc_id) + self.db.delete_doc(doc) + self.assertEqual( + 2*i+2, len(self.db._get_transaction_log())) + + # assert length of transaction_log + transaction_log = self.db._get_transaction_log() + self.assertEqual( + 2*REPEAT_TIMES, len(transaction_log)) + + # assert that each doc appears twice in the transaction_log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 2, + len(filter(lambda t: t[0] == doc_id, transaction_log))) + + def test_correct_sync_log_after_sequential_syncs(self): + """ + Assert that the sync_log increases accordingly with sequential syncs. + """ + self.startServer() + sol = self._soledad_instance( + auth_token='auth-token', + server_url=self.getURL()) + + def _create_docs_and_sync(sol, syncs): + # create a lot of documents + for i in range(0, REPEAT_TIMES): + sol.create_doc({}) + # assert sizes of transaction and sync logs + self.assertEqual( + syncs*REPEAT_TIMES, + len(self.db._get_transaction_log())) + self.assertEqual( + 1 if syncs > 0 else 0, + len(self.db._database.view('syncs/log').rows)) + # sync to the remote db + sol.sync() + gen, docs = self.db.get_all_docs() + self.assertEqual((syncs+1)*REPEAT_TIMES, gen) + self.assertEqual((syncs+1)*REPEAT_TIMES, len(docs)) + # assert sizes of transaction and sync logs + self.assertEqual((syncs+1)*REPEAT_TIMES, + len(self.db._get_transaction_log())) + sync_log_rows = self.db._database.view('syncs/log').rows + sync_log = sync_log_rows[0].value + replica_uid = sync_log_rows[0].key + known_gen = sync_log['known_generation'] + known_trans_id = sync_log['known_transaction_id'] + # assert sync_log has exactly 1 row + self.assertEqual(1, len(sync_log_rows)) + # assert it has the correct replica_uid, gen and trans_id + self.assertEqual(sol._db._replica_uid, replica_uid) + sol_gen, sol_trans_id = sol._db._get_generation_info() + self.assertEqual(sol_gen, known_gen) + self.assertEqual(sol_trans_id, known_trans_id) + + _create_docs_and_sync(sol, 0) + _create_docs_and_sync(sol, 1) + + # + # Concurrency tests + # + + class _WorkerThread(threading.Thread): + + def __init__(self, params, run_method): + threading.Thread.__init__(self) + self._params = params + self._run_method = run_method + + def run(self): + self._run_method(self) + + def test_correct_transaction_log_after_concurrent_puts(self): + """ + Assert that the transaction_log increases accordingly with concurrent + puts. + """ + pool = threading.BoundedSemaphore(value=1) + threads = [] + docs = [] + + def _run_method(self): + doc = self._params['db'].create_doc({}) + pool.acquire() + self._params['docs'].append(doc.doc_id) + pool.release() + + + for i in range(0, REPEAT_TIMES): + thread = self._WorkerThread( + {'docs': docs, 'db': self.db}, + _run_method) + thread.start() + threads.append(thread) + + for thread in threads: + thread.join() + + # assert length of transaction_log + transaction_log = self.db._get_transaction_log() + self.assertEqual( + REPEAT_TIMES, len(transaction_log)) + + # assert all documents are in the log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 1, + len(filter(lambda t: t[0] == doc_id, transaction_log))) + + def test_correct_transaction_log_after_concurrent_deletes(self): + """ + Assert that the transaction_log increases accordingly with concurrent + puts and deletes. + """ + threads = [] + docs = [] + pool = threading.BoundedSemaphore(value=1) + + # create/delete method that will be run concurrently + def _run_method(self): + doc = self._params['db'].create_doc({}) + pool.acquire() + docs.append(doc.doc_id) + pool.release() + self._params['db'].delete_doc(doc) + + # launch concurrent threads + for i in range(0, REPEAT_TIMES): + thread = self._WorkerThread({'db': self.db}, _run_method) + thread.start() + threads.append(thread) + + # wait for threads to finish + for thread in threads: + thread.join() + + # assert transaction log + transaction_log = self.db._get_transaction_log() + self.assertEqual( + 2*REPEAT_TIMES, len(transaction_log)) + # assert that each doc appears twice in the transaction_log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 2, + len(filter(lambda t: t[0] == doc_id, transaction_log))) + + def test_correct_sync_log_after_concurrent_puts_and_sync(self): + """ + Assert that the sync_log is correct after concurrent syncs. + """ + threads = [] + docs = [] + pool = threading.BoundedSemaphore(value=1) + self.startServer() + sol = self._soledad_instance( + auth_token='auth-token', + server_url=self.getURL()) + + def _run_method(self): + # create a lot of documents + doc = self._params['sol'].create_doc({}) + pool.acquire() + docs.append(doc.doc_id) + pool.release() + + # launch threads to create documents in parallel + for i in range(0, REPEAT_TIMES): + thread = self._WorkerThread( + {'sol': sol, 'syncs': i}, + _run_method) + thread.start() + threads.append(thread) + + # wait for threads to finish + for thread in threads: + thread.join() + + # do the sync! + sol.sync() + + transaction_log = self.db._get_transaction_log() + self.assertEqual(REPEAT_TIMES, len(transaction_log)) + # assert all documents are in the remote log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 1, + len(filter(lambda t: t[0] == doc_id, transaction_log))) + + def test_concurrent_syncs_do_not_fail(self): + """ + Assert that concurrent attempts to sync end up being executed + sequentially and do not fail. + """ + threads = [] + docs = [] + pool = threading.BoundedSemaphore(value=1) + self.startServer() + sol = self._soledad_instance( + auth_token='auth-token', + server_url=self.getURL()) + + def _run_method(self): + # create a lot of documents + doc = self._params['sol'].create_doc({}) + # do the sync! + sol.sync() + pool.acquire() + docs.append(doc.doc_id) + pool.release() + + # launch threads to create documents in parallel + for i in range(0, REPEAT_TIMES): + thread = self._WorkerThread( + {'sol': sol, 'syncs': i}, + _run_method) + thread.start() + threads.append(thread) + + # wait for threads to finish + for thread in threads: + thread.join() + + transaction_log = self.db._get_transaction_log() + self.assertEqual(REPEAT_TIMES, len(transaction_log)) + # assert all documents are in the remote log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 1, + len(filter(lambda t: t[0] == doc_id, transaction_log))) diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py index 83df192b..f8d2a64f 100644 --- a/common/src/leap/soledad/common/tests/test_server.py +++ b/common/src/leap/soledad/common/tests/test_server.py @@ -25,6 +25,7 @@ import tempfile import simplejson as json import mock import time +import binascii from leap.common.testing.basetest import BaseLeapTest @@ -50,6 +51,17 @@ from leap.soledad.server import SoledadApp, LockResource from leap.soledad.server.auth import URLToAuthorization +# monkey path CouchServerState so it can ensure databases. + +def _couch_ensure_database(self, dbname): + db = CouchDatabase.open_database( + self._couch_url + '/' + dbname, + create=True) + return db, db._replica_uid + +CouchServerState.ensure_database = _couch_ensure_database + + class ServerAuthorizationTestCase(BaseLeapTest): """ Tests related to Soledad server authorization. @@ -339,15 +351,16 @@ class EncryptedSyncTestCase( _, doclist = sol1.get_all_docs() self.assertEqual([], doclist) doc1 = sol1.create_doc(json.loads(simple_doc)) - # sync with server - sol1._server_url = self.getURL() - sol1.sync() - # assert doc was sent to couch db + # ensure remote db exists before syncing db = CouchDatabase( self._couch_url, # the name of the user database is "user-<uuid>". 'user-user-uuid', ) + # sync with server + sol1._server_url = self.getURL() + sol1.sync() + # assert doc was sent to couch db _, doclist = db.get_all_docs() self.assertEqual(1, len(doclist)) couchdoc = doclist[0] @@ -376,6 +389,7 @@ class EncryptedSyncTestCase( doc2 = doclist[0] # assert incoming doc is equal to the first sent doc self.assertEqual(doc1, doc2) + db.delete_database() def test_encrypted_sym_sync_with_unicode_passphrase(self): """ @@ -393,15 +407,16 @@ class EncryptedSyncTestCase( _, doclist = sol1.get_all_docs() self.assertEqual([], doclist) doc1 = sol1.create_doc(json.loads(simple_doc)) - # sync with server - sol1._server_url = self.getURL() - sol1.sync() - # assert doc was sent to couch db + # ensure remote db exists before syncing db = CouchDatabase( self._couch_url, # the name of the user database is "user-<uuid>". 'user-user-uuid', ) + # sync with server + sol1._server_url = self.getURL() + sol1.sync() + # assert doc was sent to couch db _, doclist = db.get_all_docs() self.assertEqual(1, len(doclist)) couchdoc = doclist[0] @@ -434,7 +449,94 @@ class EncryptedSyncTestCase( doc2 = doclist[0] # assert incoming doc is equal to the first sent doc self.assertEqual(doc1, doc2) + db.delete_database() + def test_sync_very_large_files(self): + """ + Test if Soledad can sync very large files. + """ + # define the size of the "very large file" + length = 100*(10**6) # 100 MB + self.startServer() + # instantiate soledad and create a document + sol1 = self._soledad_instance( + # token is verified in test_target.make_token_soledad_app + auth_token='auth-token' + ) + _, doclist = sol1.get_all_docs() + self.assertEqual([], doclist) + content = binascii.hexlify(os.urandom(length/2)) # len() == length + doc1 = sol1.create_doc({'data': content}) + # ensure remote db exists before syncing + db = CouchDatabase( + self._couch_url, + # the name of the user database is "user-<uuid>". + 'user-user-uuid', + ) + # sync with server + sol1._server_url = self.getURL() + sol1.sync() + # instantiate soledad with empty db, but with same secrets path + sol2 = self._soledad_instance(prefix='x', auth_token='auth-token') + _, doclist = sol2.get_all_docs() + self.assertEqual([], doclist) + sol2._secrets_path = sol1.secrets_path + sol2._load_secrets() + sol2._set_secret_id(sol1._secret_id) + # sync the new instance + sol2._server_url = self.getURL() + sol2.sync() + _, doclist = sol2.get_all_docs() + self.assertEqual(1, len(doclist)) + doc2 = doclist[0] + # assert incoming doc is equal to the first sent doc + self.assertEqual(doc1, doc2) + # delete remote database + db.delete_database() + + + def test_sync_many_small_files(self): + """ + Test if Soledad can sync many smallfiles. + """ + number_of_docs = 100 + self.startServer() + # instantiate soledad and create a document + sol1 = self._soledad_instance( + # token is verified in test_target.make_token_soledad_app + auth_token='auth-token' + ) + _, doclist = sol1.get_all_docs() + self.assertEqual([], doclist) + # create many small files + for i in range(0, number_of_docs): + sol1.create_doc(json.loads(simple_doc)) + # ensure remote db exists before syncing + db = CouchDatabase( + self._couch_url, + # the name of the user database is "user-<uuid>". + 'user-user-uuid', + ) + # sync with server + sol1._server_url = self.getURL() + sol1.sync() + # instantiate soledad with empty db, but with same secrets path + sol2 = self._soledad_instance(prefix='x', auth_token='auth-token') + _, doclist = sol2.get_all_docs() + self.assertEqual([], doclist) + sol2._secrets_path = sol1.secrets_path + sol2._load_secrets() + sol2._set_secret_id(sol1._secret_id) + # sync the new instance + sol2._server_url = self.getURL() + sol2.sync() + _, doclist = sol2.get_all_docs() + self.assertEqual(number_of_docs, len(doclist)) + # assert incoming docs are equal to sent docs + for doc in doclist: + self.assertEqual(sol1.get_doc(doc.doc_id), doc) + # delete remote database + db.delete_database() class LockResourceTestCase( CouchDBTestCase, TestCaseWithServer): @@ -455,12 +557,21 @@ class LockResourceTestCase( CouchDBTestCase.setUp(self) self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") self._couch_url = 'http://localhost:' + str(self.wrapper.port) + # create the databases + CouchDatabase(self._couch_url, 'shared') + CouchDatabase(self._couch_url, 'tokens') self._state = CouchServerState( self._couch_url, 'shared', 'tokens') def tearDown(self): CouchDBTestCase.tearDown(self) TestCaseWithServer.tearDown(self) + # delete remote database + db = CouchDatabase( + self._couch_url, + 'shared', + ) + db.delete_database() def test__try_obtain_filesystem_lock(self): responder = mock.Mock() diff --git a/common/src/leap/soledad/common/tests/u1db_tests/test_http_database.py b/common/src/leap/soledad/common/tests/u1db_tests/test_http_database.py index 3f3c7bba..9251000e 100644 --- a/common/src/leap/soledad/common/tests/u1db_tests/test_http_database.py +++ b/common/src/leap/soledad/common/tests/u1db_tests/test_http_database.py @@ -133,12 +133,11 @@ class TestHTTPDatabaseSimpleOperations(tests.TestCase): None), self.got) def test_get_doc_deleted_include_deleted(self): - self.response_val = errors.HTTPError(404, - json.dumps( - {"error": errors.DOCUMENT_DELETED} - ), - {'x-u1db-rev': 'doc-rev-gone', - 'x-u1db-has-conflicts': 'false'}) + self.response_val = errors.HTTPError( + 404, + json.dumps({"error": errors.DOCUMENT_DELETED}), + {'x-u1db-rev': 'doc-rev-gone', + 'x-u1db-has-conflicts': 'false'}) doc = self.db.get_doc('deleted', include_deleted=True) self.assertEqual('deleted', doc.doc_id) self.assertEqual('doc-rev-gone', doc.rev) diff --git a/common/src/leap/soledad/common/tests/u1db_tests/test_open.py b/common/src/leap/soledad/common/tests/u1db_tests/test_open.py index 13425b4f..63406245 100644 --- a/common/src/leap/soledad/common/tests/u1db_tests/test_open.py +++ b/common/src/leap/soledad/common/tests/u1db_tests/test_open.py @@ -24,7 +24,8 @@ from u1db import ( ) from leap.soledad.common.tests import u1db_tests as tests from u1db.backends import sqlite_backend -from leap.soledad.common.tests.u1db_tests.test_backends import TestAlternativeDocument +from leap.soledad.common.tests.u1db_tests.test_backends \ + import TestAlternativeDocument class TestU1DBOpen(tests.TestCase): diff --git a/common/src/leap/soledad/common/tests/u1db_tests/test_sqlite_backend.py b/common/src/leap/soledad/common/tests/u1db_tests/test_sqlite_backend.py index a53ea6cc..8292dd07 100644 --- a/common/src/leap/soledad/common/tests/u1db_tests/test_sqlite_backend.py +++ b/common/src/leap/soledad/common/tests/u1db_tests/test_sqlite_backend.py @@ -30,7 +30,8 @@ from u1db import ( from leap.soledad.common.tests import u1db_tests as tests from u1db.backends import sqlite_backend -from leap.soledad.common.tests.u1db_tests.test_backends import TestAlternativeDocument +from leap.soledad.common.tests.u1db_tests.test_backends \ + import TestAlternativeDocument simple_doc = '{"key": "value"}' diff --git a/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py b/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py index 5346d540..1f78f912 100644 --- a/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py +++ b/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py @@ -760,7 +760,7 @@ class DatabaseSyncTests(tests.DatabaseBaseTests, {'docs': [], 'last_known_gen': 0}, 'return': {'docs': [(doc.doc_id, doc.rev)], - 'last_gen': 1}}) + 'last_gen': 1}}) self.assertEqual([doc], self.db1.get_from_index('test-idx', 'value')) def test_sync_pulling_doesnt_update_other_if_changed(self): @@ -785,7 +785,7 @@ class DatabaseSyncTests(tests.DatabaseBaseTests, {'docs': [], 'last_known_gen': 0}, 'return': {'docs': [(doc.doc_id, doc.rev)], - 'last_gen': 1}}) + 'last_gen': 1}}) self.assertEqual(1, self.db1._get_replica_gen_and_trans_id('test2')[0]) # c2 should not have gotten a '_record_sync_info' call, because the # local database had been updated more than just by the messages @@ -819,8 +819,8 @@ class DatabaseSyncTests(tests.DatabaseBaseTests, self.assertLastExchangeLog(self.db2, {'receive': {'docs': [(doc.doc_id, doc.rev)], - 'source_uid': 'test1', - 'source_gen': 1, 'last_known_gen': 0}, + 'source_uid': 'test1', + 'source_gen': 1, 'last_known_gen': 0}, 'return': {'docs': [], 'last_gen': 1}}) def test_sync_ignores_superseded(self): @@ -839,11 +839,11 @@ class DatabaseSyncTests(tests.DatabaseBaseTests, self.assertLastExchangeLog(self.db1, {'receive': {'docs': [(doc.doc_id, doc_rev1)], - 'source_uid': 'test2', - 'source_gen': 1, 'last_known_gen': 0}, + 'source_uid': 'test2', + 'source_gen': 1, 'last_known_gen': 0}, 'return': {'docs': [(doc.doc_id, doc_rev2)], - 'last_gen': 2}}) + 'last_gen': 2}}) self.assertGetDoc(self.db1, doc.doc_id, doc_rev2, new_content, False) def test_sync_sees_remote_conflicted(self): @@ -861,11 +861,11 @@ class DatabaseSyncTests(tests.DatabaseBaseTests, self.assertLastExchangeLog(self.db2, {'receive': {'docs': [(doc_id, doc1_rev)], - 'source_uid': 'test1', - 'source_gen': 1, 'last_known_gen': 0}, + 'source_uid': 'test1', + 'source_gen': 1, 'last_known_gen': 0}, 'return': {'docs': [(doc_id, doc2_rev)], - 'last_gen': 1}}) + 'last_gen': 1}}) self.assertTransactionLog([doc_id, doc_id], self.db1) self.assertGetDoc(self.db1, doc_id, doc2_rev, new_doc, True) self.assertGetDoc(self.db2, doc_id, doc2_rev, new_doc, False) @@ -892,10 +892,10 @@ class DatabaseSyncTests(tests.DatabaseBaseTests, self.assertLastExchangeLog(self.db2, {'receive': {'docs': [(doc_id, doc1.rev)], - 'source_uid': 'test1', - 'source_gen': 2, 'last_known_gen': 1}, + 'source_uid': 'test1', + 'source_gen': 2, 'last_known_gen': 1}, 'return': {'docs': [(doc_id, doc2.rev)], - 'last_gen': 2}}) + 'last_gen': 2}}) self.assertTransactionLog([doc_id, doc_id, doc_id], self.db1) self.assertGetDocIncludeDeleted(self.db1, doc_id, doc2.rev, None, True) self.assertGetDocIncludeDeleted( @@ -950,8 +950,8 @@ class DatabaseSyncTests(tests.DatabaseBaseTests, self.assertLastExchangeLog(self.db2, {'receive': {'docs': [(doc_id, deleted_rev)], - 'source_uid': 'test1', - 'source_gen': 2, 'last_known_gen': 1}, + 'source_uid': 'test1', + 'source_gen': 2, 'last_known_gen': 1}, 'return': {'docs': [], 'last_gen': 2}}) self.assertGetDocIncludeDeleted( self.db1, doc_id, deleted_rev, None, False) @@ -1121,6 +1121,7 @@ class DatabaseSyncTests(tests.DatabaseBaseTests, class TestDbSync(tests.TestCaseWithServer): + """Test db.sync remote sync shortcut""" scenarios = [ @@ -1189,6 +1190,7 @@ class TestDbSync(tests.TestCaseWithServer): class TestRemoteSyncIntegration(tests.TestCaseWithServer): + """Integration tests for the most common sync scenario local -> remote""" make_app_with_state = staticmethod(make_http_app) @@ -1204,7 +1206,7 @@ class TestRemoteSyncIntegration(tests.TestCaseWithServer): doc12 = self.db1.create_doc_from_json('{"a": 2}') doc21 = self.db2.create_doc_from_json('{"b": 1}') doc22 = self.db2.create_doc_from_json('{"b": 2}') - #sanity + # sanity self.assertEqual(2, len(self.db1._get_transaction_log())) self.assertEqual(2, len(self.db2._get_transaction_log())) progress1 = [] diff --git a/common/versioneer.py b/common/versioneer.py index b43ab062..18dfd923 100644 --- a/common/versioneer.py +++ b/common/versioneer.py @@ -115,7 +115,7 @@ import sys def run_command(args, cwd=None, verbose=False): try: - # remember shell=False, so use git.cmd on windows, not just git + # remember shell=False, so use git.exe on windows, not just git p = subprocess.Popen(args, stdout=subprocess.PIPE, cwd=cwd) except EnvironmentError: e = sys.exc_info()[1] @@ -230,7 +230,7 @@ def versions_from_vcs(tag_prefix, versionfile_source, verbose=False): GIT = "git" if sys.platform == "win32": - GIT = "git.cmd" + GIT = "git.exe" stdout = run_command([GIT, "describe", "--tags", "--dirty", "--always"], cwd=root) if stdout is None: @@ -305,7 +305,7 @@ import sys def run_command(args, cwd=None, verbose=False): try: - # remember shell=False, so use git.cmd on windows, not just git + # remember shell=False, so use git.exe on windows, not just git p = subprocess.Popen(args, stdout=subprocess.PIPE, cwd=cwd) except EnvironmentError: e = sys.exc_info()[1] @@ -430,7 +430,7 @@ def versions_from_vcs(tag_prefix, versionfile_source, verbose=False): GIT = "git" if sys.platform == "win32": - GIT = "git.cmd" + GIT = "git.exe" stdout = run_command([GIT, "describe", "--tags", "--dirty", "--always"], cwd=root) if stdout is None: @@ -486,7 +486,7 @@ import sys def do_vcs_install(versionfile_source, ipy): GIT = "git" if sys.platform == "win32": - GIT = "git.cmd" + GIT = "git.exe" run_command([GIT, "add", "versioneer.py"]) run_command([GIT, "add", versionfile_source]) run_command([GIT, "add", ipy]) |