From ae2894ba47ee5bc905f298db6b67ae40af6ebd74 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 4 Dec 2013 13:42:59 -0200 Subject: Make couch backend consistent and lightweight. This commit introduces the use of couch view, list and update functions to put and get data from the couch database. This avoids loads of metadata transferring and encapsulates operations in atomic PUTs. --- .gitignore | 2 - common/setup.py | 86 ++ common/src/leap/soledad/common/README.txt | 79 ++ common/src/leap/soledad/common/couch.py | 1229 ++++++++++++++------ common/src/leap/soledad/common/ddocs/README.txt | 29 + common/src/leap/soledad/common/ddocs/__init__.py | 138 +++ .../leap/soledad/common/ddocs/docs/updates/put.js | 64 + .../common/ddocs/docs/updates/resolve_doc.js | 39 + .../soledad/common/ddocs/docs/views/get/map.js | 20 + .../leap/soledad/common/ddocs/syncs/updates/put.js | 22 + .../soledad/common/ddocs/syncs/views/log/map.js | 12 + .../common/ddocs/transactions/lists/generation.js | 20 + .../ddocs/transactions/lists/trans_id_for_gen.js | 19 + .../ddocs/transactions/lists/whats_changed.js | 22 + .../common/ddocs/transactions/views/log/map.js | 7 + common/src/leap/soledad/common/objectstore.py | 282 ----- common/src/leap/soledad/common/tests/test_couch.py | 3 +- 17 files changed, 1411 insertions(+), 662 deletions(-) create mode 100644 common/src/leap/soledad/common/README.txt create mode 100644 common/src/leap/soledad/common/ddocs/README.txt create mode 100644 common/src/leap/soledad/common/ddocs/__init__.py create mode 100644 common/src/leap/soledad/common/ddocs/docs/updates/put.js create mode 100644 common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js create mode 100644 common/src/leap/soledad/common/ddocs/docs/views/get/map.js create mode 100644 common/src/leap/soledad/common/ddocs/syncs/updates/put.js create mode 100644 common/src/leap/soledad/common/ddocs/syncs/views/log/map.js create mode 100644 common/src/leap/soledad/common/ddocs/transactions/lists/generation.js create mode 100644 common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js create mode 100644 common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js create mode 100644 common/src/leap/soledad/common/ddocs/transactions/views/log/map.js delete mode 100644 common/src/leap/soledad/common/objectstore.py diff --git a/.gitignore b/.gitignore index 701f48fc..bd170f79 100644 --- a/.gitignore +++ b/.gitignore @@ -10,5 +10,3 @@ MANIFEST *.pyc *.log *.*~ - - diff --git a/common/setup.py b/common/setup.py index bcc2b4b3..42bf272a 100644 --- a/common/setup.py +++ b/common/setup.py @@ -103,6 +103,92 @@ def get_versions(default={}, verbose=False): f.write(subst_template) +from os import listdir +from os.path import realpath, dirname, isdir, join, isfile, basename +import json +import logging +import binascii + + +old_cmd_sdist = cmdclass["sdist"] + + +class cmd_sdist(old_cmd_sdist): + """ + Generate 'src/leap/soledad/common/ddocs.py' which contains coush design + documents scripts. + """ + + def run(self): + old_cmd_sdist.run(self) + self.build_ddocs_py() + + def build_ddocs_py(self): + """ + Build couch design documents based on content from subdirectories in + 'src/leap/soledad/common/ddocs'. + """ + prefix = join( + dirname(realpath(__file__)), + 'src', 'leap', 'soledad', 'common') + ddocs_prefix = join(prefix, 'ddocs') + ddocs = {} + + # design docs are represented by subdirectories of `ddocs_prefix` + for ddoc in [f for f in listdir(ddocs_prefix) + if isdir(join(ddocs_prefix, f))]: + + ddocs[ddoc] = {'_id': '_design/%s' % ddoc} + + for t in ['views', 'lists', 'updates']: + tdir = join(ddocs_prefix, ddoc, t) + if isdir(tdir): + + ddocs[ddoc][t] = {} + + if t == 'views': # handle views (with map/reduce functions) + for view in [f for f in listdir(tdir) \ + if isdir(join(tdir, f))]: + # look for map.js and reduce.js + mapfile = join(tdir, view, 'map.js') + reducefile = join(tdir, view, 'reduce.js') + mapfun = None + reducefun = None + try: + with open(mapfile) as f: + mapfun = f.read() + except IOError: + pass + try: + with open(reducefile) as f: + reducefun = f.read() + except IOError: + pass + ddocs[ddoc]['views'][view] = {} + + if mapfun is not None: + ddocs[ddoc]['views'][view]['map'] = mapfun + if reducefun is not None: + ddocs[ddoc]['views'][view]['reduce'] = reducefun + + else: # handle lists, updates, etc + for fun in [f for f in listdir(tdir) \ + if isfile(join(tdir, f))]: + funfile = join(tdir, fun) + funname = basename(funfile).replace('.js', '') + try: + with open(funfile) as f: + ddocs[ddoc][t][funname] = f.read() + except IOError: + pass + # write file containing design docs strings + with open(join(prefix, 'ddocs.py'), 'w') as f: + for ddoc in ddocs: + f.write( + "%s = '%s'\n" % + (ddoc, binascii.b2a_base64(json.dumps(ddocs[ddoc]))[:-1])) + + cmdclass["freeze_debianver"] = freeze_debianver # XXX add ref to docs diff --git a/common/src/leap/soledad/common/README.txt b/common/src/leap/soledad/common/README.txt new file mode 100644 index 00000000..106efb5e --- /dev/null +++ b/common/src/leap/soledad/common/README.txt @@ -0,0 +1,79 @@ +Soledad common package +====================== + +This package contains Soledad bits used by both server and client. + +Couch U1DB Backend +------------------ + +U1DB backends rely on some atomic operations that modify documents contents +and metadata (conflicts, transaction ids and indexes). The only atomic +operation in Couch is a document put, so every u1db atomic operation has to be +mapped to a couch document put. + +The atomic operations in the U1DB SQLite reference backend implementation may +be identified by the use of a context manager to access the underlying +database. A listing of the methods involved in each atomic operation are +depiced below. The top-level elements correpond to the atomic operations that +have to be mapped, and items on deeper levels of the list have to be +implemented in a way that all changes will be pushed with just one operation. + + * _set_replica_uid + * put_doc: + * _get_doc + * _put_and_update_indexes + * insert/update the document + * insert into transaction log + * delete_doc + * _get_doc + * _put_and_update_indexes + * get_doc_conflicts + * _get_conflicts + * _set_replica_gen_and_trans_id + * _do_set_replica_gen_and_trans_id + * _put_doc_if_newer + * _get_doc + * _validate_source (**) + * _get_replica_gen_and_trans_id + * cases: + * is newer: + * _prune_conflicts (**) + * _has_conflicts + * _delete_conflicts + * _put_and_update_indexes + * same content as: + * _put_and_update_indexes + * conflicted: + * _force_doc_sync_conflict + * _prune_conflicts + * _add_conflict + * _put_and_update_indexes + * _do_set_replica_gen_and_trans_id + * resolve_doc + * _get_doc + * cases: + * doc is superseded + * _put_and_update_indexes + * else + * _add_conflict + * _delete_conflicts + * delete_index + * create_index + +Couch views and update functions are used in order to achieve atomicity on the +Couch backend. Transactions are stored in the `u1db_transactions` field of the +couch document. Document's content and conflicted versions are stored as couch +document attachments with names, respectivelly, `u1db_content` and +`u1db_conflicts`. + +A map of methods and couch query URI can be found on the `./ddocs/README.txt` +document. + +Notes: + + * Currently, the couch backend does not implement indexing, so what is + depicted as `_put_and_update_indexes` above will be found as `_put_doc` in + the backend. + + * Conflict updates are part of document put using couch update functions, + and as such are part of the same atomic operation as document put. diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 1396f4d7..3b0e042a 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -18,26 +18,34 @@ """A U1DB backend that uses CouchDB as its persistence layer.""" -import re +import os import simplejson as json -import socket +import sys +import time +import uuid import logging +import binascii +import socket -from u1db import errors -from u1db.sync import Synchronizer -from u1db.backends.inmemory import InMemoryIndex +from u1db.backends import CommonBackend, CommonSyncTarget from u1db.remote.server_state import ServerState -from u1db.errors import DatabaseDoesNotExist -from couchdb.client import Server, Document as CouchDocument +from u1db import ( + errors, + query_parser, + vectorclock, +) +from couchdb.client import ( + Server, + Document as CouchDocument, + _doc_resource, +) from couchdb.http import ResourceNotFound, Unauthorized from leap.soledad.common import USER_DB_PREFIX -from leap.soledad.common.objectstore import ( - ObjectStoreDatabase, - ObjectStoreSyncTarget, -) +from leap.soledad.common.document import SoledadDocument +from leap.soledad.common.ddocs import ensure_ddocs_on_remote_db logger = logging.getLogger(__name__) @@ -49,158 +57,125 @@ class InvalidURLError(Exception): """ -def persistent_class(cls): +class CouchDocument(SoledadDocument): """ - Decorator that modifies a class to ensure u1db metadata persists on - underlying storage. + This is the document used for maintaining the Couch backend. - @param cls: The class that will be modified. - @type cls: type + A CouchDocument can fetch and manipulate conflicts and also holds a + reference to the couch document revision. This data is used to ensure an + atomic and consistent update of the database. """ - def _create_persistent_method(old_method_name, key, load_method_name, - dump_method_name, store): - """ - Create a persistent method to replace C{old_method_name}. - - The new method will load C{key} using C{load_method_name} and stores - it using C{dump_method_name} depending on the value of C{store}. - """ - # get methods - old_method = getattr(cls, old_method_name) - load_method = getattr(cls, load_method_name) \ - if load_method_name is not None \ - else lambda self, data: setattr(self, key, data) - dump_method = getattr(cls, dump_method_name) \ - if dump_method_name is not None \ - else lambda self: getattr(self, key) - - def _new_method(self, *args, **kwargs): - # get u1db data from couch db - doc = self._get_doc('%s%s' % - (self.U1DB_DATA_DOC_ID_PREFIX, key)) - load_method(self, doc.content['content']) - # run old method - retval = old_method(self, *args, **kwargs) - # store u1db data on couch - if store: - doc.content = {'content': dump_method(self)} - self._put_doc(doc) - return retval - - return _new_method - - # ensure the class has a persistency map - if not hasattr(cls, 'PERSISTENCY_MAP'): - logger.error('Class %s has no PERSISTENCY_MAP attribute, skipping ' - 'persistent methods substitution.' % cls) - return cls - # replace old methods with new persistent ones - for key, ((load_method_name, dump_method_name), - persistent_methods) in cls.PERSISTENCY_MAP.iteritems(): - for (method_name, store) in persistent_methods: - setattr(cls, method_name, - _create_persistent_method( - method_name, - key, - load_method_name, - dump_method_name, - store)) - return cls - - -@persistent_class -class CouchDatabase(ObjectStoreDatabase): + def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False, + syncable=True): + """ + Container for handling a document that is stored in couch backend. + + :param doc_id: The unique document identifier. + :type doc_id: str + :param rev: The revision identifier of the document. + :type rev: str + :param json: The JSON string for this document. + :type json: str + :param has_conflicts: Boolean indicating if this document has conflicts + :type has_conflicts: bool + :param syncable: Should this document be synced with remote replicas? + :type syncable: bool + """ + SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts) + self._couch_rev = None + self._conflicts = None + self._modified_conflicts = False + + def ensure_fetch_conflicts(self, get_conflicts_fun): + """ + Ensure conflict data has been fetched from the server. + + :param get_conflicts_fun: A function which, given the document id and + the couch revision, return the conflicted + versions of the current document. + :type get_conflicts_fun: function + """ + if self._conflicts is None: + self._conflicts = get_conflicts_fun(self.doc_id, + couch_rev=self.couch_rev) + self.has_conflicts = len(self._conflicts) > 0 + + def get_conflicts(self): + """ + Get the conflicted versions of the document. + + :return: The conflicted versions of the document. + :rtype: [CouchDocument] + """ + return self._conflicts + + def add_conflict(self, doc): + """ + Add a conflict to this document. + + :param doc: The conflicted version to be added. + :type doc: CouchDocument + """ + if self._conflicts is None: + raise Exception("Run self.ensure_fetch_conflicts first!") + self._modified_conflicts = True + self._conflicts.append(doc) + self.has_conflicts = len(self._conflicts) > 0 + + def delete_conflicts(self, conflict_revs): + """ + Delete conflicted versions of this document. + + :param conflict_revs: The conflicted revisions to be deleted. + :type conflict_revs: [str] + """ + if self._conflicts is None: + raise Exception("Run self.ensure_fetch_conflicts first!") + conflicts_len = len(self._conflicts) + self._conflicts = filter( + lambda doc: doc.rev not in conflict_revs, + self._conflicts) + if len(self._conflicts) < conflicts_len: + self._modified_conflicts = True + self.has_conflicts = len(self._conflicts) > 0 + + def modified_conflicts(self): + """ + Return whether this document's conflicts have been modified. + + :return: Whether this document's conflicts have been modified. + :rtype: bool + """ + return self._conflicts is not None and \ + self._modified_conflicts is True + + def _get_couch_rev(self): + return self._couch_rev + + def _set_couch_rev(self, rev): + self._couch_rev = rev + + couch_rev = property(_get_couch_rev, _set_couch_rev) + + +class CouchDatabase(CommonBackend): """ - A U1DB backend that uses Couch as its persistence layer. + A U1DB implementation that uses CouchDB as its persistence layer. """ - U1DB_TRANSACTION_LOG_KEY = '_transaction_log' - U1DB_CONFLICTS_KEY = '_conflicts' - U1DB_OTHER_GENERATIONS_KEY = '_other_generations' - U1DB_INDEXES_KEY = '_indexes' - U1DB_REPLICA_UID_KEY = '_replica_uid' - - U1DB_DATA_KEYS = [ - U1DB_TRANSACTION_LOG_KEY, - U1DB_CONFLICTS_KEY, - U1DB_OTHER_GENERATIONS_KEY, - U1DB_INDEXES_KEY, - U1DB_REPLICA_UID_KEY, - ] - - COUCH_ID_KEY = '_id' - COUCH_REV_KEY = '_rev' - COUCH_U1DB_ATTACHMENT_KEY = 'u1db_json' - COUCH_U1DB_REV_KEY = 'u1db_rev' - - # the following map describes information about methods usage of - # properties that have to persist on the underlying database. The format - # of the map is assumed to be: - # - # { - # 'property_name': [ - # ('property_load_method_name', 'property_dump_method_name'), - # [('method_1_name', bool), - # ... - # ('method_N_name', bool)]], - # ... - # } - # - # where the booleans indicate if the property should be stored after - # each method execution (i.e. if the method alters the property). Property - # load/dump methods will be run after/before properties are read/written - # to the underlying db. - PERSISTENCY_MAP = { - U1DB_TRANSACTION_LOG_KEY: [ - ('_load_transaction_log_from_json', None), - [('_get_transaction_log', False), - ('_get_generation', False), - ('_get_generation_info', False), - ('_get_trans_id_for_gen', False), - ('whats_changed', False), - ('_put_and_update_indexes', True)]], - U1DB_CONFLICTS_KEY: [ - (None, None), - [('_has_conflicts', False), - ('get_doc_conflicts', False), - ('_prune_conflicts', False), - ('resolve_doc', False), - ('_replace_conflicts', True), - ('_force_doc_sync_conflict', True)]], - U1DB_OTHER_GENERATIONS_KEY: [ - ('_load_other_generations_from_json', None), - [('_get_replica_gen_and_trans_id', False), - ('_do_set_replica_gen_and_trans_id', True)]], - U1DB_INDEXES_KEY: [ - ('_load_indexes_from_json', '_dump_indexes_as_json'), - [('list_indexes', False), - ('get_from_index', False), - ('get_range_from_index', False), - ('get_index_keys', False), - ('_put_and_update_indexes', True), - ('create_index', True), - ('delete_index', True)]], - U1DB_REPLICA_UID_KEY: [ - (None, None), - [('_allocate_doc_rev', False), - ('_put_doc_if_newer', False), - ('_ensure_maximal_rev', False), - ('_prune_conflicts', False), - ('_set_replica_uid', True)]]} - @classmethod def open_database(cls, url, create): """ Open a U1DB database using CouchDB as backend. - @param url: the url of the database replica - @type url: str - @param create: should the replica be created if it does not exist? - @type create: bool + :param url: the url of the database replica + :type url: str + :param create: should the replica be created if it does not exist? + :type create: bool - @return: the database instance - @rtype: CouchDatabase + :return: the database instance + :rtype: CouchDatabase """ # get database from url m = re.match('(^https?://[^/]+)/(.+)$', url) @@ -221,293 +196,795 @@ class CouchDatabase(ObjectStoreDatabase): """ Create a new Couch data container. - @param url: the url of the couch database - @type url: str - @param dbname: the database name - @type dbname: str - @param replica_uid: an optional unique replica identifier - @type replica_uid: str - @param full_commit: turn on the X-Couch-Full-Commit header - @type full_commit: bool - @param session: an http.Session instance or None for a default session - @type session: http.Session + :param url: the url of the couch database + :type url: str + :param dbname: the database name + :type dbname: str + :param replica_uid: an optional unique replica identifier + :type replica_uid: str + :param full_commit: turn on the X-Couch-Full-Commit header + :type full_commit: bool + :param session: an http.Session instance or None for a default session + :type session: http.Session + :param ensure_ddocs: Ensure that the design docs exist on server. + :type ensure_ddocs: bool """ # save params self._url = url self._full_commit = full_commit self._session = session + self._factory = CouchDocument + self._real_replica_uid = None # configure couch self._server = Server(url=self._url, full_commit=self._full_commit, session=self._session) self._dbname = dbname - # this will ensure that transaction and sync logs exist and are - # up-to-date. try: self._database = self._server[self._dbname] except ResourceNotFound: self._server.create(self._dbname) self._database = self._server[self._dbname] - ObjectStoreDatabase.__init__(self, replica_uid=replica_uid) + self._initialize(replica_uid or uuid.uuid4().hex) + + def get_sync_target(self): + """ + Return a SyncTarget object, for another u1db to synchronize with. + + :return: The sync target. + :rtype: CouchSyncTarget + """ + return CouchSyncTarget(self) + + def delete_database(self): + """ + Delete a U1DB CouchDB database. + """ + del(self._server[self._dbname]) + + def close(self): + """ + Release any resources associated with this database. + + :return: True if db was succesfully closed. + :rtype: bool + """ + self._url = None + self._full_commit = None + self._session = None + self._server = None + self._database = None + return True + + def _set_replica_uid(self, replica_uid): + """ + Force the replica uid to be set. + + :param replica_uid: The new replica uid. + :type replica_uid: str + """ + try: + doc = self._database['u1db_config'] + except ResourceNotFound: + doc = { + '_id': 'u1db_config', + 'replica_uid': replica_uid, + } + self._database.save(doc) + + def _ensure_design_docs(self): + """ + Ensure that the design docs have been created. + """ + if self._is_initialized(): + return + self._initialize() + + def _set_replica_uid(self, replica_uid): + """Force the replica_uid to be set.""" + doc = self._database['u1db_config'] + doc['replica_uid'] = replica_uid + self._database.save(doc) + self._real_replica_uid = replica_uid + + def _get_replica_uid(self): + """ + Get the replica uid. + + :return: The replica uid. + :rtype: str + """ + if self._real_replica_uid is not None: + return self._real_replica_uid + doc = self._database['u1db_config'] + self._real_replica_uid = doc['replica_uid'] + return self._real_replica_uid + + _replica_uid = property(_get_replica_uid, _set_replica_uid) + + def _get_generation(self): + """ + Return the current generation. + + :return: The current generation. + :rtype: int + """ + # query a couch list function + res = self._database.resource( + '_design', 'transactions', '_list', 'generation', 'log') + response = res.get_json() + return response[2]['generation'] + + def _get_generation_info(self): + """ + Return the current generation. - #------------------------------------------------------------------------- - # methods from Database - #------------------------------------------------------------------------- + :return: A tuple containing the current generation and transaction id. + :rtype: (int, str) + """ + # query a couch list function + res = self._database.resource( + '_design', 'transactions', '_list', 'generation', 'log') + response = res.get_json() + return (response[2]['generation'], response[2]['transaction_id']) + + def _get_trans_id_for_gen(self, generation): + """ + Get the transaction id corresponding to a particular generation. + + :param generation: The generation for which to get the transaction id. + :type generation: int + + :return: The transaction id for C{generation}. + :rtype: str + + :raise InvalidGeneration: Raised when the generation does not exist. + """ + if generation == 0: + return '' + # query a couch list function + res = self._database.resource( + '_design', 'transactions', '_list', 'trans_id_for_gen', 'log') + response = res.get_json(gen=generation) + if response[2] == {}: + raise errors.InvalidGeneration + return response[2]['transaction_id'] + + def _get_transaction_log(self): + """ + This is only for the test suite, it is not part of the api. + + :return: The complete transaction log. + :rtype: [(str, str)] + """ + # query a couch view + res = self._database.resource( + '_design', 'transactions', '_view', 'log') + response = res.get_json() + return map(lambda row: (row['id'], row['value']), response[2]['rows']) def _get_doc(self, doc_id, check_for_conflicts=False): """ - Get just the document content, without fancy handling. + Extract the document from storage. + + This can return None if the document doesn't exist. + + :param doc_id: The unique document identifier + :type doc_id: str + :param check_for_conflicts: If set to False, then the conflict check + will be skipped. + :type check_for_conflicts: bool + + :return: The document. + :rtype: CouchDocument + """ + # get document with all attachments (u1db content and eventual + # conflicts) + try: + result = \ + self._database.resource(doc_id).get_json( + attachments=True)[2] + except ResourceNotFound: + return None + # restrict to u1db documents + if 'u1db_rev' not in result: + return None + doc = self._factory(doc_id, result['u1db_rev']) + # set contents or make tombstone + if '_attachments' not in result \ + or 'u1db_content' not in result['_attachments']: + doc.make_tombstone() + else: + doc.content = json.loads( + binascii.a2b_base64( + result['_attachments']['u1db_content']['data'])) + # determine if there are conflicts + if check_for_conflicts \ + and '_attachments' in result \ + and 'u1db_conflicts' in result['_attachments']: + doc.has_conflicts = True + # store couch revision + doc.couch_rev = result['_rev'] + return doc + + def get_doc(self, doc_id, include_deleted=False): + """ + Get the JSON string for the given document. - @param doc_id: The unique document identifier - @type doc_id: str - @param include_deleted: If set to True, deleted documents will be + :param doc_id: The unique document identifier + :type doc_id: str + :param include_deleted: If set to True, deleted documents will be returned with empty content. Otherwise asking for a deleted document will return None. - @type include_deleted: bool + :type include_deleted: bool - @return: a Document object. - @type: u1db.Document + :return: A document object. + :rtype: CouchDocument. """ - cdoc = self._database.get(doc_id) - if cdoc is None: + doc = self._get_doc(doc_id, check_for_conflicts=True) + if doc is None: + return None + if doc.is_tombstone() and not include_deleted: return None - has_conflicts = False - if check_for_conflicts: - has_conflicts = self._has_conflicts(doc_id) - doc = self._factory( - doc_id=doc_id, - rev=cdoc[self.COUCH_U1DB_REV_KEY], - has_conflicts=has_conflicts) - contents = self._database.get_attachment( - cdoc, - self.COUCH_U1DB_ATTACHMENT_KEY) - if contents: - doc.content = json.loads(contents.read()) - else: - doc.make_tombstone() return doc def get_all_docs(self, include_deleted=False): """ Get the JSON content for all documents in the database. - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise deleted documents will not - be included in the results. - @type include_deleted: bool + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise deleted + documents will not be included in the results. + :type include_deleted: bool - @return: (generation, [Document]) + :return: (generation, [CouchDocument]) The current generation of the database, followed by a list of all the documents in the database. - @rtype: tuple + :rtype: (int, [CouchDocument]) """ + generation = self._get_generation() results = [] - for doc_id in self._database: - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - continue - doc = self._get_doc(doc_id, check_for_conflicts=True) - if doc.content is None and not include_deleted: - continue - results.append(doc) + for row in self._database.view('_all_docs'): + doc = self.get_doc(row.id, include_deleted=include_deleted) + if doc is not None: + results.append(doc) return (generation, results) - def _put_doc(self, doc): + def _put_doc(self, old_doc, doc): """ - Update a document. + Put the document in the Couch backend database. - This is called everytime we just want to do a raw put on the db (i.e. - without index updates, document constraint checks, and conflict - checks). - - @param doc: The document to update. - @type doc: u1db.Document - - @return: The new revision identifier for the document. - @rtype: str - """ - # prepare couch's Document - cdoc = CouchDocument() - cdoc[self.COUCH_ID_KEY] = doc.doc_id - # we have to guarantee that couch's _rev is consistent - old_cdoc = self._database.get(doc.doc_id) - if old_cdoc is not None: - cdoc[self.COUCH_REV_KEY] = old_cdoc[self.COUCH_REV_KEY] - # store u1db's rev - cdoc[self.COUCH_U1DB_REV_KEY] = doc.rev - # save doc in db - self._database.save(cdoc) - # store u1db's content as json string - if not doc.is_tombstone(): - self._database.put_attachment( - cdoc, doc.get_json(), - filename=self.COUCH_U1DB_ATTACHMENT_KEY) - else: - self._database.delete_attachment( - cdoc, - self.COUCH_U1DB_ATTACHMENT_KEY) + :param old_doc: The old document version. + :type old_doc: CouchDocument + :param doc: The document to be put. + :type doc: CouchDocument - def get_sync_target(self): + :raise RevisionConflict: Raised when trying to update a document but + couch revisions mismatch. """ - Return a SyncTarget object, for another u1db to synchronize with. - - @return: The sync target. - @rtype: CouchSyncTarget + trans_id = self._allocate_transaction_id() + # encode content + content = doc.get_json() + if content is not None: + content = binascii.b2a_base64(content)[:-1] # exclude trailing \n + # encode conflicts + conflicts = None + update_conflicts = doc.modified_conflicts() + if update_conflicts is True: + if doc.has_conflicts: + conflicts = binascii.b2a_base64( + json.dumps( + map(lambda cdoc: (cdoc.rev, cdoc.content), + doc.get_conflicts())) + )[:-1] # exclude \n + # perform the request + resource = self._database.resource( + '_design', 'docs', '_update', 'put', doc.doc_id) + response = resource.put_json( + body={ + 'couch_rev': old_doc.couch_rev + if old_doc is not None else None, + 'u1db_rev': doc.rev, + 'content': content, + 'trans_id': trans_id, + 'conflicts': conflicts, + 'update_conflicts': update_conflicts, + }, + headers={'content-type': 'application/json'}) + # the document might have been updated in between, so we check for the + # return message + msg = response[2].read() + if msg == 'ok': + return + elif msg == 'revision conflict': + raise errors.RevisionConflict() + + def put_doc(self, doc): """ - return CouchSyncTarget(self) + Update a document. - def create_index(self, index_name, *index_expressions): - """ - Create a named index, which can then be queried for future lookups. + If the document currently has conflicts, put will fail. + If the database specifies a maximum document size and the document + exceeds it, put will fail and raise a DocumentTooBig exception. - @param index_name: A unique name which can be used as a key prefix. - @param index_expressions: Index expressions defining the index - information. - """ - if index_name in self._indexes: - if self._indexes[index_name]._definition == list( - index_expressions): - return - raise errors.IndexNameTakenError - index = InMemoryIndex(index_name, list(index_expressions)) - for doc_id in self._database: - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - continue # skip special files - doc = self._get_doc(doc_id) - if doc.content is not None: - index.add_json(doc_id, doc.get_json()) - self._indexes[index_name] = index + :param doc: A Document with new content. + :return: new_doc_rev - The new revision identifier for the document. + The Document object will also be updated. - def close(self): + :raise errors.InvalidDocId: Raised if the document's id is invalid. + :raise errors.DocumentTooBig: Raised if the document size is too big. + :raise errors.ConflictedDoc: Raised if the document has conflicts. """ - Release any resources associated with this database. - - @return: True if db was succesfully closed. - @rtype: bool + if doc.doc_id is None: + raise errors.InvalidDocId() + self._check_doc_id(doc.doc_id) + self._check_doc_size(doc) + old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if old_doc and old_doc.has_conflicts: + raise errors.ConflictedDoc() + if old_doc and doc.rev is None and old_doc.is_tombstone(): + new_rev = self._allocate_doc_rev(old_doc.rev) + else: + if old_doc is not None: + if old_doc.rev != doc.rev: + raise errors.RevisionConflict() + else: + if doc.rev is not None: + raise errors.RevisionConflict() + new_rev = self._allocate_doc_rev(doc.rev) + doc.rev = new_rev + self._put_doc(old_doc, doc) + return new_rev + + def whats_changed(self, old_generation=0): """ - # TODO: fix this method so the connection is properly closed and - # test_close (+tearDown, which deletes the db) works without problems. - self._url = None - self._full_commit = None - self._session = None - #self._server = None - self._database = None - return True - - def sync(self, url, creds=None, autocreate=True): + Return a list of documents that have changed since old_generation. + + :param old_generation: The generation of the database in the old + state. + :type old_generation: int + + :return: (generation, trans_id, [(doc_id, generation, trans_id),...]) + The current generation of the database, its associated + transaction id, and a list of of changed documents since + old_generation, represented by tuples with for each document + its doc_id and the generation and transaction id corresponding + to the last intervening change and sorted by generation (old + changes first) + :rtype: (int, str, [(str, int, str)]) """ - Synchronize documents with remote replica exposed at url. + # query a couch list function + res = self._database.resource( + '_design', 'transactions', '_list', 'whats_changed', 'log') + response = res.get_json(old_gen=old_generation) + results = map( + lambda row: + (row['generation'], row['doc_id'], row['transaction_id']), + response[2]['transactions']) + results.reverse() + cur_gen = old_generation + seen = set() + changes = [] + newest_trans_id = '' + for generation, doc_id, trans_id in results: + if doc_id not in seen: + changes.append((doc_id, generation, trans_id)) + seen.add(doc_id) + if changes: + cur_gen = changes[0][1] # max generation + newest_trans_id = changes[0][2] + changes.reverse() + else: + cur_gen, newest_trans_id = self._get_generation_info() - @param url: The url of the target replica to sync with. - @type url: str - @param creds: optional dictionary giving credentials. - to authorize the operation with the server. - @type creds: dict - @param autocreate: Ask the target to create the db if non-existent. - @type autocreate: bool + return cur_gen, newest_trans_id, changes - @return: The local generation before the synchronisation was performed. - @rtype: int + def delete_doc(self, doc): """ - return Synchronizer(self, CouchSyncTarget(url, creds=creds)).sync( - autocreate=autocreate) + Mark a document as deleted. + + Will abort if the current revision doesn't match doc.rev. + This will also set doc.content to None. - #------------------------------------------------------------------------- - # methods from ObjectStoreDatabase - #------------------------------------------------------------------------- + :param doc: The document to mark as deleted. + :type doc: CouchDocument. - def _init_u1db_data(self): + :raise errors.DocumentDoesNotExist: Raised if the document does not + exist. + :raise errors.RevisionConflict: Raised if the revisions do not match. + :raise errors.DocumentAlreadyDeleted: Raised if the document is + already deleted. + :raise errors.ConflictedDoc: Raised if the doc has conflicts. """ - Initialize u1db configuration data on backend storage. + old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if old_doc is None: + raise errors.DocumentDoesNotExist + if old_doc.rev != doc.rev: + raise errors.RevisionConflict() + if old_doc.is_tombstone(): + raise errors.DocumentAlreadyDeleted + if old_doc.has_conflicts: + raise errors.ConflictedDoc() + new_rev = self._allocate_doc_rev(doc.rev) + doc.rev = new_rev + doc.make_tombstone() + self._put_doc(old_doc, doc) + return new_rev + + def _get_conflicts(self, doc_id, couch_rev=None): + """ + Get the conflicted versions of a document. + + If the C{couch_rev} parameter is not None, conflicts for a specific + document's couch revision are returned. - A U1DB database needs to keep track of all database transactions, - document conflicts, the generation of other replicas it has seen, - indexes created by users and so on. + :param couch_rev: The couch document revision. + :type couch_rev: str - In this implementation, all this information is stored in special - documents stored in the underlying with doc_id prefix equal to - U1DB_DATA_DOC_ID_PREFIX. Those documents ids are reserved: put_doc(), - get_doc() and delete_doc() will not allow documents with a doc_id with - that prefix to be accessed or modified. + :return: A list of conflicted versions of the document. + :rtype: list """ - for key in self.U1DB_DATA_KEYS: - doc_id = '%s%s' % (self.U1DB_DATA_DOC_ID_PREFIX, key) - doc = self._get_doc(doc_id) - if doc is None: - doc = self._factory(doc_id) - doc.content = {'content': getattr(self, key)} - self._put_doc(doc) + # request conflicts attachment from server + params = {} + if couch_rev is not None: + params['rev'] = couch_rev # restric document's couch revision + resource = self._database.resource(doc_id, 'u1db_conflicts') + try: + response = resource.get_json(**params) + conflicts = [] + # build the conflicted versions + for doc_rev, content in json.loads(response[2].read()): + doc = self._factory(doc_id, doc_rev) + if content is None: + doc.make_tombstone() + else: + doc.content = content + conflicts.append(doc) + return conflicts + except ResourceNotFound: + return [] - #------------------------------------------------------------------------- - # Couch specific methods - #------------------------------------------------------------------------- + def get_doc_conflicts(self, doc_id): + """ + Get the list of conflicts for the given document. - INDEX_NAME_KEY = 'name' - INDEX_DEFINITION_KEY = 'definition' - INDEX_VALUES_KEY = 'values' + The order of the conflicts is such that the first entry is the value + that would be returned by "get_doc". - def delete_database(self): - """ - Delete a U1DB CouchDB database. + :return: A list of the document entries that are conflicted. + :rtype: [CouchDocument] """ - del(self._server[self._dbname]) + conflict_docs = self._get_conflicts(doc_id) + if len(conflict_docs) == 0: + return [] + this_doc = self._get_doc(doc_id, check_for_conflicts=True) + return [this_doc] + conflict_docs - def _dump_indexes_as_json(self): + def _get_replica_gen_and_trans_id(self, other_replica_uid): """ - Dump index definitions as JSON. + Return the last known generation and transaction id for the other db + replica. + + When you do a synchronization with another replica, the Database keeps + track of what generation the other database replica was at, and what + the associated transaction id was. This is used to determine what data + needs to be sent, and if two databases are claiming to be the same + replica. + + :param other_replica_uid: The identifier for the other replica. + :type other_replica_uid: str + + :return: A tuple containing the generation and transaction id we + encountered during synchronization. If we've never + synchronized with the replica, this is (0, ''). + :rtype: (int, str) """ - indexes = {} - for name, idx in self._indexes.iteritems(): - indexes[name] = {} - for attr in [self.INDEX_NAME_KEY, self.INDEX_DEFINITION_KEY, - self.INDEX_VALUES_KEY]: - indexes[name][attr] = getattr(idx, '_' + attr) - return indexes + # query a couch view + result = self._database.view('syncs/log') + if len(result[other_replica_uid].rows) == 0: + return (0, '') + return ( + result[other_replica_uid].rows[0]['value']['known_generation'], + result[other_replica_uid].rows[0]['value']['known_transaction_id'] + ) + + def _set_replica_gen_and_trans_id(self, other_replica_uid, + other_generation, other_transaction_id): + """ + Set the last-known generation and transaction id for the other + database replica. + + We have just performed some synchronization, and we want to track what + generation the other replica was at. See also + _get_replica_gen_and_trans_id. + + :param other_replica_uid: The U1DB identifier for the other replica. + :type other_replica_uid: str + :param other_generation: The generation number for the other replica. + :type other_generation: int + :param other_transaction_id: The transaction id associated with the + generation. + :type other_transaction_id: str + """ + self._do_set_replica_gen_and_trans_id( + other_replica_uid, other_generation, other_transaction_id) - def _load_indexes_from_json(self, indexes): + def _do_set_replica_gen_and_trans_id( + self, other_replica_uid, other_generation, other_transaction_id): + """ + Set the last-known generation and transaction id for the other + database replica. + + We have just performed some synchronization, and we want to track what + generation the other replica was at. See also + _get_replica_gen_and_trans_id. + + :param other_replica_uid: The U1DB identifier for the other replica. + :type other_replica_uid: str + :param other_generation: The generation number for the other replica. + :type other_generation: int + :param other_transaction_id: The transaction id associated with the + generation. + :type other_transaction_id: str + """ + # query a couch update function + res = self._database.resource( + '_design', 'syncs', '_update', 'put', 'u1db_sync_log') + res.put_json( + body={ + 'other_replica_uid': other_replica_uid, + 'other_generation': other_generation, + 'other_transaction_id': other_transaction_id, + }, + headers={'content-type': 'application/json'}) + + def _add_conflict(self, doc, my_doc_rev, my_content): + """ + Add a conflict to the document. + + Note that this method does not actually update the backend; rather, it + updates the CouchDocument object which will provide the conflict data + when the atomic document update is made. + + :param doc: The document to have conflicts added to. + :type doc: CouchDocument + :param my_doc_rev: The revision of the conflicted document. + :type my_doc_rev: str + :param my_content: The content of the conflicted document as a JSON + serialized string. + :type my_content: str """ - Load index definitions from stored JSON. + doc.ensure_fetch_conflicts(self._get_conflicts) + doc.add_conflict( + self._factory(doc_id=doc.doc_id, rev=my_doc_rev, + json=my_content)) - @param indexes: A JSON representation of indexes as - [('index-name', ['field', 'field2', ...]), ...]. - @type indexes: str + def _delete_conflicts(self, doc, conflict_revs): """ - self._indexes = {} - for name, idx_dict in indexes.iteritems(): - idx = InMemoryIndex(name, idx_dict[self.INDEX_DEFINITION_KEY]) - idx._values = idx_dict[self.INDEX_VALUES_KEY] - self._indexes[name] = idx + Delete the conflicted revisions from the list of conflicts of C{doc}. - def _load_transaction_log_from_json(self, transaction_log): + Note that thie method does not actually update the backed; rather, it + updates the CouchDocument object which will provide the conflict data + when the atomic document update is made. + + :param doc: The document to have conflicts deleted. + :type doc: CouchDocument + :param conflict_revs: A list of the revisions to be deleted. + :param conflict_revs: [str] """ - Load transaction log from stored JSON. + doc.ensure_fetch_conflicts(self._get_conflicts) + doc.delete_conflicts(conflict_revs) - @param transaction_log: A JSON representation of transaction_log as - [('generation', 'transaction_id'), ...]. - @type transaction_log: list + def _prune_conflicts(self, doc, doc_vcr): + """ + Prune conflicts that are older then the current document's revision, or + whose content match to the current document's content. + + :param doc: The document to have conflicts pruned. + :type doc: CouchDocument + :param doc_vcr: A vector clock representing the current document's + revision. + :type doc_vcr: u1db.vectorclock.VectorClock """ - self._transaction_log = [] - for gen, trans_id in transaction_log: - self._transaction_log.append((gen, trans_id)) + if doc.has_conflicts is True: + autoresolved = False + c_revs_to_prune = [] + for c_doc in doc.get_conflicts(): + c_vcr = vectorclock.VectorClockRev(c_doc.rev) + if doc_vcr.is_newer(c_vcr): + c_revs_to_prune.append(c_doc.rev) + elif doc.same_content_as(c_doc): + c_revs_to_prune.append(c_doc.rev) + doc_vcr.maximize(c_vcr) + autoresolved = True + if autoresolved: + doc_vcr.increment(self._replica_uid) + doc.rev = doc_vcr.as_str() + self._delete_conflicts(doc, c_revs_to_prune) + + def _force_doc_sync_conflict(self, doc): + """ + Add a conflict and force a document put. - def _load_other_generations_from_json(self, other_generations): + :param doc: The document to be put. + :type doc: CouchDocument """ - Load other generations from stored JSON. + my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev)) + self._add_conflict(doc, my_doc.rev, my_doc.get_json()) + doc.has_conflicts = True + self._put_doc(my_doc, doc) - @param other_generations: A JSON representation of other_generations - as {'replica_uid': ('generation', 'transaction_id'), ...}. - @type other_generations: dict + def resolve_doc(self, doc, conflicted_doc_revs): + """ + Mark a document as no longer conflicted. + + We take the list of revisions that the client knows about that it is + superseding. This may be a different list from the actual current + conflicts, in which case only those are removed as conflicted. This + may fail if the conflict list is significantly different from the + supplied information. (sync could have happened in the background from + the time you GET_DOC_CONFLICTS until the point where you RESOLVE) + + :param doc: A Document with the new content to be inserted. + :type doc: CouchDocument + :param conflicted_doc_revs: A list of revisions that the new content + supersedes. + :type conflicted_doc_revs: [str] """ - self._other_generations = {} - for replica_uid, [gen, trans_id] in other_generations.iteritems(): - self._other_generations[replica_uid] = (gen, trans_id) + cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + new_rev = self._ensure_maximal_rev(cur_doc.rev, + conflicted_doc_revs) + superseded_revs = set(conflicted_doc_revs) + doc.rev = new_rev + if cur_doc.rev in superseded_revs: + self._delete_conflicts(doc, superseded_revs) + self._put_doc(cur_doc, doc) + else: + self._add_conflict(doc, new_rev, doc.get_json()) + self._delete_conflicts(doc, superseded_revs) + # perform request to resolve document in server + resource = self._database.resource( + '_design', 'docs', '_update', 'resolve_doc', doc.doc_id) + conflicts = None + if doc.has_conflicts: + conflicts = binascii.b2a_base64( + json.dumps( + map(lambda cdoc: (cdoc.rev, cdoc.content), + doc.get_conflicts())) + )[:-1] # exclude \n + response = resource.put_json( + body={ + 'couch_rev': cur_doc.couch_rev, + 'conflicts': conflicts, + }, + headers={'content-type': 'application/json'}) + + def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, + replica_trans_id=''): + """ + Insert/update document into the database with a given revision. + + This api is used during synchronization operations. + + If a document would conflict and save_conflict is set to True, the + content will be selected as the 'current' content for doc.doc_id, + even though doc.rev doesn't supersede the currently stored revision. + The currently stored document will be added to the list of conflict + alternatives for the given doc_id. + + This forces the new content to be 'current' so that we get convergence + after synchronizing, even if people don't resolve conflicts. Users can + then notice that their content is out of date, update it, and + synchronize again. (The alternative is that users could synchronize and + think the data has propagated, but their local copy looks fine, and the + remote copy is never updated again.) + + :param doc: A document object + :type doc: CouchDocument + :param save_conflict: If this document is a conflict, do you want to + save it as a conflict, or just ignore it. + :type save_conflict: bool + :param replica_uid: A unique replica identifier. + :type replica_uid: str + :param replica_gen: The generation of the replica corresponding to the + this document. The replica arguments are optional, + but are used during synchronization. + :type replica_gen: int + :param replica_trans_id: The transaction_id associated with the + generation. + :type replica_trans_id: str + + :return: (state, at_gen) - If we don't have doc_id already, or if + doc_rev supersedes the existing document revision, then the + content will be inserted, and state is 'inserted'. If + doc_rev is less than or equal to the existing revision, then + the put is ignored and state is respecitvely 'superseded' or + 'converged'. If doc_rev is not strictly superseded or + supersedes, then state is 'conflicted'. The document will not + be inserted if save_conflict is False. For 'inserted' or + 'converged', at_gen is the insertion/current generation. + :rtype: (str, int) + """ + cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + # at this point, `doc` has arrived from the other syncing party, and + # we will decide what to do with it. + # First, we prepare the arriving doc to update couch database. + old_doc = doc + doc = self._factory(doc.doc_id, doc.rev, doc.get_json()) + if cur_doc is not None: + doc.couch_rev = cur_doc.couch_rev + # fetch conflicts because we will eventually manipulate them + doc.ensure_fetch_conflicts(self._get_conflicts) + # from now on, it works just like u1db sqlite backend + doc_vcr = vectorclock.VectorClockRev(doc.rev) + if cur_doc is None: + cur_vcr = vectorclock.VectorClockRev(None) + else: + cur_vcr = vectorclock.VectorClockRev(cur_doc.rev) + self._validate_source(replica_uid, replica_gen, replica_trans_id) + if doc_vcr.is_newer(cur_vcr): + rev = doc.rev + self._prune_conflicts(doc, doc_vcr) + if doc.rev != rev: + # conflicts have been autoresolved + state = 'superseded' + else: + state = 'inserted' + self._put_doc(cur_doc, doc) + elif doc.rev == cur_doc.rev: + # magical convergence + state = 'converged' + elif cur_vcr.is_newer(doc_vcr): + # Don't add this to seen_ids, because we have something newer, + # so we should send it back, and we should not generate a + # conflict + state = 'superseded' + elif cur_doc.same_content_as(doc): + # the documents have been edited to the same thing at both ends + doc_vcr.maximize(cur_vcr) + doc_vcr.increment(self._replica_uid) + doc.rev = doc_vcr.as_str() + self._put_doc(cur_doc, doc) + state = 'superseded' + else: + state = 'conflicted' + if save_conflict: + self._force_doc_sync_conflict(doc) + if replica_uid is not None and replica_gen is not None: + self._do_set_replica_gen_and_trans_id( + replica_uid, replica_gen, replica_trans_id) + # update info + old_doc.rev = doc.rev + if doc.is_tombstone(): + old_doc.is_tombstone() + else: + old_doc.content = doc.content + old_doc.has_conflicts = doc.has_conflicts + return state, self._get_generation() -class CouchSyncTarget(ObjectStoreSyncTarget): +class CouchSyncTarget(CommonSyncTarget): """ Functionality for using a CouchDatabase as a synchronization target. """ - pass + + def get_sync_info(self, source_replica_uid): + source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id( + source_replica_uid) + my_gen, my_trans_id = self._db._get_generation_info() + return ( + self._db._replica_uid, my_gen, my_trans_id, source_gen, + source_trans_id) + + def record_sync_info(self, source_replica_uid, source_replica_generation, + source_replica_transaction_id): + if self._trace_hook: + self._trace_hook('record_sync_info') + self._db._set_replica_gen_and_trans_id( + source_replica_uid, source_replica_generation, + source_replica_transaction_id) class NotEnoughCouchPermissions(Exception): @@ -527,12 +1004,12 @@ class CouchServerState(ServerState): """ Initialize the couch server state. - @param couch_url: The URL for the couch database. - @type couch_url: str - @param shared_db_name: The name of the shared database. - @type shared_db_name: str - @param tokens_db_name: The name of the tokens database. - @type tokens_db_name: str + :param couch_url: The URL for the couch database. + :type couch_url: str + :param shared_db_name: The name of the shared database. + :type shared_db_name: str + :param tokens_db_name: The name of the tokens database. + :type tokens_db_name: str """ self._couch_url = couch_url self._shared_db_name = shared_db_name @@ -563,12 +1040,12 @@ class CouchServerState(ServerState): couch library to ensure that Soledad Server can do everything it needs on the underlying couch database. - @param couch_url: The URL of the couch database. - @type couch_url: str + :param couch_url: The URL of the couch database. + :type couch_url: str @raise NotEnoughCouchPermissions: Raised in case there are not enough permissions to read/write/create the needed couch databases. - @rtype: bool + :rtype: bool """ def _open_couch_db(dbname): @@ -601,11 +1078,11 @@ class CouchServerState(ServerState): """ Open a couch database. - @param dbname: The name of the database to open. - @type dbname: str + :param dbname: The name of the database to open. + :type dbname: str - @return: The CouchDatabase object. - @rtype: CouchDatabase + :return: The CouchDatabase object. + :rtype: CouchDatabase """ # TODO: open couch return CouchDatabase.open_database( @@ -616,11 +1093,11 @@ class CouchServerState(ServerState): """ Ensure couch database exists. - @param dbname: The name of the database to ensure. - @type dbname: str + :param dbname: The name of the database to ensure. + :type dbname: str - @return: The CouchDatabase object and the replica uid. - @rtype: (CouchDatabase, str) + :return: The CouchDatabase object and the replica uid. + :rtype: (CouchDatabase, str) """ db = CouchDatabase.open_database( self._couch_url + '/' + dbname, @@ -631,8 +1108,8 @@ class CouchServerState(ServerState): """ Delete couch database. - @param dbname: The name of the database to delete. - @type dbname: str + :param dbname: The name of the database to delete. + :type dbname: str """ CouchDatabase.delete_database(self._couch_url + '/' + dbname) @@ -640,8 +1117,8 @@ class CouchServerState(ServerState): """ Set the couchdb URL - @param url: CouchDB URL - @type url: str + :param url: CouchDB URL + :type url: str """ self._couch_url = url @@ -649,7 +1126,7 @@ class CouchServerState(ServerState): """ Return CouchDB URL - @rtype: str + :rtype: str """ return self._couch_url diff --git a/common/src/leap/soledad/common/ddocs/README.txt b/common/src/leap/soledad/common/ddocs/README.txt new file mode 100644 index 00000000..37d89fbf --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/README.txt @@ -0,0 +1,29 @@ +The following table depicts the U1DB CouchDB backend method and the URI that +is queried to obtain/update data from/to the server. + + +----------------------------------+------------------------------------------------------------------+ + | u1db backend method | URI | + |----------------------------------+------------------------------------------------------------------| + | _get_generation | _design/transactions/_list/generation/log | + | _get_generation_info | _design/transactions/_list/generation/log | + | _get_trans_id_for_gen | _design/transactions/_list/trans_id_for_gen/log | + | _get_transaction_log | _design/transactions/_view/log | + | _get_doc (*) | _design/docs/_view/get?key= | + | _has_conflicts | _design/docs/_view/get?key= | + | get_all_docs | _design/docs/_view/get | + | _put_doc | _design/docs/_update/put/ | + | _whats_changed | _design/transactions/_list/whats_changed/log?old_gen= | + | _get_conflicts (*) | _design/docs/_view/conflicts?key= | + | _get_replica_gen_and_trans_id | _design/syncs/_view/log?other_replica_uid= | + | _do_set_replica_gen_and_trans_id | _design/syncs/_update/put/u1db_sync_log | + | _add_conflict | _design/docs/_update/add_conflict/ | + | _delete_conflicts | _design/docs/_update/delete_conflicts/?doc_rev= | + | list_indexes | not implemented | + | _get_index_definition | not implemented | + | delete_index | not implemented | + | _get_indexed_fields | not implemented | + | _put_and_update_indexes | not implemented | + +----------------------------------+------------------------------------------------------------------+ + +(*) These methods also request CouchDB document attachments that store U1DB + document contents. diff --git a/common/src/leap/soledad/common/ddocs/__init__.py b/common/src/leap/soledad/common/ddocs/__init__.py new file mode 100644 index 00000000..c2f78e18 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/__init__.py @@ -0,0 +1,138 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +CouchDB U1DB backend design documents helper. +""" + + +from os import listdir +from os.path import realpath, dirname, isdir, join, isfile, basename +import json +import logging + + +from couchdb import Document as CouchDocument + + +logger = logging.getLogger(__name__) + + +# where to search for design docs definitions +prefix = dirname(realpath(__file__)) + + +def ensure_ddocs_on_remote_db(db, prefix=prefix): + """ + Ensure that the design documents in C{db} contain. + + :param db: The database in which to create/update the design docs. + :type db: couchdb.client.Server + :param prefix: Where to look for design documents definitions. + :type prefix: str + """ + ddocs = build_ddocs(prefix) + for ddoc_name, ddoc_content in ddocs.iteritems(): + ddoc_id = "_design/%s" % ddoc_name + ddoc = CouchDocument({'_id': ddoc_id}) + ddoc.update(ddoc_content) + # ensure revision if ddoc is already in db + doc = db.get(ddoc_id) + if doc is not None: + ddoc['_rev'] = doc.rev + db.save(ddoc) + + +def create_local_ddocs(prefix=prefix): + """ + Create local design docs based on content from subdirectories in + C{prefix}. + + :param create_local: Whether to create local .json files. + :type create_local: bool + """ + ddocs = build_ddocs(prefix) + for ddoc_name, ddoc_content in ddocs.iteritems(): + with open(join(prefix, '%s.json' % ddoc_name), 'w') as f: + f.write(json.dumps(ddoc_content, indent=4)) + + +def build_ddocs(prefix=prefix): + """ + Build design documents based on content from subdirectories in + C{prefix}. + + :param prefix: Where to look for design documents definitions. + :type prefix: str + + :return: A dictionary containing the design docs definitions. + :rtype: dict + """ + ddocs = {} + # design docs are represented by subdirectories in current directory + for ddoc in [f for f in listdir(prefix) if isdir(join(prefix, f))]: + logger.debug("Building %s.json ..." % ddoc) + + ddocs[ddoc] = {} + + for t in ['views', 'lists', 'updates']: + tdir = join(prefix, ddoc, t) + if not isdir(tdir): + logger.debug(" - no %s" % t) + else: + + ddocs[ddoc][t] = {} + + if t == 'views': # handle views (with map/reduce functions) + for view in [f for f in listdir(tdir) \ + if isdir(join(tdir, f))]: + logger.debug(" - view: %s" % view) + # look for map.js and reduce.js + mapfile = join(tdir, view, 'map.js') + reducefile = join(tdir, view, 'reduce.js') + mapfun = None + reducefun = None + try: + with open(mapfile) as f: + mapfun = f.read() + except IOError: + pass + try: + with open(reducefile) as f: + reducefun = f.read() + except IOError: + pass + ddocs[ddoc]['views'][view] = {} + + if mapfun is not None: + ddocs[ddoc]['views'][view]['map'] = mapfun + if reducefun is not None: + ddocs[ddoc]['views'][view]['reduce'] = reducefun + + else: # handle lists, updates, etc + for fun in [f for f in listdir(tdir) \ + if isfile(join(tdir, f))]: + logger.debug(" - %s: %s" % (t, fun)) + funfile = join(tdir, fun) + funname = basename(funfile).replace('.js', '') + try: + with open(funfile) as f: + ddocs[ddoc][t][funname] = f.read() + except IOError: + pass + return ddocs diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/put.js b/common/src/leap/soledad/common/ddocs/docs/updates/put.js new file mode 100644 index 00000000..5a4647de --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/docs/updates/put.js @@ -0,0 +1,64 @@ +function(doc, req){ + /* we expect to receive the following in `req.body`: + * { + * 'couch_rev': '', + * 'u1db_rev': '', + * 'content': '', + * 'trans_id': '' + * 'conflicts': '', + * 'update_conflicts': + * } + */ + var body = JSON.parse(req.body); + + // create a new document document + if (!doc) { + doc = {} + doc['_id'] = req['id']; + } + // or fail if couch revisions do not match + else if (doc['_rev'] != body['couch_rev']) { + // of fail if revisions do not match + return [null, 'revision conflict'] + } + + // store u1db rev + doc.u1db_rev = body['u1db_rev']; + + // save content as attachment + if (body['content'] != null) { + // save u1db content as attachment + if (!doc._attachments) + doc._attachments = {}; + doc._attachments.u1db_content = { + content_type: "application/octet-stream", + data: body['content'] // should be base64 encoded + }; + } + // or delete the attachment if document is tombstone + else if (doc._attachments && + doc._attachments.u1db_content) + delete doc._attachments.u1db_content; + + // store the transaction id + if (!doc.u1db_transactions) + doc.u1db_transactions = []; + var d = new Date(); + doc.u1db_transactions.push([d.getTime(), body['trans_id']]); + + // save conflicts as attachment if they were sent + if (body['update_conflicts']) + if (body['conflicts'] != null) { + if (!doc._attachments) + doc._attachments = {}; + doc._attachments.u1db_conflicts = { + content_type: "application/octet-stream", + data: body['conflicts'] // should be base64 encoded + } + } else { + if(doc._attachments && doc._attachments.u1db_conflicts) + delete doc._attachments.u1db_conflicts + } + + return [doc, 'ok']; +} diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js b/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js new file mode 100644 index 00000000..7ba66cf8 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js @@ -0,0 +1,39 @@ +function(doc, req){ + /* we expect to receive the following in `req.body`: + * { + * 'couch_rev': '', + * 'conflicts': '', + * } + */ + var body = JSON.parse(req.body); + + // fail if no document was given + if (!doc) { + return [null, 'document does not exist'] + } + + // fail if couch revisions do not match + if (body['couch_rev'] != null + && doc['_rev'] != body['couch_rev']) { + return [null, 'revision conflict'] + } + + // fail if conflicts were not sent + if (body['conflicts'] == null) + return [null, 'missing conflicts'] + + // save conflicts as attachment if they were sent + if (body['conflicts'] != null) { + if (!doc._attachments) + doc._attachments = {}; + doc._attachments.u1db_conflicts = { + content_type: "application/octet-stream", + data: body['conflicts'] // should be base64 encoded + } + } + // or delete attachment if there are no conflicts + else if (doc._attachments && doc._attachments.u1db_conflicts) + delete doc._attachments.u1db_conflicts; + + return [doc, 'ok']; +} diff --git a/common/src/leap/soledad/common/ddocs/docs/views/get/map.js b/common/src/leap/soledad/common/ddocs/docs/views/get/map.js new file mode 100644 index 00000000..ae08d9e9 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/docs/views/get/map.js @@ -0,0 +1,20 @@ +function(doc) { + if (doc.u1db_rev) { + var is_tombstone = true; + var has_conflicts = false; + if (doc._attachments) { + if (doc._attachments.u1db_content) + is_tombstone = false; + if (doc._attachments.u1db_conflicts) + has_conflicts = true; + } + emit(doc._id, + { + "couch_rev": doc._rev, + "u1db_rev": doc.u1db_rev, + "is_tombstone": is_tombstone, + "has_conflicts": has_conflicts, + } + ); + } +} diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js new file mode 100644 index 00000000..722f695a --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js @@ -0,0 +1,22 @@ +function(doc, req){ + if (!doc) { + doc = {} + doc['_id'] = 'u1db_sync_log'; + doc['syncs'] = []; + } + body = JSON.parse(req.body); + // remove outdated info + doc['syncs'] = doc['syncs'].filter( + function (entry) { + return entry[0] != body['other_replica_uid']; + } + ); + // store u1db rev + doc['syncs'].push([ + body['other_replica_uid'], + body['other_generation'], + body['other_transaction_id'] + ]); + return [doc, 'ok']; +} + diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js new file mode 100644 index 00000000..a63c7cf4 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js @@ -0,0 +1,12 @@ +function(doc) { + if (doc._id == 'u1db_sync_log') { + if (doc.syncs) + doc.syncs.forEach(function (entry) { + emit(entry[0], + { + 'known_generation': entry[1], + 'known_transaction_id': entry[2] + }); + }); + } +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js b/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js new file mode 100644 index 00000000..dbdfff0d --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js @@ -0,0 +1,20 @@ +function(head, req) { + var row; + var rows=[]; + // fetch all rows + while(row = getRow()) { + rows.push(row); + } + if (rows.length > 0) + send(JSON.stringify({ + "generation": rows.length, + "doc_id": rows[rows.length-1]['id'], + "transaction_id": rows[rows.length-1]['value'] + })); + else + send(JSON.stringify({ + "generation": 0, + "doc_id": "", + "transaction_id": "", + })); +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js b/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js new file mode 100644 index 00000000..2ec91794 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js @@ -0,0 +1,19 @@ +function(head, req) { + var row; + var rows=[]; + var i = 1; + var gen = 1; + if (req.query.gen) + gen = parseInt(req.query['gen']); + // fetch all rows + while(row = getRow()) + rows.push(row); + if (gen <= rows.length) + send(JSON.stringify({ + "generation": gen, + "doc_id": rows[gen-1]['id'], + "transaction_id": rows[gen-1]['value'], + })); + else + send('{}'); +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js b/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js new file mode 100644 index 00000000..b35cdf51 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js @@ -0,0 +1,22 @@ +function(head, req) { + var row; + var gen = 1; + var old_gen = 0; + if (req.query.old_gen) + old_gen = parseInt(req.query['old_gen']); + send('{"transactions":[\n'); + // fetch all rows + while(row = getRow()) { + if (gen > old_gen) { + if (gen > old_gen+1) + send(',\n'); + send(JSON.stringify({ + "generation": gen, + "doc_id": row["id"], + "transaction_id": row["value"] + })); + } + gen++; + } + send('\n]}'); +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js b/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js new file mode 100644 index 00000000..94ef63ca --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js @@ -0,0 +1,7 @@ +function(doc) { + if (doc.u1db_transactions) + doc.u1db_transactions.forEach(function(t) { + emit(t[0], // use timestamp as key so the results are ordered + t[1]); // value is the transaction_id + }); +} diff --git a/common/src/leap/soledad/common/objectstore.py b/common/src/leap/soledad/common/objectstore.py deleted file mode 100644 index 7aff3e32..00000000 --- a/common/src/leap/soledad/common/objectstore.py +++ /dev/null @@ -1,282 +0,0 @@ -# -*- coding: utf-8 -*- -# objectstore.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - - -""" -Abstract U1DB backend to handle storage using object stores (like CouchDB, for -example). - -This backend uses special documents to store all U1DB data (replica uid, -indexes, transaction logs and info about other dbs). The id of these documents -are reserved and have prefix equal to ObjectStore.U1DB_DATA_DOC_ID_PREFIX. - -Right now, this is only used by CouchDatabase backend, but can also be -extended to implement OpenStack or Amazon S3 storage, for example. - -See U1DB documentation for more information on how to use databases. -""" - - -from base64 import b64encode, b64decode - - -import uuid -import simplejson as json - - -from u1db import errors -from u1db.backends.inmemory import ( - InMemoryDatabase, - InMemorySyncTarget, -) - - -class ObjectStoreDatabase(InMemoryDatabase): - """ - A backend for storing u1db data in an object store. - """ - - U1DB_DATA_DOC_ID_PREFIX = 'u1db/' - - @classmethod - def open_database(cls, url, create, document_factory=None): - """ - Open a U1DB database using an object store as backend. - - @param url: the url of the database replica - @type url: str - @param create: should the replica be created if it does not exist? - @type create: bool - @param document_factory: A function that will be called with the same - parameters as Document.__init__. - @type document_factory: callable - - @return: the database instance - @rtype: CouchDatabase - """ - raise NotImplementedError(cls.open_database) - - def __init__(self, replica_uid=None, document_factory=None): - """ - Initialize the object store database. - - @param replica_uid: an optional unique replica identifier - @type replica_uid: str - @param document_factory: A function that will be called with the same - parameters as Document.__init__. - @type document_factory: callable - """ - InMemoryDatabase.__init__( - self, - replica_uid, - document_factory=document_factory) - if self._replica_uid is None: - self._replica_uid = uuid.uuid4().hex - self._init_u1db_data() - - def _init_u1db_data(self): - """ - Initialize u1db configuration data on backend storage. - - A U1DB database needs to keep track of all database transactions, - document conflicts, the generation of other replicas it has seen, - indexes created by users and so on. - - In this implementation, all this information is stored in special - documents stored in the couch db with id prefix equal to - U1DB_DATA_DOC_ID_PREFIX. Those documents ids are reserved: - put_doc(), get_doc() and delete_doc() will not allow documents with - a doc_id with that prefix to be accessed or modified. - """ - raise NotImplementedError(self._init_u1db_data) - - #------------------------------------------------------------------------- - # methods from Database - #------------------------------------------------------------------------- - - def put_doc(self, doc): - """ - Update a document. - - If the document currently has conflicts, put will fail. - If the database specifies a maximum document size and the document - exceeds it, put will fail and raise a DocumentTooBig exception. - - This method prevents from updating the document with doc_id equals to - self.U1DB_DATA_DOC_ID, which contains U1DB data. - - @param doc: A Document with new content. - @type doc: Document - - @return: new_doc_rev - The new revision identifier for the document. - The Document object will also be updated. - @rtype: str - """ - if doc.doc_id is not None and \ - doc.doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - raise errors.InvalidDocId() - return InMemoryDatabase.put_doc(self, doc) - - def _put_doc(self, doc): - """ - Update a document. - - This is called everytime we just want to do a raw put on the db (i.e. - without index updates, document constraint checks, and conflict - checks). - - @param doc: The document to update. - @type doc: u1db.Document - - @return: The new revision identifier for the document. - @rtype: str - """ - raise NotImplementedError(self._put_doc) - - def get_doc(self, doc_id, include_deleted=False): - """ - Get the JSON string for the given document. - - This method prevents from getting the document with doc_id equals to - self.U1DB_DATA_DOC_ID, which contains U1DB data. - - @param doc_id: The unique document identifier - @type doc_id: str - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise asking for a deleted - document will return None. - @type include_deleted: bool - - @return: a Document object. - @rtype: Document - """ - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - raise errors.InvalidDocId() - return InMemoryDatabase.get_doc(self, doc_id, include_deleted) - - def _get_doc(self, doc_id): - """ - Get just the document content, without fancy handling. - - @param doc_id: The unique document identifier - @type doc_id: str - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise asking for a deleted - document will return None. - @type include_deleted: bool - - @return: a Document object. - @type: u1db.Document - """ - raise NotImplementedError(self._get_doc) - - def get_all_docs(self, include_deleted=False): - """ - Get the JSON content for all documents in the database. - - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise deleted documents will not - be included in the results. - @type include_deleted: bool - - @return: (generation, [Document]) - The current generation of the database, followed by a list of all - the documents in the database. - @rtype: tuple - """ - generation = self._get_generation() - results = [] - for doc_id in self._database: - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - continue - doc = self._get_doc(doc_id, check_for_conflicts=True) - if doc.content is None and not include_deleted: - continue - results.append(doc) - return (generation, results) - - def delete_doc(self, doc): - """ - Mark a document as deleted. - - This method prevents from deleting the document with doc_id equals to - self.U1DB_DATA_DOC_ID, which contains U1DB data. - - @param doc: The document to mark as deleted. - @type doc: u1db.Document - - @return: The new revision id of the document. - @type: str - """ - if doc.doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - raise errors.InvalidDocId() - old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) - if old_doc is None: - raise errors.DocumentDoesNotExist - if old_doc.rev != doc.rev: - raise errors.RevisionConflict() - if old_doc.is_tombstone(): - raise errors.DocumentAlreadyDeleted - if old_doc.has_conflicts: - raise errors.ConflictedDoc() - new_rev = self._allocate_doc_rev(doc.rev) - doc.rev = new_rev - doc.make_tombstone() - self._put_and_update_indexes(old_doc, doc) - return new_rev - - # index-related methods - - def create_index(self, index_name, *index_expressions): - """ - Create a named index, which can then be queried for future lookups. - - See U1DB documentation for more information. - - @param index_name: A unique name which can be used as a key prefix. - @param index_expressions: Index expressions defining the index - information. - """ - raise NotImplementedError(self.create_index) - - #------------------------------------------------------------------------- - # implemented methods from CommonBackend - #------------------------------------------------------------------------- - - def _put_and_update_indexes(self, old_doc, doc): - """ - Update a document and all indexes related to it. - - @param old_doc: The old version of the document. - @type old_doc: u1db.Document - @param doc: The new version of the document. - @type doc: u1db.Document - """ - for index in self._indexes.itervalues(): - if old_doc is not None and not old_doc.is_tombstone(): - index.remove_json(old_doc.doc_id, old_doc.get_json()) - if not doc.is_tombstone(): - index.add_json(doc.doc_id, doc.get_json()) - trans_id = self._allocate_transaction_id() - self._put_doc(doc) - self._transaction_log.append((doc.doc_id, trans_id)) - - -class ObjectStoreSyncTarget(InMemorySyncTarget): - """ - Functionality for using an ObjectStore as a synchronization target. - """ diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index 42edf9fe..bdef4e0d 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -27,7 +27,6 @@ from base64 import b64decode from leap.common.files import mkdir_p -from leap.soledad.common.document import SoledadDocument from leap.soledad.common.tests import u1db_tests as tests from leap.soledad.common.tests.u1db_tests import test_backends from leap.soledad.common.tests.u1db_tests import test_sync @@ -188,7 +187,7 @@ def copy_couch_database_for_test(test, db): def make_document_for_test(test, doc_id, rev, content, has_conflicts=False): - return SoledadDocument(doc_id, rev, content, has_conflicts=has_conflicts) + return couch.CouchDocument(doc_id, rev, content, has_conflicts=has_conflicts) COUCH_SCENARIOS = [ -- cgit v1.2.3