From ae2894ba47ee5bc905f298db6b67ae40af6ebd74 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 4 Dec 2013 13:42:59 -0200 Subject: Make couch backend consistent and lightweight. This commit introduces the use of couch view, list and update functions to put and get data from the couch database. This avoids loads of metadata transferring and encapsulates operations in atomic PUTs. --- .gitignore | 2 - common/setup.py | 86 ++ common/src/leap/soledad/common/README.txt | 79 ++ common/src/leap/soledad/common/couch.py | 1229 ++++++++++++++------ common/src/leap/soledad/common/ddocs/README.txt | 29 + common/src/leap/soledad/common/ddocs/__init__.py | 138 +++ .../leap/soledad/common/ddocs/docs/updates/put.js | 64 + .../common/ddocs/docs/updates/resolve_doc.js | 39 + .../soledad/common/ddocs/docs/views/get/map.js | 20 + .../leap/soledad/common/ddocs/syncs/updates/put.js | 22 + .../soledad/common/ddocs/syncs/views/log/map.js | 12 + .../common/ddocs/transactions/lists/generation.js | 20 + .../ddocs/transactions/lists/trans_id_for_gen.js | 19 + .../ddocs/transactions/lists/whats_changed.js | 22 + .../common/ddocs/transactions/views/log/map.js | 7 + common/src/leap/soledad/common/objectstore.py | 282 ----- common/src/leap/soledad/common/tests/test_couch.py | 3 +- 17 files changed, 1411 insertions(+), 662 deletions(-) create mode 100644 common/src/leap/soledad/common/README.txt create mode 100644 common/src/leap/soledad/common/ddocs/README.txt create mode 100644 common/src/leap/soledad/common/ddocs/__init__.py create mode 100644 common/src/leap/soledad/common/ddocs/docs/updates/put.js create mode 100644 common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js create mode 100644 common/src/leap/soledad/common/ddocs/docs/views/get/map.js create mode 100644 common/src/leap/soledad/common/ddocs/syncs/updates/put.js create mode 100644 common/src/leap/soledad/common/ddocs/syncs/views/log/map.js create mode 100644 
common/src/leap/soledad/common/ddocs/transactions/lists/generation.js create mode 100644 common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js create mode 100644 common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js create mode 100644 common/src/leap/soledad/common/ddocs/transactions/views/log/map.js delete mode 100644 common/src/leap/soledad/common/objectstore.py diff --git a/.gitignore b/.gitignore index 701f48fc..bd170f79 100644 --- a/.gitignore +++ b/.gitignore @@ -10,5 +10,3 @@ MANIFEST *.pyc *.log *.*~ - - diff --git a/common/setup.py b/common/setup.py index bcc2b4b3..42bf272a 100644 --- a/common/setup.py +++ b/common/setup.py @@ -103,6 +103,92 @@ def get_versions(default={}, verbose=False): f.write(subst_template) +from os import listdir +from os.path import realpath, dirname, isdir, join, isfile, basename +import json +import logging +import binascii + + +old_cmd_sdist = cmdclass["sdist"] + + +class cmd_sdist(old_cmd_sdist): + """ + Generate 'src/leap/soledad/common/ddocs.py' which contains coush design + documents scripts. + """ + + def run(self): + old_cmd_sdist.run(self) + self.build_ddocs_py() + + def build_ddocs_py(self): + """ + Build couch design documents based on content from subdirectories in + 'src/leap/soledad/common/ddocs'. 
+ """ + prefix = join( + dirname(realpath(__file__)), + 'src', 'leap', 'soledad', 'common') + ddocs_prefix = join(prefix, 'ddocs') + ddocs = {} + + # design docs are represented by subdirectories of `ddocs_prefix` + for ddoc in [f for f in listdir(ddocs_prefix) + if isdir(join(ddocs_prefix, f))]: + + ddocs[ddoc] = {'_id': '_design/%s' % ddoc} + + for t in ['views', 'lists', 'updates']: + tdir = join(ddocs_prefix, ddoc, t) + if isdir(tdir): + + ddocs[ddoc][t] = {} + + if t == 'views': # handle views (with map/reduce functions) + for view in [f for f in listdir(tdir) \ + if isdir(join(tdir, f))]: + # look for map.js and reduce.js + mapfile = join(tdir, view, 'map.js') + reducefile = join(tdir, view, 'reduce.js') + mapfun = None + reducefun = None + try: + with open(mapfile) as f: + mapfun = f.read() + except IOError: + pass + try: + with open(reducefile) as f: + reducefun = f.read() + except IOError: + pass + ddocs[ddoc]['views'][view] = {} + + if mapfun is not None: + ddocs[ddoc]['views'][view]['map'] = mapfun + if reducefun is not None: + ddocs[ddoc]['views'][view]['reduce'] = reducefun + + else: # handle lists, updates, etc + for fun in [f for f in listdir(tdir) \ + if isfile(join(tdir, f))]: + funfile = join(tdir, fun) + funname = basename(funfile).replace('.js', '') + try: + with open(funfile) as f: + ddocs[ddoc][t][funname] = f.read() + except IOError: + pass + # write file containing design docs strings + with open(join(prefix, 'ddocs.py'), 'w') as f: + for ddoc in ddocs: + f.write( + "%s = '%s'\n" % + (ddoc, binascii.b2a_base64(json.dumps(ddocs[ddoc]))[:-1])) + + cmdclass["freeze_debianver"] = freeze_debianver # XXX add ref to docs diff --git a/common/src/leap/soledad/common/README.txt b/common/src/leap/soledad/common/README.txt new file mode 100644 index 00000000..106efb5e --- /dev/null +++ b/common/src/leap/soledad/common/README.txt @@ -0,0 +1,79 @@ +Soledad common package +====================== + +This package contains Soledad bits used by both server and 
client. + +Couch U1DB Backend +------------------ + +U1DB backends rely on some atomic operations that modify document contents +and metadata (conflicts, transaction ids and indexes). The only atomic +operation in Couch is a document put, so every u1db atomic operation has to be +mapped to a couch document put. + +The atomic operations in the U1DB SQLite reference backend implementation may +be identified by the use of a context manager to access the underlying +database. A listing of the methods involved in each atomic operation is +depicted below. The top-level elements correspond to the atomic operations that +have to be mapped, and items on deeper levels of the list have to be +implemented in a way that all changes will be pushed with just one operation. + + * _set_replica_uid + * put_doc: + * _get_doc + * _put_and_update_indexes + * insert/update the document + * insert into transaction log + * delete_doc + * _get_doc + * _put_and_update_indexes + * get_doc_conflicts + * _get_conflicts + * _set_replica_gen_and_trans_id + * _do_set_replica_gen_and_trans_id + * _put_doc_if_newer + * _get_doc + * _validate_source (**) + * _get_replica_gen_and_trans_id + * cases: + * is newer: + * _prune_conflicts (**) + * _has_conflicts + * _delete_conflicts + * _put_and_update_indexes + * same content as: + * _put_and_update_indexes + * conflicted: + * _force_doc_sync_conflict + * _prune_conflicts + * _add_conflict + * _put_and_update_indexes + * _do_set_replica_gen_and_trans_id + * resolve_doc + * _get_doc + * cases: + * doc is superseded + * _put_and_update_indexes + * else + * _add_conflict + * _delete_conflicts + * delete_index + * create_index + +Couch views and update functions are used in order to achieve atomicity on the +Couch backend. Transactions are stored in the `u1db_transactions` field of the +couch document. Document's content and conflicted versions are stored as couch +document attachments with names, respectively, `u1db_content` and +`u1db_conflicts`. 
+ +A map of methods and couch query URI can be found on the `./ddocs/README.txt` +document. + +Notes: + + * Currently, the couch backend does not implement indexing, so what is + depicted as `_put_and_update_indexes` above will be found as `_put_doc` in + the backend. + + * Conflict updates are part of document put using couch update functions, + and as such are part of the same atomic operation as document put. diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 1396f4d7..3b0e042a 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -18,26 +18,34 @@ """A U1DB backend that uses CouchDB as its persistence layer.""" -import re +import os import simplejson as json -import socket +import sys +import time +import uuid import logging +import binascii +import socket -from u1db import errors -from u1db.sync import Synchronizer -from u1db.backends.inmemory import InMemoryIndex +from u1db.backends import CommonBackend, CommonSyncTarget from u1db.remote.server_state import ServerState -from u1db.errors import DatabaseDoesNotExist -from couchdb.client import Server, Document as CouchDocument +from u1db import ( + errors, + query_parser, + vectorclock, +) +from couchdb.client import ( + Server, + Document as CouchDocument, + _doc_resource, +) from couchdb.http import ResourceNotFound, Unauthorized from leap.soledad.common import USER_DB_PREFIX -from leap.soledad.common.objectstore import ( - ObjectStoreDatabase, - ObjectStoreSyncTarget, -) +from leap.soledad.common.document import SoledadDocument +from leap.soledad.common.ddocs import ensure_ddocs_on_remote_db logger = logging.getLogger(__name__) @@ -49,158 +57,125 @@ class InvalidURLError(Exception): """ -def persistent_class(cls): +class CouchDocument(SoledadDocument): """ - Decorator that modifies a class to ensure u1db metadata persists on - underlying storage. + This is the document used for maintaining the Couch backend. 
- @param cls: The class that will be modified. - @type cls: type + A CouchDocument can fetch and manipulate conflicts and also holds a + reference to the couch document revision. This data is used to ensure an + atomic and consistent update of the database. """ - def _create_persistent_method(old_method_name, key, load_method_name, - dump_method_name, store): - """ - Create a persistent method to replace C{old_method_name}. - - The new method will load C{key} using C{load_method_name} and stores - it using C{dump_method_name} depending on the value of C{store}. - """ - # get methods - old_method = getattr(cls, old_method_name) - load_method = getattr(cls, load_method_name) \ - if load_method_name is not None \ - else lambda self, data: setattr(self, key, data) - dump_method = getattr(cls, dump_method_name) \ - if dump_method_name is not None \ - else lambda self: getattr(self, key) - - def _new_method(self, *args, **kwargs): - # get u1db data from couch db - doc = self._get_doc('%s%s' % - (self.U1DB_DATA_DOC_ID_PREFIX, key)) - load_method(self, doc.content['content']) - # run old method - retval = old_method(self, *args, **kwargs) - # store u1db data on couch - if store: - doc.content = {'content': dump_method(self)} - self._put_doc(doc) - return retval - - return _new_method - - # ensure the class has a persistency map - if not hasattr(cls, 'PERSISTENCY_MAP'): - logger.error('Class %s has no PERSISTENCY_MAP attribute, skipping ' - 'persistent methods substitution.' 
% cls) - return cls - # replace old methods with new persistent ones - for key, ((load_method_name, dump_method_name), - persistent_methods) in cls.PERSISTENCY_MAP.iteritems(): - for (method_name, store) in persistent_methods: - setattr(cls, method_name, - _create_persistent_method( - method_name, - key, - load_method_name, - dump_method_name, - store)) - return cls - - -@persistent_class -class CouchDatabase(ObjectStoreDatabase): + def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False, + syncable=True): + """ + Container for handling a document that is stored in couch backend. + + :param doc_id: The unique document identifier. + :type doc_id: str + :param rev: The revision identifier of the document. + :type rev: str + :param json: The JSON string for this document. + :type json: str + :param has_conflicts: Boolean indicating if this document has conflicts + :type has_conflicts: bool + :param syncable: Should this document be synced with remote replicas? + :type syncable: bool + """ + SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts) + self._couch_rev = None + self._conflicts = None + self._modified_conflicts = False + + def ensure_fetch_conflicts(self, get_conflicts_fun): + """ + Ensure conflict data has been fetched from the server. + + :param get_conflicts_fun: A function which, given the document id and + the couch revision, return the conflicted + versions of the current document. + :type get_conflicts_fun: function + """ + if self._conflicts is None: + self._conflicts = get_conflicts_fun(self.doc_id, + couch_rev=self.couch_rev) + self.has_conflicts = len(self._conflicts) > 0 + + def get_conflicts(self): + """ + Get the conflicted versions of the document. + + :return: The conflicted versions of the document. + :rtype: [CouchDocument] + """ + return self._conflicts + + def add_conflict(self, doc): + """ + Add a conflict to this document. + + :param doc: The conflicted version to be added. 
+ :type doc: CouchDocument + """ + if self._conflicts is None: + raise Exception("Run self.ensure_fetch_conflicts first!") + self._modified_conflicts = True + self._conflicts.append(doc) + self.has_conflicts = len(self._conflicts) > 0 + + def delete_conflicts(self, conflict_revs): + """ + Delete conflicted versions of this document. + + :param conflict_revs: The conflicted revisions to be deleted. + :type conflict_revs: [str] + """ + if self._conflicts is None: + raise Exception("Run self.ensure_fetch_conflicts first!") + conflicts_len = len(self._conflicts) + self._conflicts = filter( + lambda doc: doc.rev not in conflict_revs, + self._conflicts) + if len(self._conflicts) < conflicts_len: + self._modified_conflicts = True + self.has_conflicts = len(self._conflicts) > 0 + + def modified_conflicts(self): + """ + Return whether this document's conflicts have been modified. + + :return: Whether this document's conflicts have been modified. + :rtype: bool + """ + return self._conflicts is not None and \ + self._modified_conflicts is True + + def _get_couch_rev(self): + return self._couch_rev + + def _set_couch_rev(self, rev): + self._couch_rev = rev + + couch_rev = property(_get_couch_rev, _set_couch_rev) + + +class CouchDatabase(CommonBackend): """ - A U1DB backend that uses Couch as its persistence layer. + A U1DB implementation that uses CouchDB as its persistence layer. 
""" - U1DB_TRANSACTION_LOG_KEY = '_transaction_log' - U1DB_CONFLICTS_KEY = '_conflicts' - U1DB_OTHER_GENERATIONS_KEY = '_other_generations' - U1DB_INDEXES_KEY = '_indexes' - U1DB_REPLICA_UID_KEY = '_replica_uid' - - U1DB_DATA_KEYS = [ - U1DB_TRANSACTION_LOG_KEY, - U1DB_CONFLICTS_KEY, - U1DB_OTHER_GENERATIONS_KEY, - U1DB_INDEXES_KEY, - U1DB_REPLICA_UID_KEY, - ] - - COUCH_ID_KEY = '_id' - COUCH_REV_KEY = '_rev' - COUCH_U1DB_ATTACHMENT_KEY = 'u1db_json' - COUCH_U1DB_REV_KEY = 'u1db_rev' - - # the following map describes information about methods usage of - # properties that have to persist on the underlying database. The format - # of the map is assumed to be: - # - # { - # 'property_name': [ - # ('property_load_method_name', 'property_dump_method_name'), - # [('method_1_name', bool), - # ... - # ('method_N_name', bool)]], - # ... - # } - # - # where the booleans indicate if the property should be stored after - # each method execution (i.e. if the method alters the property). Property - # load/dump methods will be run after/before properties are read/written - # to the underlying db. 
- PERSISTENCY_MAP = { - U1DB_TRANSACTION_LOG_KEY: [ - ('_load_transaction_log_from_json', None), - [('_get_transaction_log', False), - ('_get_generation', False), - ('_get_generation_info', False), - ('_get_trans_id_for_gen', False), - ('whats_changed', False), - ('_put_and_update_indexes', True)]], - U1DB_CONFLICTS_KEY: [ - (None, None), - [('_has_conflicts', False), - ('get_doc_conflicts', False), - ('_prune_conflicts', False), - ('resolve_doc', False), - ('_replace_conflicts', True), - ('_force_doc_sync_conflict', True)]], - U1DB_OTHER_GENERATIONS_KEY: [ - ('_load_other_generations_from_json', None), - [('_get_replica_gen_and_trans_id', False), - ('_do_set_replica_gen_and_trans_id', True)]], - U1DB_INDEXES_KEY: [ - ('_load_indexes_from_json', '_dump_indexes_as_json'), - [('list_indexes', False), - ('get_from_index', False), - ('get_range_from_index', False), - ('get_index_keys', False), - ('_put_and_update_indexes', True), - ('create_index', True), - ('delete_index', True)]], - U1DB_REPLICA_UID_KEY: [ - (None, None), - [('_allocate_doc_rev', False), - ('_put_doc_if_newer', False), - ('_ensure_maximal_rev', False), - ('_prune_conflicts', False), - ('_set_replica_uid', True)]]} - @classmethod def open_database(cls, url, create): """ Open a U1DB database using CouchDB as backend. - @param url: the url of the database replica - @type url: str - @param create: should the replica be created if it does not exist? - @type create: bool + :param url: the url of the database replica + :type url: str + :param create: should the replica be created if it does not exist? + :type create: bool - @return: the database instance - @rtype: CouchDatabase + :return: the database instance + :rtype: CouchDatabase """ # get database from url m = re.match('(^https?://[^/]+)/(.+)$', url) @@ -221,293 +196,795 @@ class CouchDatabase(ObjectStoreDatabase): """ Create a new Couch data container. 
- @param url: the url of the couch database - @type url: str - @param dbname: the database name - @type dbname: str - @param replica_uid: an optional unique replica identifier - @type replica_uid: str - @param full_commit: turn on the X-Couch-Full-Commit header - @type full_commit: bool - @param session: an http.Session instance or None for a default session - @type session: http.Session + :param url: the url of the couch database + :type url: str + :param dbname: the database name + :type dbname: str + :param replica_uid: an optional unique replica identifier + :type replica_uid: str + :param full_commit: turn on the X-Couch-Full-Commit header + :type full_commit: bool + :param session: an http.Session instance or None for a default session + :type session: http.Session + :param ensure_ddocs: Ensure that the design docs exist on server. + :type ensure_ddocs: bool """ # save params self._url = url self._full_commit = full_commit self._session = session + self._factory = CouchDocument + self._real_replica_uid = None # configure couch self._server = Server(url=self._url, full_commit=self._full_commit, session=self._session) self._dbname = dbname - # this will ensure that transaction and sync logs exist and are - # up-to-date. try: self._database = self._server[self._dbname] except ResourceNotFound: self._server.create(self._dbname) self._database = self._server[self._dbname] - ObjectStoreDatabase.__init__(self, replica_uid=replica_uid) + self._initialize(replica_uid or uuid.uuid4().hex) + + def get_sync_target(self): + """ + Return a SyncTarget object, for another u1db to synchronize with. + + :return: The sync target. + :rtype: CouchSyncTarget + """ + return CouchSyncTarget(self) + + def delete_database(self): + """ + Delete a U1DB CouchDB database. + """ + del(self._server[self._dbname]) + + def close(self): + """ + Release any resources associated with this database. + + :return: True if db was succesfully closed. 
+ :rtype: bool + """ + self._url = None + self._full_commit = None + self._session = None + self._server = None + self._database = None + return True + + def _set_replica_uid(self, replica_uid): + """ + Force the replica uid to be set. + + :param replica_uid: The new replica uid. + :type replica_uid: str + """ + try: + doc = self._database['u1db_config'] + except ResourceNotFound: + doc = { + '_id': 'u1db_config', + 'replica_uid': replica_uid, + } + self._database.save(doc) + + def _ensure_design_docs(self): + """ + Ensure that the design docs have been created. + """ + if self._is_initialized(): + return + self._initialize() + + def _set_replica_uid(self, replica_uid): + """Force the replica_uid to be set.""" + doc = self._database['u1db_config'] + doc['replica_uid'] = replica_uid + self._database.save(doc) + self._real_replica_uid = replica_uid + + def _get_replica_uid(self): + """ + Get the replica uid. + + :return: The replica uid. + :rtype: str + """ + if self._real_replica_uid is not None: + return self._real_replica_uid + doc = self._database['u1db_config'] + self._real_replica_uid = doc['replica_uid'] + return self._real_replica_uid + + _replica_uid = property(_get_replica_uid, _set_replica_uid) + + def _get_generation(self): + """ + Return the current generation. + + :return: The current generation. + :rtype: int + """ + # query a couch list function + res = self._database.resource( + '_design', 'transactions', '_list', 'generation', 'log') + response = res.get_json() + return response[2]['generation'] + + def _get_generation_info(self): + """ + Return the current generation. - #------------------------------------------------------------------------- - # methods from Database - #------------------------------------------------------------------------- + :return: A tuple containing the current generation and transaction id. 
+ :rtype: (int, str) + """ + # query a couch list function + res = self._database.resource( + '_design', 'transactions', '_list', 'generation', 'log') + response = res.get_json() + return (response[2]['generation'], response[2]['transaction_id']) + + def _get_trans_id_for_gen(self, generation): + """ + Get the transaction id corresponding to a particular generation. + + :param generation: The generation for which to get the transaction id. + :type generation: int + + :return: The transaction id for C{generation}. + :rtype: str + + :raise InvalidGeneration: Raised when the generation does not exist. + """ + if generation == 0: + return '' + # query a couch list function + res = self._database.resource( + '_design', 'transactions', '_list', 'trans_id_for_gen', 'log') + response = res.get_json(gen=generation) + if response[2] == {}: + raise errors.InvalidGeneration + return response[2]['transaction_id'] + + def _get_transaction_log(self): + """ + This is only for the test suite, it is not part of the api. + + :return: The complete transaction log. + :rtype: [(str, str)] + """ + # query a couch view + res = self._database.resource( + '_design', 'transactions', '_view', 'log') + response = res.get_json() + return map(lambda row: (row['id'], row['value']), response[2]['rows']) def _get_doc(self, doc_id, check_for_conflicts=False): """ - Get just the document content, without fancy handling. + Extract the document from storage. + + This can return None if the document doesn't exist. + + :param doc_id: The unique document identifier + :type doc_id: str + :param check_for_conflicts: If set to False, then the conflict check + will be skipped. + :type check_for_conflicts: bool + + :return: The document. 
+ :rtype: CouchDocument + """ + # get document with all attachments (u1db content and eventual + # conflicts) + try: + result = \ + self._database.resource(doc_id).get_json( + attachments=True)[2] + except ResourceNotFound: + return None + # restrict to u1db documents + if 'u1db_rev' not in result: + return None + doc = self._factory(doc_id, result['u1db_rev']) + # set contents or make tombstone + if '_attachments' not in result \ + or 'u1db_content' not in result['_attachments']: + doc.make_tombstone() + else: + doc.content = json.loads( + binascii.a2b_base64( + result['_attachments']['u1db_content']['data'])) + # determine if there are conflicts + if check_for_conflicts \ + and '_attachments' in result \ + and 'u1db_conflicts' in result['_attachments']: + doc.has_conflicts = True + # store couch revision + doc.couch_rev = result['_rev'] + return doc + + def get_doc(self, doc_id, include_deleted=False): + """ + Get the JSON string for the given document. - @param doc_id: The unique document identifier - @type doc_id: str - @param include_deleted: If set to True, deleted documents will be + :param doc_id: The unique document identifier + :type doc_id: str + :param include_deleted: If set to True, deleted documents will be returned with empty content. Otherwise asking for a deleted document will return None. - @type include_deleted: bool + :type include_deleted: bool - @return: a Document object. - @type: u1db.Document + :return: A document object. + :rtype: CouchDocument. 
""" - cdoc = self._database.get(doc_id) - if cdoc is None: + doc = self._get_doc(doc_id, check_for_conflicts=True) + if doc is None: + return None + if doc.is_tombstone() and not include_deleted: return None - has_conflicts = False - if check_for_conflicts: - has_conflicts = self._has_conflicts(doc_id) - doc = self._factory( - doc_id=doc_id, - rev=cdoc[self.COUCH_U1DB_REV_KEY], - has_conflicts=has_conflicts) - contents = self._database.get_attachment( - cdoc, - self.COUCH_U1DB_ATTACHMENT_KEY) - if contents: - doc.content = json.loads(contents.read()) - else: - doc.make_tombstone() return doc def get_all_docs(self, include_deleted=False): """ Get the JSON content for all documents in the database. - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise deleted documents will not - be included in the results. - @type include_deleted: bool + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise deleted + documents will not be included in the results. + :type include_deleted: bool - @return: (generation, [Document]) + :return: (generation, [CouchDocument]) The current generation of the database, followed by a list of all the documents in the database. - @rtype: tuple + :rtype: (int, [CouchDocument]) """ + generation = self._get_generation() results = [] - for doc_id in self._database: - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - continue - doc = self._get_doc(doc_id, check_for_conflicts=True) - if doc.content is None and not include_deleted: - continue - results.append(doc) + for row in self._database.view('_all_docs'): + doc = self.get_doc(row.id, include_deleted=include_deleted) + if doc is not None: + results.append(doc) return (generation, results) - def _put_doc(self, doc): + def _put_doc(self, old_doc, doc): """ - Update a document. + Put the document in the Couch backend database. 
- This is called everytime we just want to do a raw put on the db (i.e. - without index updates, document constraint checks, and conflict - checks). - - @param doc: The document to update. - @type doc: u1db.Document - - @return: The new revision identifier for the document. - @rtype: str - """ - # prepare couch's Document - cdoc = CouchDocument() - cdoc[self.COUCH_ID_KEY] = doc.doc_id - # we have to guarantee that couch's _rev is consistent - old_cdoc = self._database.get(doc.doc_id) - if old_cdoc is not None: - cdoc[self.COUCH_REV_KEY] = old_cdoc[self.COUCH_REV_KEY] - # store u1db's rev - cdoc[self.COUCH_U1DB_REV_KEY] = doc.rev - # save doc in db - self._database.save(cdoc) - # store u1db's content as json string - if not doc.is_tombstone(): - self._database.put_attachment( - cdoc, doc.get_json(), - filename=self.COUCH_U1DB_ATTACHMENT_KEY) - else: - self._database.delete_attachment( - cdoc, - self.COUCH_U1DB_ATTACHMENT_KEY) + :param old_doc: The old document version. + :type old_doc: CouchDocument + :param doc: The document to be put. + :type doc: CouchDocument - def get_sync_target(self): + :raise RevisionConflict: Raised when trying to update a document but + couch revisions mismatch. """ - Return a SyncTarget object, for another u1db to synchronize with. - - @return: The sync target. 
- @rtype: CouchSyncTarget + trans_id = self._allocate_transaction_id() + # encode content + content = doc.get_json() + if content is not None: + content = binascii.b2a_base64(content)[:-1] # exclude trailing \n + # encode conflicts + conflicts = None + update_conflicts = doc.modified_conflicts() + if update_conflicts is True: + if doc.has_conflicts: + conflicts = binascii.b2a_base64( + json.dumps( + map(lambda cdoc: (cdoc.rev, cdoc.content), + doc.get_conflicts())) + )[:-1] # exclude \n + # perform the request + resource = self._database.resource( + '_design', 'docs', '_update', 'put', doc.doc_id) + response = resource.put_json( + body={ + 'couch_rev': old_doc.couch_rev + if old_doc is not None else None, + 'u1db_rev': doc.rev, + 'content': content, + 'trans_id': trans_id, + 'conflicts': conflicts, + 'update_conflicts': update_conflicts, + }, + headers={'content-type': 'application/json'}) + # the document might have been updated in between, so we check for the + # return message + msg = response[2].read() + if msg == 'ok': + return + elif msg == 'revision conflict': + raise errors.RevisionConflict() + + def put_doc(self, doc): """ - return CouchSyncTarget(self) + Update a document. - def create_index(self, index_name, *index_expressions): - """ - Create a named index, which can then be queried for future lookups. + If the document currently has conflicts, put will fail. + If the database specifies a maximum document size and the document + exceeds it, put will fail and raise a DocumentTooBig exception. - @param index_name: A unique name which can be used as a key prefix. - @param index_expressions: Index expressions defining the index - information. 
- """ - if index_name in self._indexes: - if self._indexes[index_name]._definition == list( - index_expressions): - return - raise errors.IndexNameTakenError - index = InMemoryIndex(index_name, list(index_expressions)) - for doc_id in self._database: - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - continue # skip special files - doc = self._get_doc(doc_id) - if doc.content is not None: - index.add_json(doc_id, doc.get_json()) - self._indexes[index_name] = index + :param doc: A Document with new content. + :return: new_doc_rev - The new revision identifier for the document. + The Document object will also be updated. - def close(self): + :raise errors.InvalidDocId: Raised if the document's id is invalid. + :raise errors.DocumentTooBig: Raised if the document size is too big. + :raise errors.ConflictedDoc: Raised if the document has conflicts. """ - Release any resources associated with this database. - - @return: True if db was succesfully closed. - @rtype: bool + if doc.doc_id is None: + raise errors.InvalidDocId() + self._check_doc_id(doc.doc_id) + self._check_doc_size(doc) + old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if old_doc and old_doc.has_conflicts: + raise errors.ConflictedDoc() + if old_doc and doc.rev is None and old_doc.is_tombstone(): + new_rev = self._allocate_doc_rev(old_doc.rev) + else: + if old_doc is not None: + if old_doc.rev != doc.rev: + raise errors.RevisionConflict() + else: + if doc.rev is not None: + raise errors.RevisionConflict() + new_rev = self._allocate_doc_rev(doc.rev) + doc.rev = new_rev + self._put_doc(old_doc, doc) + return new_rev + + def whats_changed(self, old_generation=0): """ - # TODO: fix this method so the connection is properly closed and - # test_close (+tearDown, which deletes the db) works without problems. 
- self._url = None - self._full_commit = None - self._session = None - #self._server = None - self._database = None - return True - - def sync(self, url, creds=None, autocreate=True): + Return a list of documents that have changed since old_generation. + + :param old_generation: The generation of the database in the old + state. + :type old_generation: int + + :return: (generation, trans_id, [(doc_id, generation, trans_id),...]) + The current generation of the database, its associated + transaction id, and a list of of changed documents since + old_generation, represented by tuples with for each document + its doc_id and the generation and transaction id corresponding + to the last intervening change and sorted by generation (old + changes first) + :rtype: (int, str, [(str, int, str)]) """ - Synchronize documents with remote replica exposed at url. + # query a couch list function + res = self._database.resource( + '_design', 'transactions', '_list', 'whats_changed', 'log') + response = res.get_json(old_gen=old_generation) + results = map( + lambda row: + (row['generation'], row['doc_id'], row['transaction_id']), + response[2]['transactions']) + results.reverse() + cur_gen = old_generation + seen = set() + changes = [] + newest_trans_id = '' + for generation, doc_id, trans_id in results: + if doc_id not in seen: + changes.append((doc_id, generation, trans_id)) + seen.add(doc_id) + if changes: + cur_gen = changes[0][1] # max generation + newest_trans_id = changes[0][2] + changes.reverse() + else: + cur_gen, newest_trans_id = self._get_generation_info() - @param url: The url of the target replica to sync with. - @type url: str - @param creds: optional dictionary giving credentials. - to authorize the operation with the server. - @type creds: dict - @param autocreate: Ask the target to create the db if non-existent. - @type autocreate: bool + return cur_gen, newest_trans_id, changes - @return: The local generation before the synchronisation was performed. 
- @rtype: int + def delete_doc(self, doc): """ - return Synchronizer(self, CouchSyncTarget(url, creds=creds)).sync( - autocreate=autocreate) + Mark a document as deleted. + + Will abort if the current revision doesn't match doc.rev. + This will also set doc.content to None. - #------------------------------------------------------------------------- - # methods from ObjectStoreDatabase - #------------------------------------------------------------------------- + :param doc: The document to mark as deleted. + :type doc: CouchDocument. - def _init_u1db_data(self): + :raise errors.DocumentDoesNotExist: Raised if the document does not + exist. + :raise errors.RevisionConflict: Raised if the revisions do not match. + :raise errors.DocumentAlreadyDeleted: Raised if the document is + already deleted. + :raise errors.ConflictedDoc: Raised if the doc has conflicts. """ - Initialize u1db configuration data on backend storage. + old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if old_doc is None: + raise errors.DocumentDoesNotExist + if old_doc.rev != doc.rev: + raise errors.RevisionConflict() + if old_doc.is_tombstone(): + raise errors.DocumentAlreadyDeleted + if old_doc.has_conflicts: + raise errors.ConflictedDoc() + new_rev = self._allocate_doc_rev(doc.rev) + doc.rev = new_rev + doc.make_tombstone() + self._put_doc(old_doc, doc) + return new_rev + + def _get_conflicts(self, doc_id, couch_rev=None): + """ + Get the conflicted versions of a document. + + If the C{couch_rev} parameter is not None, conflicts for a specific + document's couch revision are returned. - A U1DB database needs to keep track of all database transactions, - document conflicts, the generation of other replicas it has seen, - indexes created by users and so on. + :param couch_rev: The couch document revision. 
+ :type couch_rev: str - In this implementation, all this information is stored in special - documents stored in the underlying with doc_id prefix equal to - U1DB_DATA_DOC_ID_PREFIX. Those documents ids are reserved: put_doc(), - get_doc() and delete_doc() will not allow documents with a doc_id with - that prefix to be accessed or modified. + :return: A list of conflicted versions of the document. + :rtype: list """ - for key in self.U1DB_DATA_KEYS: - doc_id = '%s%s' % (self.U1DB_DATA_DOC_ID_PREFIX, key) - doc = self._get_doc(doc_id) - if doc is None: - doc = self._factory(doc_id) - doc.content = {'content': getattr(self, key)} - self._put_doc(doc) + # request conflicts attachment from server + params = {} + if couch_rev is not None: + params['rev'] = couch_rev # restric document's couch revision + resource = self._database.resource(doc_id, 'u1db_conflicts') + try: + response = resource.get_json(**params) + conflicts = [] + # build the conflicted versions + for doc_rev, content in json.loads(response[2].read()): + doc = self._factory(doc_id, doc_rev) + if content is None: + doc.make_tombstone() + else: + doc.content = content + conflicts.append(doc) + return conflicts + except ResourceNotFound: + return [] - #------------------------------------------------------------------------- - # Couch specific methods - #------------------------------------------------------------------------- + def get_doc_conflicts(self, doc_id): + """ + Get the list of conflicts for the given document. - INDEX_NAME_KEY = 'name' - INDEX_DEFINITION_KEY = 'definition' - INDEX_VALUES_KEY = 'values' + The order of the conflicts is such that the first entry is the value + that would be returned by "get_doc". - def delete_database(self): - """ - Delete a U1DB CouchDB database. + :return: A list of the document entries that are conflicted. 
+ :rtype: [CouchDocument] """ - del(self._server[self._dbname]) + conflict_docs = self._get_conflicts(doc_id) + if len(conflict_docs) == 0: + return [] + this_doc = self._get_doc(doc_id, check_for_conflicts=True) + return [this_doc] + conflict_docs - def _dump_indexes_as_json(self): + def _get_replica_gen_and_trans_id(self, other_replica_uid): """ - Dump index definitions as JSON. + Return the last known generation and transaction id for the other db + replica. + + When you do a synchronization with another replica, the Database keeps + track of what generation the other database replica was at, and what + the associated transaction id was. This is used to determine what data + needs to be sent, and if two databases are claiming to be the same + replica. + + :param other_replica_uid: The identifier for the other replica. + :type other_replica_uid: str + + :return: A tuple containing the generation and transaction id we + encountered during synchronization. If we've never + synchronized with the replica, this is (0, ''). + :rtype: (int, str) """ - indexes = {} - for name, idx in self._indexes.iteritems(): - indexes[name] = {} - for attr in [self.INDEX_NAME_KEY, self.INDEX_DEFINITION_KEY, - self.INDEX_VALUES_KEY]: - indexes[name][attr] = getattr(idx, '_' + attr) - return indexes + # query a couch view + result = self._database.view('syncs/log') + if len(result[other_replica_uid].rows) == 0: + return (0, '') + return ( + result[other_replica_uid].rows[0]['value']['known_generation'], + result[other_replica_uid].rows[0]['value']['known_transaction_id'] + ) + + def _set_replica_gen_and_trans_id(self, other_replica_uid, + other_generation, other_transaction_id): + """ + Set the last-known generation and transaction id for the other + database replica. + + We have just performed some synchronization, and we want to track what + generation the other replica was at. See also + _get_replica_gen_and_trans_id. 
+ + :param other_replica_uid: The U1DB identifier for the other replica. + :type other_replica_uid: str + :param other_generation: The generation number for the other replica. + :type other_generation: int + :param other_transaction_id: The transaction id associated with the + generation. + :type other_transaction_id: str + """ + self._do_set_replica_gen_and_trans_id( + other_replica_uid, other_generation, other_transaction_id) - def _load_indexes_from_json(self, indexes): + def _do_set_replica_gen_and_trans_id( + self, other_replica_uid, other_generation, other_transaction_id): + """ + Set the last-known generation and transaction id for the other + database replica. + + We have just performed some synchronization, and we want to track what + generation the other replica was at. See also + _get_replica_gen_and_trans_id. + + :param other_replica_uid: The U1DB identifier for the other replica. + :type other_replica_uid: str + :param other_generation: The generation number for the other replica. + :type other_generation: int + :param other_transaction_id: The transaction id associated with the + generation. + :type other_transaction_id: str + """ + # query a couch update function + res = self._database.resource( + '_design', 'syncs', '_update', 'put', 'u1db_sync_log') + res.put_json( + body={ + 'other_replica_uid': other_replica_uid, + 'other_generation': other_generation, + 'other_transaction_id': other_transaction_id, + }, + headers={'content-type': 'application/json'}) + + def _add_conflict(self, doc, my_doc_rev, my_content): + """ + Add a conflict to the document. + + Note that this method does not actually update the backend; rather, it + updates the CouchDocument object which will provide the conflict data + when the atomic document update is made. + + :param doc: The document to have conflicts added to. + :type doc: CouchDocument + :param my_doc_rev: The revision of the conflicted document. 
+ :type my_doc_rev: str + :param my_content: The content of the conflicted document as a JSON + serialized string. + :type my_content: str """ - Load index definitions from stored JSON. + doc.ensure_fetch_conflicts(self._get_conflicts) + doc.add_conflict( + self._factory(doc_id=doc.doc_id, rev=my_doc_rev, + json=my_content)) - @param indexes: A JSON representation of indexes as - [('index-name', ['field', 'field2', ...]), ...]. - @type indexes: str + def _delete_conflicts(self, doc, conflict_revs): """ - self._indexes = {} - for name, idx_dict in indexes.iteritems(): - idx = InMemoryIndex(name, idx_dict[self.INDEX_DEFINITION_KEY]) - idx._values = idx_dict[self.INDEX_VALUES_KEY] - self._indexes[name] = idx + Delete the conflicted revisions from the list of conflicts of C{doc}. - def _load_transaction_log_from_json(self, transaction_log): + Note that thie method does not actually update the backed; rather, it + updates the CouchDocument object which will provide the conflict data + when the atomic document update is made. + + :param doc: The document to have conflicts deleted. + :type doc: CouchDocument + :param conflict_revs: A list of the revisions to be deleted. + :param conflict_revs: [str] """ - Load transaction log from stored JSON. + doc.ensure_fetch_conflicts(self._get_conflicts) + doc.delete_conflicts(conflict_revs) - @param transaction_log: A JSON representation of transaction_log as - [('generation', 'transaction_id'), ...]. - @type transaction_log: list + def _prune_conflicts(self, doc, doc_vcr): + """ + Prune conflicts that are older then the current document's revision, or + whose content match to the current document's content. + + :param doc: The document to have conflicts pruned. + :type doc: CouchDocument + :param doc_vcr: A vector clock representing the current document's + revision. 
+ :type doc_vcr: u1db.vectorclock.VectorClock """ - self._transaction_log = [] - for gen, trans_id in transaction_log: - self._transaction_log.append((gen, trans_id)) + if doc.has_conflicts is True: + autoresolved = False + c_revs_to_prune = [] + for c_doc in doc.get_conflicts(): + c_vcr = vectorclock.VectorClockRev(c_doc.rev) + if doc_vcr.is_newer(c_vcr): + c_revs_to_prune.append(c_doc.rev) + elif doc.same_content_as(c_doc): + c_revs_to_prune.append(c_doc.rev) + doc_vcr.maximize(c_vcr) + autoresolved = True + if autoresolved: + doc_vcr.increment(self._replica_uid) + doc.rev = doc_vcr.as_str() + self._delete_conflicts(doc, c_revs_to_prune) + + def _force_doc_sync_conflict(self, doc): + """ + Add a conflict and force a document put. - def _load_other_generations_from_json(self, other_generations): + :param doc: The document to be put. + :type doc: CouchDocument """ - Load other generations from stored JSON. + my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev)) + self._add_conflict(doc, my_doc.rev, my_doc.get_json()) + doc.has_conflicts = True + self._put_doc(my_doc, doc) - @param other_generations: A JSON representation of other_generations - as {'replica_uid': ('generation', 'transaction_id'), ...}. - @type other_generations: dict + def resolve_doc(self, doc, conflicted_doc_revs): + """ + Mark a document as no longer conflicted. + + We take the list of revisions that the client knows about that it is + superseding. This may be a different list from the actual current + conflicts, in which case only those are removed as conflicted. This + may fail if the conflict list is significantly different from the + supplied information. (sync could have happened in the background from + the time you GET_DOC_CONFLICTS until the point where you RESOLVE) + + :param doc: A Document with the new content to be inserted. 
+ :type doc: CouchDocument + :param conflicted_doc_revs: A list of revisions that the new content + supersedes. + :type conflicted_doc_revs: [str] """ - self._other_generations = {} - for replica_uid, [gen, trans_id] in other_generations.iteritems(): - self._other_generations[replica_uid] = (gen, trans_id) + cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + new_rev = self._ensure_maximal_rev(cur_doc.rev, + conflicted_doc_revs) + superseded_revs = set(conflicted_doc_revs) + doc.rev = new_rev + if cur_doc.rev in superseded_revs: + self._delete_conflicts(doc, superseded_revs) + self._put_doc(cur_doc, doc) + else: + self._add_conflict(doc, new_rev, doc.get_json()) + self._delete_conflicts(doc, superseded_revs) + # perform request to resolve document in server + resource = self._database.resource( + '_design', 'docs', '_update', 'resolve_doc', doc.doc_id) + conflicts = None + if doc.has_conflicts: + conflicts = binascii.b2a_base64( + json.dumps( + map(lambda cdoc: (cdoc.rev, cdoc.content), + doc.get_conflicts())) + )[:-1] # exclude \n + response = resource.put_json( + body={ + 'couch_rev': cur_doc.couch_rev, + 'conflicts': conflicts, + }, + headers={'content-type': 'application/json'}) + + def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, + replica_trans_id=''): + """ + Insert/update document into the database with a given revision. + + This api is used during synchronization operations. + + If a document would conflict and save_conflict is set to True, the + content will be selected as the 'current' content for doc.doc_id, + even though doc.rev doesn't supersede the currently stored revision. + The currently stored document will be added to the list of conflict + alternatives for the given doc_id. + + This forces the new content to be 'current' so that we get convergence + after synchronizing, even if people don't resolve conflicts. Users can + then notice that their content is out of date, update it, and + synchronize again. 
(The alternative is that users could synchronize and + think the data has propagated, but their local copy looks fine, and the + remote copy is never updated again.) + + :param doc: A document object + :type doc: CouchDocument + :param save_conflict: If this document is a conflict, do you want to + save it as a conflict, or just ignore it. + :type save_conflict: bool + :param replica_uid: A unique replica identifier. + :type replica_uid: str + :param replica_gen: The generation of the replica corresponding to the + this document. The replica arguments are optional, + but are used during synchronization. + :type replica_gen: int + :param replica_trans_id: The transaction_id associated with the + generation. + :type replica_trans_id: str + + :return: (state, at_gen) - If we don't have doc_id already, or if + doc_rev supersedes the existing document revision, then the + content will be inserted, and state is 'inserted'. If + doc_rev is less than or equal to the existing revision, then + the put is ignored and state is respecitvely 'superseded' or + 'converged'. If doc_rev is not strictly superseded or + supersedes, then state is 'conflicted'. The document will not + be inserted if save_conflict is False. For 'inserted' or + 'converged', at_gen is the insertion/current generation. + :rtype: (str, int) + """ + cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + # at this point, `doc` has arrived from the other syncing party, and + # we will decide what to do with it. + # First, we prepare the arriving doc to update couch database. 
+ old_doc = doc + doc = self._factory(doc.doc_id, doc.rev, doc.get_json()) + if cur_doc is not None: + doc.couch_rev = cur_doc.couch_rev + # fetch conflicts because we will eventually manipulate them + doc.ensure_fetch_conflicts(self._get_conflicts) + # from now on, it works just like u1db sqlite backend + doc_vcr = vectorclock.VectorClockRev(doc.rev) + if cur_doc is None: + cur_vcr = vectorclock.VectorClockRev(None) + else: + cur_vcr = vectorclock.VectorClockRev(cur_doc.rev) + self._validate_source(replica_uid, replica_gen, replica_trans_id) + if doc_vcr.is_newer(cur_vcr): + rev = doc.rev + self._prune_conflicts(doc, doc_vcr) + if doc.rev != rev: + # conflicts have been autoresolved + state = 'superseded' + else: + state = 'inserted' + self._put_doc(cur_doc, doc) + elif doc.rev == cur_doc.rev: + # magical convergence + state = 'converged' + elif cur_vcr.is_newer(doc_vcr): + # Don't add this to seen_ids, because we have something newer, + # so we should send it back, and we should not generate a + # conflict + state = 'superseded' + elif cur_doc.same_content_as(doc): + # the documents have been edited to the same thing at both ends + doc_vcr.maximize(cur_vcr) + doc_vcr.increment(self._replica_uid) + doc.rev = doc_vcr.as_str() + self._put_doc(cur_doc, doc) + state = 'superseded' + else: + state = 'conflicted' + if save_conflict: + self._force_doc_sync_conflict(doc) + if replica_uid is not None and replica_gen is not None: + self._do_set_replica_gen_and_trans_id( + replica_uid, replica_gen, replica_trans_id) + # update info + old_doc.rev = doc.rev + if doc.is_tombstone(): + old_doc.is_tombstone() + else: + old_doc.content = doc.content + old_doc.has_conflicts = doc.has_conflicts + return state, self._get_generation() -class CouchSyncTarget(ObjectStoreSyncTarget): +class CouchSyncTarget(CommonSyncTarget): """ Functionality for using a CouchDatabase as a synchronization target. 
""" - pass + + def get_sync_info(self, source_replica_uid): + source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id( + source_replica_uid) + my_gen, my_trans_id = self._db._get_generation_info() + return ( + self._db._replica_uid, my_gen, my_trans_id, source_gen, + source_trans_id) + + def record_sync_info(self, source_replica_uid, source_replica_generation, + source_replica_transaction_id): + if self._trace_hook: + self._trace_hook('record_sync_info') + self._db._set_replica_gen_and_trans_id( + source_replica_uid, source_replica_generation, + source_replica_transaction_id) class NotEnoughCouchPermissions(Exception): @@ -527,12 +1004,12 @@ class CouchServerState(ServerState): """ Initialize the couch server state. - @param couch_url: The URL for the couch database. - @type couch_url: str - @param shared_db_name: The name of the shared database. - @type shared_db_name: str - @param tokens_db_name: The name of the tokens database. - @type tokens_db_name: str + :param couch_url: The URL for the couch database. + :type couch_url: str + :param shared_db_name: The name of the shared database. + :type shared_db_name: str + :param tokens_db_name: The name of the tokens database. + :type tokens_db_name: str """ self._couch_url = couch_url self._shared_db_name = shared_db_name @@ -563,12 +1040,12 @@ class CouchServerState(ServerState): couch library to ensure that Soledad Server can do everything it needs on the underlying couch database. - @param couch_url: The URL of the couch database. - @type couch_url: str + :param couch_url: The URL of the couch database. + :type couch_url: str @raise NotEnoughCouchPermissions: Raised in case there are not enough permissions to read/write/create the needed couch databases. - @rtype: bool + :rtype: bool """ def _open_couch_db(dbname): @@ -601,11 +1078,11 @@ class CouchServerState(ServerState): """ Open a couch database. - @param dbname: The name of the database to open. 
- @type dbname: str + :param dbname: The name of the database to open. + :type dbname: str - @return: The CouchDatabase object. - @rtype: CouchDatabase + :return: The CouchDatabase object. + :rtype: CouchDatabase """ # TODO: open couch return CouchDatabase.open_database( @@ -616,11 +1093,11 @@ class CouchServerState(ServerState): """ Ensure couch database exists. - @param dbname: The name of the database to ensure. - @type dbname: str + :param dbname: The name of the database to ensure. + :type dbname: str - @return: The CouchDatabase object and the replica uid. - @rtype: (CouchDatabase, str) + :return: The CouchDatabase object and the replica uid. + :rtype: (CouchDatabase, str) """ db = CouchDatabase.open_database( self._couch_url + '/' + dbname, @@ -631,8 +1108,8 @@ class CouchServerState(ServerState): """ Delete couch database. - @param dbname: The name of the database to delete. - @type dbname: str + :param dbname: The name of the database to delete. + :type dbname: str """ CouchDatabase.delete_database(self._couch_url + '/' + dbname) @@ -640,8 +1117,8 @@ class CouchServerState(ServerState): """ Set the couchdb URL - @param url: CouchDB URL - @type url: str + :param url: CouchDB URL + :type url: str """ self._couch_url = url @@ -649,7 +1126,7 @@ class CouchServerState(ServerState): """ Return CouchDB URL - @rtype: str + :rtype: str """ return self._couch_url diff --git a/common/src/leap/soledad/common/ddocs/README.txt b/common/src/leap/soledad/common/ddocs/README.txt new file mode 100644 index 00000000..37d89fbf --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/README.txt @@ -0,0 +1,29 @@ +The following table depicts the U1DB CouchDB backend method and the URI that +is queried to obtain/update data from/to the server. 
+ + +----------------------------------+------------------------------------------------------------------+ + | u1db backend method | URI | + |----------------------------------+------------------------------------------------------------------| + | _get_generation | _design/transactions/_list/generation/log | + | _get_generation_info | _design/transactions/_list/generation/log | + | _get_trans_id_for_gen | _design/transactions/_list/trans_id_for_gen/log | + | _get_transaction_log | _design/transactions/_view/log | + | _get_doc (*) | _design/docs/_view/get?key= | + | _has_conflicts | _design/docs/_view/get?key= | + | get_all_docs | _design/docs/_view/get | + | _put_doc | _design/docs/_update/put/ | + | _whats_changed | _design/transactions/_list/whats_changed/log?old_gen= | + | _get_conflicts (*) | _design/docs/_view/conflicts?key= | + | _get_replica_gen_and_trans_id | _design/syncs/_view/log?other_replica_uid= | + | _do_set_replica_gen_and_trans_id | _design/syncs/_update/put/u1db_sync_log | + | _add_conflict | _design/docs/_update/add_conflict/ | + | _delete_conflicts | _design/docs/_update/delete_conflicts/?doc_rev= | + | list_indexes | not implemented | + | _get_index_definition | not implemented | + | delete_index | not implemented | + | _get_indexed_fields | not implemented | + | _put_and_update_indexes | not implemented | + +----------------------------------+------------------------------------------------------------------+ + +(*) These methods also request CouchDB document attachments that store U1DB + document contents. 
diff --git a/common/src/leap/soledad/common/ddocs/__init__.py b/common/src/leap/soledad/common/ddocs/__init__.py new file mode 100644 index 00000000..c2f78e18 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/__init__.py @@ -0,0 +1,138 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +CouchDB U1DB backend design documents helper. +""" + + +from os import listdir +from os.path import realpath, dirname, isdir, join, isfile, basename +import json +import logging + + +from couchdb import Document as CouchDocument + + +logger = logging.getLogger(__name__) + + +# where to search for design docs definitions +prefix = dirname(realpath(__file__)) + + +def ensure_ddocs_on_remote_db(db, prefix=prefix): + """ + Ensure that the design documents in C{db} contain. + + :param db: The database in which to create/update the design docs. + :type db: couchdb.client.Server + :param prefix: Where to look for design documents definitions. 
+ :type prefix: str + """ + ddocs = build_ddocs(prefix) + for ddoc_name, ddoc_content in ddocs.iteritems(): + ddoc_id = "_design/%s" % ddoc_name + ddoc = CouchDocument({'_id': ddoc_id}) + ddoc.update(ddoc_content) + # ensure revision if ddoc is already in db + doc = db.get(ddoc_id) + if doc is not None: + ddoc['_rev'] = doc.rev + db.save(ddoc) + + +def create_local_ddocs(prefix=prefix): + """ + Create local design docs based on content from subdirectories in + C{prefix}. + + :param create_local: Whether to create local .json files. + :type create_local: bool + """ + ddocs = build_ddocs(prefix) + for ddoc_name, ddoc_content in ddocs.iteritems(): + with open(join(prefix, '%s.json' % ddoc_name), 'w') as f: + f.write(json.dumps(ddoc_content, indent=4)) + + +def build_ddocs(prefix=prefix): + """ + Build design documents based on content from subdirectories in + C{prefix}. + + :param prefix: Where to look for design documents definitions. + :type prefix: str + + :return: A dictionary containing the design docs definitions. + :rtype: dict + """ + ddocs = {} + # design docs are represented by subdirectories in current directory + for ddoc in [f for f in listdir(prefix) if isdir(join(prefix, f))]: + logger.debug("Building %s.json ..." 
% ddoc) + + ddocs[ddoc] = {} + + for t in ['views', 'lists', 'updates']: + tdir = join(prefix, ddoc, t) + if not isdir(tdir): + logger.debug(" - no %s" % t) + else: + + ddocs[ddoc][t] = {} + + if t == 'views': # handle views (with map/reduce functions) + for view in [f for f in listdir(tdir) \ + if isdir(join(tdir, f))]: + logger.debug(" - view: %s" % view) + # look for map.js and reduce.js + mapfile = join(tdir, view, 'map.js') + reducefile = join(tdir, view, 'reduce.js') + mapfun = None + reducefun = None + try: + with open(mapfile) as f: + mapfun = f.read() + except IOError: + pass + try: + with open(reducefile) as f: + reducefun = f.read() + except IOError: + pass + ddocs[ddoc]['views'][view] = {} + + if mapfun is not None: + ddocs[ddoc]['views'][view]['map'] = mapfun + if reducefun is not None: + ddocs[ddoc]['views'][view]['reduce'] = reducefun + + else: # handle lists, updates, etc + for fun in [f for f in listdir(tdir) \ + if isfile(join(tdir, f))]: + logger.debug(" - %s: %s" % (t, fun)) + funfile = join(tdir, fun) + funname = basename(funfile).replace('.js', '') + try: + with open(funfile) as f: + ddocs[ddoc][t][funname] = f.read() + except IOError: + pass + return ddocs diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/put.js b/common/src/leap/soledad/common/ddocs/docs/updates/put.js new file mode 100644 index 00000000..5a4647de --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/docs/updates/put.js @@ -0,0 +1,64 @@ +function(doc, req){ + /* we expect to receive the following in `req.body`: + * { + * 'couch_rev': '', + * 'u1db_rev': '', + * 'content': '', + * 'trans_id': '' + * 'conflicts': '', + * 'update_conflicts': + * } + */ + var body = JSON.parse(req.body); + + // create a new document document + if (!doc) { + doc = {} + doc['_id'] = req['id']; + } + // or fail if couch revisions do not match + else if (doc['_rev'] != body['couch_rev']) { + // of fail if revisions do not match + return [null, 'revision conflict'] + } + + // store 
u1db rev + doc.u1db_rev = body['u1db_rev']; + + // save content as attachment + if (body['content'] != null) { + // save u1db content as attachment + if (!doc._attachments) + doc._attachments = {}; + doc._attachments.u1db_content = { + content_type: "application/octet-stream", + data: body['content'] // should be base64 encoded + }; + } + // or delete the attachment if document is tombstone + else if (doc._attachments && + doc._attachments.u1db_content) + delete doc._attachments.u1db_content; + + // store the transaction id + if (!doc.u1db_transactions) + doc.u1db_transactions = []; + var d = new Date(); + doc.u1db_transactions.push([d.getTime(), body['trans_id']]); + + // save conflicts as attachment if they were sent + if (body['update_conflicts']) + if (body['conflicts'] != null) { + if (!doc._attachments) + doc._attachments = {}; + doc._attachments.u1db_conflicts = { + content_type: "application/octet-stream", + data: body['conflicts'] // should be base64 encoded + } + } else { + if(doc._attachments && doc._attachments.u1db_conflicts) + delete doc._attachments.u1db_conflicts + } + + return [doc, 'ok']; +} diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js b/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js new file mode 100644 index 00000000..7ba66cf8 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js @@ -0,0 +1,39 @@ +function(doc, req){ + /* we expect to receive the following in `req.body`: + * { + * 'couch_rev': '', + * 'conflicts': '', + * } + */ + var body = JSON.parse(req.body); + + // fail if no document was given + if (!doc) { + return [null, 'document does not exist'] + } + + // fail if couch revisions do not match + if (body['couch_rev'] != null + && doc['_rev'] != body['couch_rev']) { + return [null, 'revision conflict'] + } + + // fail if conflicts were not sent + if (body['conflicts'] == null) + return [null, 'missing conflicts'] + + // save conflicts as attachment if they 
were sent + if (body['conflicts'] != null) { + if (!doc._attachments) + doc._attachments = {}; + doc._attachments.u1db_conflicts = { + content_type: "application/octet-stream", + data: body['conflicts'] // should be base64 encoded + } + } + // or delete attachment if there are no conflicts + else if (doc._attachments && doc._attachments.u1db_conflicts) + delete doc._attachments.u1db_conflicts; + + return [doc, 'ok']; +} diff --git a/common/src/leap/soledad/common/ddocs/docs/views/get/map.js b/common/src/leap/soledad/common/ddocs/docs/views/get/map.js new file mode 100644 index 00000000..ae08d9e9 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/docs/views/get/map.js @@ -0,0 +1,20 @@ +function(doc) { + if (doc.u1db_rev) { + var is_tombstone = true; + var has_conflicts = false; + if (doc._attachments) { + if (doc._attachments.u1db_content) + is_tombstone = false; + if (doc._attachments.u1db_conflicts) + has_conflicts = true; + } + emit(doc._id, + { + "couch_rev": doc._rev, + "u1db_rev": doc.u1db_rev, + "is_tombstone": is_tombstone, + "has_conflicts": has_conflicts, + } + ); + } +} diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js new file mode 100644 index 00000000..722f695a --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js @@ -0,0 +1,22 @@ +function(doc, req){ + if (!doc) { + doc = {} + doc['_id'] = 'u1db_sync_log'; + doc['syncs'] = []; + } + body = JSON.parse(req.body); + // remove outdated info + doc['syncs'] = doc['syncs'].filter( + function (entry) { + return entry[0] != body['other_replica_uid']; + } + ); + // store u1db rev + doc['syncs'].push([ + body['other_replica_uid'], + body['other_generation'], + body['other_transaction_id'] + ]); + return [doc, 'ok']; +} + diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js new file mode 100644 index 00000000..a63c7cf4 --- 
/dev/null +++ b/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js @@ -0,0 +1,12 @@ +function(doc) { + if (doc._id == 'u1db_sync_log') { + if (doc.syncs) + doc.syncs.forEach(function (entry) { + emit(entry[0], + { + 'known_generation': entry[1], + 'known_transaction_id': entry[2] + }); + }); + } +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js b/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js new file mode 100644 index 00000000..dbdfff0d --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js @@ -0,0 +1,20 @@ +function(head, req) { + var row; + var rows=[]; + // fetch all rows + while(row = getRow()) { + rows.push(row); + } + if (rows.length > 0) + send(JSON.stringify({ + "generation": rows.length, + "doc_id": rows[rows.length-1]['id'], + "transaction_id": rows[rows.length-1]['value'] + })); + else + send(JSON.stringify({ + "generation": 0, + "doc_id": "", + "transaction_id": "", + })); +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js b/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js new file mode 100644 index 00000000..2ec91794 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js @@ -0,0 +1,19 @@ +function(head, req) { + var row; + var rows=[]; + var i = 1; + var gen = 1; + if (req.query.gen) + gen = parseInt(req.query['gen']); + // fetch all rows + while(row = getRow()) + rows.push(row); + if (gen <= rows.length) + send(JSON.stringify({ + "generation": gen, + "doc_id": rows[gen-1]['id'], + "transaction_id": rows[gen-1]['value'], + })); + else + send('{}'); +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js b/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js new file mode 100644 index 00000000..b35cdf51 --- /dev/null +++ 
b/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js @@ -0,0 +1,22 @@ +function(head, req) { + var row; + var gen = 1; + var old_gen = 0; + if (req.query.old_gen) + old_gen = parseInt(req.query['old_gen']); + send('{"transactions":[\n'); + // fetch all rows + while(row = getRow()) { + if (gen > old_gen) { + if (gen > old_gen+1) + send(',\n'); + send(JSON.stringify({ + "generation": gen, + "doc_id": row["id"], + "transaction_id": row["value"] + })); + } + gen++; + } + send('\n]}'); +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js b/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js new file mode 100644 index 00000000..94ef63ca --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js @@ -0,0 +1,7 @@ +function(doc) { + if (doc.u1db_transactions) + doc.u1db_transactions.forEach(function(t) { + emit(t[0], // use timestamp as key so the results are ordered + t[1]); // value is the transaction_id + }); +} diff --git a/common/src/leap/soledad/common/objectstore.py b/common/src/leap/soledad/common/objectstore.py deleted file mode 100644 index 7aff3e32..00000000 --- a/common/src/leap/soledad/common/objectstore.py +++ /dev/null @@ -1,282 +0,0 @@ -# -*- coding: utf-8 -*- -# objectstore.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . 
- - -""" -Abstract U1DB backend to handle storage using object stores (like CouchDB, for -example). - -This backend uses special documents to store all U1DB data (replica uid, -indexes, transaction logs and info about other dbs). The id of these documents -are reserved and have prefix equal to ObjectStore.U1DB_DATA_DOC_ID_PREFIX. - -Right now, this is only used by CouchDatabase backend, but can also be -extended to implement OpenStack or Amazon S3 storage, for example. - -See U1DB documentation for more information on how to use databases. -""" - - -from base64 import b64encode, b64decode - - -import uuid -import simplejson as json - - -from u1db import errors -from u1db.backends.inmemory import ( - InMemoryDatabase, - InMemorySyncTarget, -) - - -class ObjectStoreDatabase(InMemoryDatabase): - """ - A backend for storing u1db data in an object store. - """ - - U1DB_DATA_DOC_ID_PREFIX = 'u1db/' - - @classmethod - def open_database(cls, url, create, document_factory=None): - """ - Open a U1DB database using an object store as backend. - - @param url: the url of the database replica - @type url: str - @param create: should the replica be created if it does not exist? - @type create: bool - @param document_factory: A function that will be called with the same - parameters as Document.__init__. - @type document_factory: callable - - @return: the database instance - @rtype: CouchDatabase - """ - raise NotImplementedError(cls.open_database) - - def __init__(self, replica_uid=None, document_factory=None): - """ - Initialize the object store database. - - @param replica_uid: an optional unique replica identifier - @type replica_uid: str - @param document_factory: A function that will be called with the same - parameters as Document.__init__. 
- @type document_factory: callable - """ - InMemoryDatabase.__init__( - self, - replica_uid, - document_factory=document_factory) - if self._replica_uid is None: - self._replica_uid = uuid.uuid4().hex - self._init_u1db_data() - - def _init_u1db_data(self): - """ - Initialize u1db configuration data on backend storage. - - A U1DB database needs to keep track of all database transactions, - document conflicts, the generation of other replicas it has seen, - indexes created by users and so on. - - In this implementation, all this information is stored in special - documents stored in the couch db with id prefix equal to - U1DB_DATA_DOC_ID_PREFIX. Those documents ids are reserved: - put_doc(), get_doc() and delete_doc() will not allow documents with - a doc_id with that prefix to be accessed or modified. - """ - raise NotImplementedError(self._init_u1db_data) - - #------------------------------------------------------------------------- - # methods from Database - #------------------------------------------------------------------------- - - def put_doc(self, doc): - """ - Update a document. - - If the document currently has conflicts, put will fail. - If the database specifies a maximum document size and the document - exceeds it, put will fail and raise a DocumentTooBig exception. - - This method prevents from updating the document with doc_id equals to - self.U1DB_DATA_DOC_ID, which contains U1DB data. - - @param doc: A Document with new content. - @type doc: Document - - @return: new_doc_rev - The new revision identifier for the document. - The Document object will also be updated. - @rtype: str - """ - if doc.doc_id is not None and \ - doc.doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - raise errors.InvalidDocId() - return InMemoryDatabase.put_doc(self, doc) - - def _put_doc(self, doc): - """ - Update a document. - - This is called everytime we just want to do a raw put on the db (i.e. - without index updates, document constraint checks, and conflict - checks). 
- - @param doc: The document to update. - @type doc: u1db.Document - - @return: The new revision identifier for the document. - @rtype: str - """ - raise NotImplementedError(self._put_doc) - - def get_doc(self, doc_id, include_deleted=False): - """ - Get the JSON string for the given document. - - This method prevents from getting the document with doc_id equals to - self.U1DB_DATA_DOC_ID, which contains U1DB data. - - @param doc_id: The unique document identifier - @type doc_id: str - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise asking for a deleted - document will return None. - @type include_deleted: bool - - @return: a Document object. - @rtype: Document - """ - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - raise errors.InvalidDocId() - return InMemoryDatabase.get_doc(self, doc_id, include_deleted) - - def _get_doc(self, doc_id): - """ - Get just the document content, without fancy handling. - - @param doc_id: The unique document identifier - @type doc_id: str - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise asking for a deleted - document will return None. - @type include_deleted: bool - - @return: a Document object. - @type: u1db.Document - """ - raise NotImplementedError(self._get_doc) - - def get_all_docs(self, include_deleted=False): - """ - Get the JSON content for all documents in the database. - - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise deleted documents will not - be included in the results. - @type include_deleted: bool - - @return: (generation, [Document]) - The current generation of the database, followed by a list of all - the documents in the database. 
- @rtype: tuple - """ - generation = self._get_generation() - results = [] - for doc_id in self._database: - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - continue - doc = self._get_doc(doc_id, check_for_conflicts=True) - if doc.content is None and not include_deleted: - continue - results.append(doc) - return (generation, results) - - def delete_doc(self, doc): - """ - Mark a document as deleted. - - This method prevents from deleting the document with doc_id equals to - self.U1DB_DATA_DOC_ID, which contains U1DB data. - - @param doc: The document to mark as deleted. - @type doc: u1db.Document - - @return: The new revision id of the document. - @type: str - """ - if doc.doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - raise errors.InvalidDocId() - old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) - if old_doc is None: - raise errors.DocumentDoesNotExist - if old_doc.rev != doc.rev: - raise errors.RevisionConflict() - if old_doc.is_tombstone(): - raise errors.DocumentAlreadyDeleted - if old_doc.has_conflicts: - raise errors.ConflictedDoc() - new_rev = self._allocate_doc_rev(doc.rev) - doc.rev = new_rev - doc.make_tombstone() - self._put_and_update_indexes(old_doc, doc) - return new_rev - - # index-related methods - - def create_index(self, index_name, *index_expressions): - """ - Create a named index, which can then be queried for future lookups. - - See U1DB documentation for more information. - - @param index_name: A unique name which can be used as a key prefix. - @param index_expressions: Index expressions defining the index - information. - """ - raise NotImplementedError(self.create_index) - - #------------------------------------------------------------------------- - # implemented methods from CommonBackend - #------------------------------------------------------------------------- - - def _put_and_update_indexes(self, old_doc, doc): - """ - Update a document and all indexes related to it. 
- - @param old_doc: The old version of the document. - @type old_doc: u1db.Document - @param doc: The new version of the document. - @type doc: u1db.Document - """ - for index in self._indexes.itervalues(): - if old_doc is not None and not old_doc.is_tombstone(): - index.remove_json(old_doc.doc_id, old_doc.get_json()) - if not doc.is_tombstone(): - index.add_json(doc.doc_id, doc.get_json()) - trans_id = self._allocate_transaction_id() - self._put_doc(doc) - self._transaction_log.append((doc.doc_id, trans_id)) - - -class ObjectStoreSyncTarget(InMemorySyncTarget): - """ - Functionality for using an ObjectStore as a synchronization target. - """ diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index 42edf9fe..bdef4e0d 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -27,7 +27,6 @@ from base64 import b64decode from leap.common.files import mkdir_p -from leap.soledad.common.document import SoledadDocument from leap.soledad.common.tests import u1db_tests as tests from leap.soledad.common.tests.u1db_tests import test_backends from leap.soledad.common.tests.u1db_tests import test_sync @@ -188,7 +187,7 @@ def copy_couch_database_for_test(test, db): def make_document_for_test(test, doc_id, rev, content, has_conflicts=False): - return SoledadDocument(doc_id, rev, content, has_conflicts=has_conflicts) + return couch.CouchDocument(doc_id, rev, content, has_conflicts=has_conflicts) COUCH_SCENARIOS = [ -- cgit v1.2.3 From 69762784c41f9e231260d1e790a4a5c05bf6de96 Mon Sep 17 00:00:00 2001 From: drebs Date: Mon, 16 Dec 2013 10:56:57 -0200 Subject: Monkey-patch u1db to use CouchDocument. 
--- common/src/leap/soledad/common/couch.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 3b0e042a..4caaf48f 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -29,6 +29,7 @@ import socket from u1db.backends import CommonBackend, CommonSyncTarget +from u1db.remote import http_app from u1db.remote.server_state import ServerState from u1db import ( errors, @@ -159,6 +160,10 @@ class CouchDocument(SoledadDocument): couch_rev = property(_get_couch_rev, _set_couch_rev) +# monkey-patch the u1db http app to use CouchDocument +http_app.Document = CouchDocument + + class CouchDatabase(CommonBackend): """ A U1DB implementation that uses CouchDB as its persistence layer. -- cgit v1.2.3 From ab93164a3a9532340a899e0b23f4b541d61c22c6 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 10 Dec 2013 18:53:54 -0200 Subject: Fix couch tests to reflect remodelling. 
--- common/src/leap/soledad/common/couch.py | 43 +--- common/src/leap/soledad/common/ddocs/__init__.py | 4 +- .../leap/soledad/common/tests/couchdb.ini.template | 4 +- common/src/leap/soledad/common/tests/test_couch.py | 240 +++++++++++---------- 4 files changed, 141 insertions(+), 150 deletions(-) diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 4caaf48f..b9e699f3 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -18,35 +18,25 @@ """A U1DB backend that uses CouchDB as its persistence layer.""" -import os import simplejson as json -import sys -import time +import re import uuid import logging import binascii import socket +from couchdb.client import Server +from couchdb.http import ResourceNotFound, Unauthorized +from u1db import errors, query_parser, vectorclock from u1db.backends import CommonBackend, CommonSyncTarget from u1db.remote import http_app from u1db.remote.server_state import ServerState -from u1db import ( - errors, - query_parser, - vectorclock, -) -from couchdb.client import ( - Server, - Document as CouchDocument, - _doc_resource, -) -from couchdb.http import ResourceNotFound, Unauthorized from leap.soledad.common import USER_DB_PREFIX from leap.soledad.common.document import SoledadDocument -from leap.soledad.common.ddocs import ensure_ddocs_on_remote_db +from leap.soledad.common.ddocs import ensure_ddocs_on_db logger = logging.getLogger(__name__) @@ -193,11 +183,11 @@ class CouchDatabase(CommonBackend): server[dbname] except ResourceNotFound: if not create: - raise DatabaseDoesNotExist() + raise errors.DatabaseDoesNotExist() return cls(url, dbname) def __init__(self, url, dbname, replica_uid=None, full_commit=True, - session=None): + session=None, ensure_ddocs=False): """ Create a new Couch data container. 
@@ -230,7 +220,9 @@ class CouchDatabase(CommonBackend): except ResourceNotFound: self._server.create(self._dbname) self._database = self._server[self._dbname] - self._initialize(replica_uid or uuid.uuid4().hex) + self._set_replica_uid(replica_uid or uuid.uuid4().hex) + if ensure_ddocs: + ensure_ddocs_on_db(self._database) def get_sync_target(self): """ @@ -270,25 +262,12 @@ class CouchDatabase(CommonBackend): """ try: doc = self._database['u1db_config'] + doc['replica_uid'] = replica_uid except ResourceNotFound: doc = { '_id': 'u1db_config', 'replica_uid': replica_uid, } - self._database.save(doc) - - def _ensure_design_docs(self): - """ - Ensure that the design docs have been created. - """ - if self._is_initialized(): - return - self._initialize() - - def _set_replica_uid(self, replica_uid): - """Force the replica_uid to be set.""" - doc = self._database['u1db_config'] - doc['replica_uid'] = replica_uid self._database.save(doc) self._real_replica_uid = replica_uid diff --git a/common/src/leap/soledad/common/ddocs/__init__.py b/common/src/leap/soledad/common/ddocs/__init__.py index c2f78e18..389bdff9 100644 --- a/common/src/leap/soledad/common/ddocs/__init__.py +++ b/common/src/leap/soledad/common/ddocs/__init__.py @@ -37,7 +37,7 @@ logger = logging.getLogger(__name__) prefix = dirname(realpath(__file__)) -def ensure_ddocs_on_remote_db(db, prefix=prefix): +def ensure_ddocs_on_db(db, prefix=prefix): """ Ensure that the design documents in C{db} contain. 
@@ -118,7 +118,7 @@ def build_ddocs(prefix=prefix): except IOError: pass ddocs[ddoc]['views'][view] = {} - + if mapfun is not None: ddocs[ddoc]['views'][view]['map'] = mapfun if reducefun is not None: diff --git a/common/src/leap/soledad/common/tests/couchdb.ini.template b/common/src/leap/soledad/common/tests/couchdb.ini.template index 7d0316f0..217ae201 100644 --- a/common/src/leap/soledad/common/tests/couchdb.ini.template +++ b/common/src/leap/soledad/common/tests/couchdb.ini.template @@ -74,6 +74,8 @@ use_users_db = false [query_servers] ; javascript = %(tempdir)s/server/main.js +javascript = /usr/bin/couchjs /usr/share/couchdb/server/main.js +coffeescript = /usr/bin/couchjs /usr/share/couchdb/server/main-coffee.js ; Changing reduce_limit to false will disable reduce_limit. @@ -219,4 +221,4 @@ min_file_size = 131072 ;[admins] ;testuser = -hashed-f50a252c12615697c5ed24ec5cd56b05d66fe91e,b05471ba260132953930cf9f97f327f5 -; pass for above user is 'testpass' \ No newline at end of file +; pass for above user is 'testpass' diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index bdef4e0d..a181c6cb 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -25,6 +25,9 @@ import copy import shutil from base64 import b64decode +from couchdb.client import Server +from u1db import errors + from leap.common.files import mkdir_p from leap.soledad.common.tests import u1db_tests as tests @@ -147,7 +150,7 @@ class TestCouchBackendImpl(CouchDBTestCase): def test__allocate_doc_id(self): db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), - 'u1db_tests') + 'u1db_tests', ensure_ddocs=True) doc_id1 = db._allocate_doc_id() self.assertTrue(doc_id1.startswith('D-')) self.assertEqual(34, len(doc_id1)) @@ -162,32 +165,51 @@ class TestCouchBackendImpl(CouchDBTestCase): def make_couch_database_for_test(test, replica_uid): port = 
str(test.wrapper.port) return couch.CouchDatabase('http://localhost:' + port, replica_uid, - replica_uid=replica_uid or 'test') + replica_uid=replica_uid or 'test', + ensure_ddocs=True) def copy_couch_database_for_test(test, db): port = str(test.wrapper.port) - new_db = couch.CouchDatabase('http://localhost:' + port, - db._replica_uid + '_copy', + couch_url = 'http://localhost:' + port + new_dbname = db._replica_uid + '_copy' + new_db = couch.CouchDatabase(couch_url, + new_dbname, replica_uid=db._replica_uid or 'test') - gen, docs = db.get_all_docs(include_deleted=True) - for doc in docs: - new_db._put_doc(doc) - new_db._transaction_log = copy.deepcopy(db._transaction_log) - new_db._conflicts = copy.deepcopy(db._conflicts) - new_db._other_generations = copy.deepcopy(db._other_generations) - new_db._indexes = copy.deepcopy(db._indexes) - # save u1db data on couch - for key in new_db.U1DB_DATA_KEYS: - doc_id = '%s%s' % (new_db.U1DB_DATA_DOC_ID_PREFIX, key) - doc = new_db._get_doc(doc_id) - doc.content = {'content': getattr(new_db, key)} - new_db._put_doc(doc) + # copy all docs + old_couch_db = Server(couch_url)[db._replica_uid] + new_couch_db = Server(couch_url)[new_dbname] + for doc_id in old_couch_db: + doc = old_couch_db.get(doc_id) + # copy design docs + if ('u1db_rev' not in doc): + new_couch_db.save(doc) + # copy u1db docs + else: + new_doc = { + '_id': doc['_id'], + 'u1db_transactions': doc['u1db_transactions'], + 'u1db_rev': doc['u1db_rev'] + } + attachments = [] + if ('u1db_conflicts' in doc): + new_doc['u1db_conflicts'] = doc['u1db_conflicts'] + for c_rev in doc['u1db_conflicts']: + attachments.append('u1db_conflict_%s' % c_rev) + new_couch_db.save(new_doc) + # save conflict data + attachments.append('u1db_content') + for att_name in attachments: + att = old_couch_db.get_attachment(doc_id, att_name) + if (att is not None): + new_couch_db.put_attachment(new_doc, att, + filename=att_name) return new_db def make_document_for_test(test, doc_id, rev, content, 
has_conflicts=False): - return couch.CouchDocument(doc_id, rev, content, has_conflicts=has_conflicts) + return couch.CouchDocument( + doc_id, rev, content, has_conflicts=has_conflicts) COUCH_SCENARIOS = [ @@ -245,9 +267,8 @@ class CouchWithConflictsTests( test_backends.LocalDatabaseWithConflictsTests.tearDown(self) -# Notice: the CouchDB backend is currently used for storing encrypted data in -# the server, so indexing makes no sense. Thus, we ignore index testing for -# now. +# Notice: the CouchDB backend does not have indexing capabilities, but we +# added in memory indexing for tests only. class CouchIndexTests(test_backends.DatabaseIndexTests, CouchDBTestCase): @@ -310,6 +331,89 @@ class CouchDatabaseSyncTargetTests(test_sync.DatabaseSyncTargetTests, [(doc.doc_id, doc.rev), (doc2.doc_id, doc2.rev)]}) +# The following tests need that the database have an index, so we fake one. +old_class = couch.CouchDatabase + +from u1db.backends.inmemory import InMemoryIndex + + +class IndexedCouchDatabase(couch.CouchDatabase): + + def __init__(self, url, dbname, replica_uid=None, full_commit=True, + session=None, ensure_ddocs=True): + old_class.__init__(self, url, dbname, replica_uid, full_commit, + session, ensure_ddocs=True) + self._indexes = {} + + def _put_doc(self, old_doc, doc): + for index in self._indexes.itervalues(): + if old_doc is not None and not old_doc.is_tombstone(): + index.remove_json(old_doc.doc_id, old_doc.get_json()) + if not doc.is_tombstone(): + index.add_json(doc.doc_id, doc.get_json()) + old_class._put_doc(self, old_doc, doc) + + def create_index(self, index_name, *index_expressions): + if index_name in self._indexes: + if self._indexes[index_name]._definition == list( + index_expressions): + return + raise errors.IndexNameTakenError + index = InMemoryIndex(index_name, list(index_expressions)) + _, all_docs = self.get_all_docs() + for doc in all_docs: + index.add_json(doc.doc_id, doc.get_json()) + self._indexes[index_name] = index + + def 
delete_index(self, index_name): + del self._indexes[index_name] + + def list_indexes(self): + definitions = [] + for idx in self._indexes.itervalues(): + definitions.append((idx._name, idx._definition)) + return definitions + + def get_from_index(self, index_name, *key_values): + try: + index = self._indexes[index_name] + except KeyError: + raise errors.IndexDoesNotExist + doc_ids = index.lookup(key_values) + result = [] + for doc_id in doc_ids: + result.append(self._get_doc(doc_id, check_for_conflicts=True)) + return result + + def get_range_from_index(self, index_name, start_value=None, + end_value=None): + """Return all documents with key values in the specified range.""" + try: + index = self._indexes[index_name] + except KeyError: + raise errors.IndexDoesNotExist + if isinstance(start_value, basestring): + start_value = (start_value,) + if isinstance(end_value, basestring): + end_value = (end_value,) + doc_ids = index.lookup_range(start_value, end_value) + result = [] + for doc_id in doc_ids: + result.append(self._get_doc(doc_id, check_for_conflicts=True)) + return result + + def get_index_keys(self, index_name): + try: + index = self._indexes[index_name] + except KeyError: + raise errors.IndexDoesNotExist + keys = index.keys() + # XXX inefficiency warning + return list(set([tuple(key.split('\x01')) for key in keys])) + + +couch.CouchDatabase = IndexedCouchDatabase + sync_scenarios = [] for name, scenario in COUCH_SCENARIOS: scenario = dict(scenario) @@ -343,98 +447,4 @@ class CouchDatabaseSyncTests(test_sync.DatabaseSyncTests, CouchDBTestCase): test_sync.DatabaseSyncTests.tearDown(self) -#----------------------------------------------------------------------------- -# The following tests test extra functionality introduced by our backends -#----------------------------------------------------------------------------- - -class CouchDatabaseStorageTests(CouchDBTestCase): - - def _listify(self, l): - if type(l) is dict: - return { - self._listify(a): 
self._listify(b) for a, b in l.iteritems()} - if hasattr(l, '__iter__'): - return [self._listify(i) for i in l] - return l - - def _fetch_u1db_data(self, db, key): - doc = db._get_doc("%s%s" % (db.U1DB_DATA_DOC_ID_PREFIX, key)) - return doc.content['content'] - - def test_transaction_log_storage_after_put(self): - db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), - 'u1db_tests') - db.create_doc({'simple': 'doc'}) - content = self._fetch_u1db_data(db, db.U1DB_TRANSACTION_LOG_KEY) - self.assertEqual( - self._listify(db._transaction_log), - self._listify(content)) - - def test_conflict_log_storage_after_put_if_newer(self): - db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), - 'u1db_tests') - doc = db.create_doc({'simple': 'doc'}) - doc.set_json(nested_doc) - doc.rev = db._replica_uid + ':2' - db._force_doc_sync_conflict(doc) - content = self._fetch_u1db_data(db, db.U1DB_CONFLICTS_KEY) - self.assertEqual( - self._listify(db._conflicts), - self._listify(content)) - - def test_other_gens_storage_after_set(self): - db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), - 'u1db_tests') - doc = db.create_doc({'simple': 'doc'}) - db._set_replica_gen_and_trans_id('a', 'b', 'c') - content = self._fetch_u1db_data(db, db.U1DB_OTHER_GENERATIONS_KEY) - self.assertEqual( - self._listify(db._other_generations), - self._listify(content)) - - def test_index_storage_after_create(self): - db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), - 'u1db_tests') - doc = db.create_doc({'name': 'john'}) - db.create_index('myindex', 'name') - content = self._fetch_u1db_data(db, db.U1DB_INDEXES_KEY) - myind = db._indexes['myindex'] - index = { - 'myindex': { - 'definition': myind._definition, - 'name': myind._name, - 'values': myind._values, - } - } - self.assertEqual( - self._listify(index), - self._listify(content)) - - def test_index_storage_after_delete(self): - db = couch.CouchDatabase('http://localhost:' + 
str(self.wrapper.port), - 'u1db_tests') - doc = db.create_doc({'name': 'john'}) - db.create_index('myindex', 'name') - db.create_index('myindex2', 'name') - db.delete_index('myindex') - content = self._fetch_u1db_data(db, db.U1DB_INDEXES_KEY) - myind = db._indexes['myindex2'] - index = { - 'myindex2': { - 'definition': myind._definition, - 'name': myind._name, - 'values': myind._values, - } - } - self.assertEqual( - self._listify(index), - self._listify(content)) - - def test_replica_uid_storage_after_db_creation(self): - db = couch.CouchDatabase('http://localhost:' + str(self.wrapper.port), - 'u1db_tests') - content = self._fetch_u1db_data(db, db.U1DB_REPLICA_UID_KEY) - self.assertEqual(db._replica_uid, content) - - load_tests = tests.load_with_scenarios -- cgit v1.2.3 From af913a77b384a97d857c37ba9c0a8730b0846373 Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 10 Dec 2013 22:24:47 -0200 Subject: Remove index tests for couch backend. --- common/src/leap/soledad/common/tests/test_couch.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index a181c6cb..48b3585f 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -267,16 +267,16 @@ class CouchWithConflictsTests( test_backends.LocalDatabaseWithConflictsTests.tearDown(self) -# Notice: the CouchDB backend does not have indexing capabilities, but we -# added in memory indexing for tests only. +# Notice: the CouchDB backend does not have indexing capabilities, so we do +# not test indexing now. 
-class CouchIndexTests(test_backends.DatabaseIndexTests, CouchDBTestCase): - - scenarios = COUCH_SCENARIOS - - def tearDown(self): - self.db.delete_database() - test_backends.DatabaseIndexTests.tearDown(self) +#class CouchIndexTests(test_backends.DatabaseIndexTests, CouchDBTestCase): +# +# scenarios = COUCH_SCENARIOS +# +# def tearDown(self): +# self.db.delete_database() +# test_backends.DatabaseIndexTests.tearDown(self) #----------------------------------------------------------------------------- -- cgit v1.2.3 From 96cf0af3880aa9bec8fcad422526297ef7f40964 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 11 Dec 2013 15:32:42 -0200 Subject: Add 'ddocs.py' generation on setup.py. --- common/setup.py | 208 +++++++++++++++-------- common/src/leap/soledad/common/.gitignore | 1 + common/src/leap/soledad/common/couch.py | 42 ++++- common/src/leap/soledad/common/ddocs/README.txt | 5 + common/src/leap/soledad/common/ddocs/__init__.py | 138 --------------- 5 files changed, 179 insertions(+), 215 deletions(-) create mode 100644 common/src/leap/soledad/common/.gitignore delete mode 100644 common/src/leap/soledad/common/ddocs/__init__.py diff --git a/common/setup.py b/common/setup.py index 42bf272a..e142d958 100644 --- a/common/setup.py +++ b/common/setup.py @@ -103,93 +103,163 @@ def get_versions(default={}, verbose=False): f.write(subst_template) +# +# Couch backend design docs file generation. +# + from os import listdir from os.path import realpath, dirname, isdir, join, isfile, basename import json -import logging import binascii old_cmd_sdist = cmdclass["sdist"] +def build_ddocs_py(basedir=None, with_src=True): + """ + Build `ddocs.py` file. + + For ease of development, couch backend design documents are stored as + `.js` files in subdirectories of `src/leap/soledad/common/ddocs`. This + function scans that directory for javascript files, builds the design + documents structure, and encode those structures in the `ddocs.py` file. 
+ + This function is used when installing in develop mode, building or + generating source distributions (see the next classes and the `cmdclass` + setuptools parameter. + + This funciton uses the following conventions to generate design documents: + + - Design documents are represented by directories in the form + `/`, there prefix is the `src/leap/soledad/common/ddocs` + directory. + - Design document directories might contain `views`, `lists` and + `updates` subdirectories. + - Views subdirectories must contain a `map.js` file and may contain a + `reduce.js` file. + - List and updates subdirectories may contain any number of javascript + files (i.e. ending in `.js`) whose names will be mapped to the + corresponding list or update function name. + """ + cur_pwd = dirname(realpath(__file__)) + common_path = ('src', 'leap', 'soledad', 'common') + dest_common_path = common_path + if not with_src: + dest_common_path = common_path[1:] + prefix = join(cur_pwd, *common_path) + + dest_prefix = prefix + if basedir is not None: + # we're bulding a sdist + dest_prefix = join(basedir, *dest_common_path) + + ddocs_prefix = join(prefix, 'ddocs') + ddocs = {} + + # design docs are represented by subdirectories of `ddocs_prefix` + for ddoc in [f for f in listdir(ddocs_prefix) + if isdir(join(ddocs_prefix, f))]: + + ddocs[ddoc] = {'_id': '_design/%s' % ddoc} + + for t in ['views', 'lists', 'updates']: + tdir = join(ddocs_prefix, ddoc, t) + if isdir(tdir): + + ddocs[ddoc][t] = {} + + if t == 'views': # handle views (with map/reduce functions) + for view in [f for f in listdir(tdir) + if isdir(join(tdir, f))]: + # look for map.js and reduce.js + mapfile = join(tdir, view, 'map.js') + reducefile = join(tdir, view, 'reduce.js') + mapfun = None + reducefun = None + try: + with open(mapfile) as f: + mapfun = f.read() + except IOError: + pass + try: + with open(reducefile) as f: + reducefun = f.read() + except IOError: + pass + ddocs[ddoc]['views'][view] = {} + + if mapfun is not None: 
+ ddocs[ddoc]['views'][view]['map'] = mapfun + if reducefun is not None: + ddocs[ddoc]['views'][view]['reduce'] = reducefun + + else: # handle lists, updates, etc + for fun in [f for f in listdir(tdir) + if isfile(join(tdir, f))]: + funfile = join(tdir, fun) + funname = basename(funfile).replace('.js', '') + try: + with open(funfile) as f: + ddocs[ddoc][t][funname] = f.read() + except IOError: + pass + # write file containing design docs strings + ddoc_filename = "ddocs.py" + with open(join(dest_prefix, ddoc_filename), 'w') as f: + for ddoc in ddocs: + f.write( + "%s = '%s'\n" % + (ddoc, binascii.b2a_base64(json.dumps(ddocs[ddoc]))[:-1])) + print "Wrote design docs in %s" % (dest_prefix + '/' + ddoc_filename,) + + +from setuptools.command.develop import develop as _cmd_develop + + +class cmd_develop(_cmd_develop): + def run(self): + # versioneer: + versions = versioneer.get_versions(verbose=True) + self._versioneer_generated_versions = versions + # unless we update this, the command will keep using the old version + self.distribution.metadata.version = versions["version"] + _cmd_develop.run(self) + build_ddocs_py() + + +# versioneer powered +old_cmd_sdist = cmdclass["sdist"] + + class cmd_sdist(old_cmd_sdist): """ - Generate 'src/leap/soledad/common/ddocs.py' which contains coush design + Generate 'src/leap/soledad/common/ddocs.py' which contains couch design documents scripts. """ - def run(self): old_cmd_sdist.run(self) - self.build_ddocs_py() - - def build_ddocs_py(self): - """ - Build couch design documents based on content from subdirectories in - 'src/leap/soledad/common/ddocs'. 
- """ - prefix = join( - dirname(realpath(__file__)), - 'src', 'leap', 'soledad', 'common') - ddocs_prefix = join(prefix, 'ddocs') - ddocs = {} - - # design docs are represented by subdirectories of `ddocs_prefix` - for ddoc in [f for f in listdir(ddocs_prefix) - if isdir(join(ddocs_prefix, f))]: - - ddocs[ddoc] = {'_id': '_design/%s' % ddoc} - - for t in ['views', 'lists', 'updates']: - tdir = join(ddocs_prefix, ddoc, t) - if isdir(tdir): - - ddocs[ddoc][t] = {} - - if t == 'views': # handle views (with map/reduce functions) - for view in [f for f in listdir(tdir) \ - if isdir(join(tdir, f))]: - # look for map.js and reduce.js - mapfile = join(tdir, view, 'map.js') - reducefile = join(tdir, view, 'reduce.js') - mapfun = None - reducefun = None - try: - with open(mapfile) as f: - mapfun = f.read() - except IOError: - pass - try: - with open(reducefile) as f: - reducefun = f.read() - except IOError: - pass - ddocs[ddoc]['views'][view] = {} - - if mapfun is not None: - ddocs[ddoc]['views'][view]['map'] = mapfun - if reducefun is not None: - ddocs[ddoc]['views'][view]['reduce'] = reducefun - - else: # handle lists, updates, etc - for fun in [f for f in listdir(tdir) \ - if isfile(join(tdir, f))]: - funfile = join(tdir, fun) - funname = basename(funfile).replace('.js', '') - try: - with open(funfile) as f: - ddocs[ddoc][t][funname] = f.read() - except IOError: - pass - # write file containing design docs strings - with open(join(prefix, 'ddocs.py'), 'w') as f: - for ddoc in ddocs: - f.write( - "%s = '%s'\n" % - (ddoc, binascii.b2a_base64(json.dumps(ddocs[ddoc]))[:-1])) + + def make_release_tree(self, base_dir, files): + old_cmd_sdist.make_release_tree(self, base_dir, files) + build_ddocs_py(basedir=base_dir) + + +# versioneer powered +old_cmd_build = cmdclass["build"] + + +class cmd_build(old_cmd_build): + def run(self): + old_cmd_build.run(self) + build_ddocs_py(basedir=self.build_lib, with_src=False) cmdclass["freeze_debianver"] = freeze_debianver +cmdclass["build"] 
= cmd_build +cmdclass["sdist"] = cmd_sdist +cmdclass["develop"] = cmd_develop + # XXX add ref to docs diff --git a/common/src/leap/soledad/common/.gitignore b/common/src/leap/soledad/common/.gitignore new file mode 100644 index 00000000..3378c78a --- /dev/null +++ b/common/src/leap/soledad/common/.gitignore @@ -0,0 +1 @@ +ddocs.py diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index b9e699f3..d2414477 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -34,9 +34,8 @@ from u1db.remote import http_app from u1db.remote.server_state import ServerState -from leap.soledad.common import USER_DB_PREFIX +from leap.soledad.common import USER_DB_PREFIX, ddocs from leap.soledad.common.document import SoledadDocument -from leap.soledad.common.ddocs import ensure_ddocs_on_db logger = logging.getLogger(__name__) @@ -187,7 +186,7 @@ class CouchDatabase(CommonBackend): return cls(url, dbname) def __init__(self, url, dbname, replica_uid=None, full_commit=True, - session=None, ensure_ddocs=False): + session=None, ensure_ddocs=True): """ Create a new Couch data container. @@ -220,9 +219,27 @@ class CouchDatabase(CommonBackend): except ResourceNotFound: self._server.create(self._dbname) self._database = self._server[self._dbname] - self._set_replica_uid(replica_uid or uuid.uuid4().hex) + if replica_uid is not None: + self._set_replica_uid(replica_uid) if ensure_ddocs: - ensure_ddocs_on_db(self._database) + self.ensure_ddocs_on_db() + + def ensure_ddocs_on_db(self): + """ + Ensure that the design documents used by the backend exist on the + couch database. 
+ """ + # we check for existence of one of the files, and put all of them if + # that one does not exist + try: + self._database['_design/docs'] + return + except ResourceNotFound: + for ddoc_name in ['docs', 'syncs', 'transactions']: + ddoc = json.loads( + binascii.a2b_base64( + getattr(ddocs, ddoc_name))) + self._database.save(ddoc) def get_sync_target(self): """ @@ -261,9 +278,11 @@ class CouchDatabase(CommonBackend): :type replica_uid: str """ try: + # set on existent config document doc = self._database['u1db_config'] doc['replica_uid'] = replica_uid except ResourceNotFound: + # or create the config document doc = { '_id': 'u1db_config', 'replica_uid': replica_uid, @@ -280,9 +299,16 @@ class CouchDatabase(CommonBackend): """ if self._real_replica_uid is not None: return self._real_replica_uid - doc = self._database['u1db_config'] - self._real_replica_uid = doc['replica_uid'] - return self._real_replica_uid + try: + # grab replica_uid from server + doc = self._database['u1db_config'] + self._real_replica_uid = doc['replica_uid'] + return self._real_replica_uid + except ResourceNotFound: + # create a unique replica_uid + self._real_replica_uid = uuid.uuid4().hex + self._set_replica_uid(self._real_replica_uid) + return self._real_replica_uid _replica_uid = property(_get_replica_uid, _set_replica_uid) diff --git a/common/src/leap/soledad/common/ddocs/README.txt b/common/src/leap/soledad/common/ddocs/README.txt index 37d89fbf..5569d929 100644 --- a/common/src/leap/soledad/common/ddocs/README.txt +++ b/common/src/leap/soledad/common/ddocs/README.txt @@ -1,3 +1,8 @@ +This directory holds a folder structure containing javascript files that +represent the design documents needed by the CouchDB U1DB backend. These files +are compiled into the `../ddocs.py` file by setuptools when creating the +source distribution. + The following table depicts the U1DB CouchDB backend method and the URI that is queried to obtain/update data from/to the server. 
diff --git a/common/src/leap/soledad/common/ddocs/__init__.py b/common/src/leap/soledad/common/ddocs/__init__.py deleted file mode 100644 index 389bdff9..00000000 --- a/common/src/leap/soledad/common/ddocs/__init__.py +++ /dev/null @@ -1,138 +0,0 @@ -# -*- coding: utf-8 -*- -# __init__.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -""" -CouchDB U1DB backend design documents helper. -""" - - -from os import listdir -from os.path import realpath, dirname, isdir, join, isfile, basename -import json -import logging - - -from couchdb import Document as CouchDocument - - -logger = logging.getLogger(__name__) - - -# where to search for design docs definitions -prefix = dirname(realpath(__file__)) - - -def ensure_ddocs_on_db(db, prefix=prefix): - """ - Ensure that the design documents in C{db} contain. - - :param db: The database in which to create/update the design docs. - :type db: couchdb.client.Server - :param prefix: Where to look for design documents definitions. 
- :type prefix: str - """ - ddocs = build_ddocs(prefix) - for ddoc_name, ddoc_content in ddocs.iteritems(): - ddoc_id = "_design/%s" % ddoc_name - ddoc = CouchDocument({'_id': ddoc_id}) - ddoc.update(ddoc_content) - # ensure revision if ddoc is already in db - doc = db.get(ddoc_id) - if doc is not None: - ddoc['_rev'] = doc.rev - db.save(ddoc) - - -def create_local_ddocs(prefix=prefix): - """ - Create local design docs based on content from subdirectories in - C{prefix}. - - :param create_local: Whether to create local .json files. - :type create_local: bool - """ - ddocs = build_ddocs(prefix) - for ddoc_name, ddoc_content in ddocs.iteritems(): - with open(join(prefix, '%s.json' % ddoc_name), 'w') as f: - f.write(json.dumps(ddoc_content, indent=4)) - - -def build_ddocs(prefix=prefix): - """ - Build design documents based on content from subdirectories in - C{prefix}. - - :param prefix: Where to look for design documents definitions. - :type prefix: str - - :return: A dictionary containing the design docs definitions. - :rtype: dict - """ - ddocs = {} - # design docs are represented by subdirectories in current directory - for ddoc in [f for f in listdir(prefix) if isdir(join(prefix, f))]: - logger.debug("Building %s.json ..." 
% ddoc) - - ddocs[ddoc] = {} - - for t in ['views', 'lists', 'updates']: - tdir = join(prefix, ddoc, t) - if not isdir(tdir): - logger.debug(" - no %s" % t) - else: - - ddocs[ddoc][t] = {} - - if t == 'views': # handle views (with map/reduce functions) - for view in [f for f in listdir(tdir) \ - if isdir(join(tdir, f))]: - logger.debug(" - view: %s" % view) - # look for map.js and reduce.js - mapfile = join(tdir, view, 'map.js') - reducefile = join(tdir, view, 'reduce.js') - mapfun = None - reducefun = None - try: - with open(mapfile) as f: - mapfun = f.read() - except IOError: - pass - try: - with open(reducefile) as f: - reducefun = f.read() - except IOError: - pass - ddocs[ddoc]['views'][view] = {} - - if mapfun is not None: - ddocs[ddoc]['views'][view]['map'] = mapfun - if reducefun is not None: - ddocs[ddoc]['views'][view]['reduce'] = reducefun - - else: # handle lists, updates, etc - for fun in [f for f in listdir(tdir) \ - if isfile(join(tdir, f))]: - logger.debug(" - %s: %s" % (t, fun)) - funfile = join(tdir, fun) - funname = basename(funfile).replace('.js', '') - try: - with open(funfile) as f: - ddocs[ddoc][t][funname] = f.read() - except IOError: - pass - return ddocs -- cgit v1.2.3 From 42b02476ff70326e2d52fa70a94f1f7035cb185a Mon Sep 17 00:00:00 2001 From: drebs Date: Sun, 15 Dec 2013 02:04:14 -0200 Subject: Add database migration script. 
--- scripts/migrate_dbs.py | 288 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 288 insertions(+) create mode 100644 scripts/migrate_dbs.py diff --git a/scripts/migrate_dbs.py b/scripts/migrate_dbs.py new file mode 100644 index 00000000..f1c20d87 --- /dev/null +++ b/scripts/migrate_dbs.py @@ -0,0 +1,288 @@ +#!/usr/bin/python + +import sys +import json +import logging +import argparse +import re +import threading +from urlparse import urlparse +from ConfigParser import ConfigParser +from couchdb.client import Server +from couchdb.http import ResourceNotFound, Resource, Session +from datetime import datetime + +from leap.soledad.common.couch import CouchDatabase + + +# parse command line for the log file name +logger_fname = "/tmp/u1db-couch-db-migration_%s.log" % \ + str(datetime.now()).replace(' ', '_') +parser = argparse.ArgumentParser() +parser.add_argument('--log', action='store', default=logger_fname, type=str, + required=False, help='the name of the log file', nargs=1) +args = parser.parse_args() + + +# configure the logger +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +print "Logging to %s." % args.log +logging.basicConfig( + filename=args.log, + format="%(asctime)-15s %(message)s") + + +# configure threads +max_threads = 20 +semaphore_pool = threading.BoundedSemaphore(value=max_threads) + +# get couch url +cp = ConfigParser() +cp.read('/etc/leap/soledad-server.conf') +url = cp.get('soledad-server', 'couch_url') + +resource = Resource(url, Session(retry_delays=[1,2,4,8], timeout=10)) +server = Server(url=resource) + +hidden_url = re.sub( + 'http://(.*):.*@', + 'http://\\1:xxxxx@', + url) + +print """ +========== +ATTENTION! +========== + +This script will modify Soledad's shared and user databases in: + + %s + +This script does not make a backup of the couch db data, so make sure youj +have a copy or you may loose data. +""" % hidden_url +confirm = raw_input("Proceed (type uppercase YES)? 
") + +if confirm != "YES": + exit(1) + + +# +# Thread +# + +class DocWorkerThread(threading.Thread): + + def __init__(self, dbname, doc_id, db_idx, db_len, doc_idx, doc_len, + transaction_log, conflict_log, release_fun): + threading.Thread.__init__(self) + resource = Resource(url, Session(retry_delays=[1,2,4,8], timeout=10)) + server = Server(url=resource) + self._dbname = dbname + self._cdb = server[self._dbname] + self._doc_id = doc_id + self._db_idx = db_idx + self._db_len = db_len + self._doc_idx = doc_idx + self._doc_len = doc_len + self._transaction_log = transaction_log + self._conflict_log = conflict_log + self._release_fun = release_fun + + def run(self): + + old_doc = self._cdb[self._doc_id] + + # skip non u1db docs + if 'u1db_rev' not in old_doc: + logger.debug('(%d/%d) (%d/%d) Skipping %s/%s).' % + (self._db_idx, self._db_len, self._doc_idx, + self._doc_len, self._dbname, self._doc_id)) + self._release_fun() + return + else: + logger.debug('(%d/%d) (%d/%d) Processing %s/%s ...' 
% + (self._db_idx, self._db_len, self._doc_idx, + self._doc_len, self._dbname, self._doc_id)) + + doc = { + '_id': self._doc_id, + '_rev': old_doc['_rev'], + 'u1db_rev': old_doc['u1db_rev'] + } + attachments = [] + + # add transactions + doc['u1db_transactions'] = map( + lambda (gen, doc_id, trans_id): (gen, trans_id), + filter( + lambda (gen, doc_id, trans_id): doc_id == doc['_id'], + self._transaction_log)) + if len(doc['u1db_transactions']) == 0: + del doc['u1db_transactions'] + + # add conflicts + if doc['_id'] in self._conflict_log: + attachments.append([ + conflict_log[doc['_id']], + 'u1db_conflicts', + "application/octet-stream"]) + + # move document's content to 'u1db_content' attachment + content = self._cdb.get_attachment(doc, 'u1db_json') + if content is not None: + attachments.append([ + content, + 'u1db_content', + "application/octet-stream"]) + #self._cdb.delete_attachment(doc, 'u1db_json') + + # save modified doc + self._cdb.save(doc) + + # save all doc attachments + for content, att_name, content_type in attachments: + self._cdb.put_attachment( + doc, + content, + filename=att_name, + content_type=content_type) + + # release the semaphore + self._release_fun() + + +db_idx = 0 +db_len = len(server) +for dbname in server: + + db_idx += 1 + + if not (dbname.startswith('user-') or dbname == 'shared') \ + or dbname == 'user-test-db': + logger.info("(%d/%d) Skipping db %s." % (db_idx, db_len, dbname)) + continue + + logger.info("(%d/%d) Migrating db %s." % (db_idx, db_len, dbname)) + + # get access to couch db + cdb = Server(url)[dbname] + + # get access to soledad db + sdb = CouchDatabase(url, dbname) + + # Migration table + # --------------- + # + # * Metadata that was previously stored in special documents migrate to + # inside documents, to allow for atomic doc-and-metadata updates. + # * Doc content attachment name changes. + # * Indexes are removed, to be implemented in the future possibly as + # design docs view functions. 
+ # + # +-----------------+-------------------------+-------------------------+ + # | Data | old storage | new storage | + # |-----------------+-------------------------+-------------------------+ + # | doc content | <doc_id>/u1db_json | <doc_id>/u1db_content | + # | doc conflicts | u1db/_conflicts | <doc_id>/u1db_conflicts | + # | transaction log | u1db/_transaction_log | doc.u1db_transactions | + # | sync log | u1db/_other_generations | u1db_sync_log | + # | indexes | u1db/_indexes | not implemented | + # | replica uid | u1db/_replica_uid | u1db_config | + # +-----------------+-------------------------+-------------------------+ + + def get_att_content(db, doc_id, att_name): + try: + return json.loads( + db.get_attachment( + doc_id, att_name).read())['content'] + except: + import ipdb + ipdb.set_trace() + + # only migrate databases that have the 'u1db/_replica_uid' document + try: + metadoc = cdb.get('u1db/_replica_uid') + replica_uid = get_att_content(cdb, 'u1db/_replica_uid', 'u1db_json') + except ResourceNotFound: + continue + + #--------------------------------------------------------------------- + # Step 1: Set replica uid. + #--------------------------------------------------------------------- + sdb._set_replica_uid(replica_uid) + + #--------------------------------------------------------------------- + # Step 2: Obtain metadata. + #--------------------------------------------------------------------- + + # obtain the transaction log: [['<doc_id>', '<trans_id>'], ...] 
+ transaction_log = get_att_content( + cdb, 'u1db/_transaction_log', 'u1db_json') + new_transaction_log = [] + gen = 1 + for (doc_id, trans_id) in transaction_log: + new_transaction_log.append((gen, doc_id, trans_id)) + gen += 1 + transaction_log = new_transaction_log + + # obtain the conflict log: {'': ['', ''], ...} + conflict_log = get_att_content(cdb, 'u1db/_conflicts', 'u1db_json') + + # obtain the sync log: + # {'': ['', ''], ...} + other_generations = get_att_content( + cdb, 'u1db/_other_generations', 'u1db_json') + + #--------------------------------------------------------------------- + # Step 3: Iterate over all documents in database. + #--------------------------------------------------------------------- + doc_len = len(cdb) + logger.info("(%d, %d) Found %d documents." % (db_idx, db_len, doc_len)) + doc_idx = 0 + threads = [] + for doc_id in cdb: + doc_idx = doc_idx + 1 + + semaphore_pool.acquire() + thread = DocWorkerThread(dbname, doc_id, db_idx, db_len, + doc_idx, doc_len, transaction_log, + conflict_log, semaphore_pool.release) + thread.daemon = True + thread.start() + threads.append(thread) + + map(lambda thread: thread.join(), threads) + + #--------------------------------------------------------------------- + # Step 4: Move sync log. + #--------------------------------------------------------------------- + + # move sync log + sync_doc = { + '_id': 'u1db_sync_log', + 'syncs': [] + } + + for replica_uid in other_generations: + gen, transaction_id = other_generations[replica_uid] + sync_doc['syncs'].append([replica_uid, gen, transaction_id]) + cdb.save(sync_doc) + + #--------------------------------------------------------------------- + # Step 5: Delete old meta documents. 
+ #--------------------------------------------------------------------- + + # remove unused docs + for doc_id in ['_transaction_log', '_conflicts', '_other_generations', + '_indexes', '_replica_uid']: + for prefix in ['u1db/', 'u1db%2F']: + try: + doc = cdb['%s%s' % (prefix, doc_id)] + logger.info( + "(%d/%d) Deleting %s/%s/%s." % + (db_idx, db_len, dbname, 'u1db', doc_id)) + cdb.delete(doc) + except ResourceNotFound: + pass -- cgit v1.2.3 From 69c9e8776b3c30af74440482b8d1e37945e0d2b3 Mon Sep 17 00:00:00 2001 From: drebs Date: Sun, 15 Dec 2013 14:40:48 -0200 Subject: Add changes file. --- common/changes/bug_4475_remodel-couch-backend | 2 ++ common/src/leap/soledad/common/tests/test_couch.py | 16 +++++++++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) create mode 100644 common/changes/bug_4475_remodel-couch-backend diff --git a/common/changes/bug_4475_remodel-couch-backend b/common/changes/bug_4475_remodel-couch-backend new file mode 100644 index 00000000..13a1b121 --- /dev/null +++ b/common/changes/bug_4475_remodel-couch-backend @@ -0,0 +1,2 @@ + o Remodel couch backend to fix concurrency and scalability. Closes #4475, + #4682, #4683 and #4680. diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index 48b3585f..72346333 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -223,8 +223,22 @@ class CouchTests(test_backends.AllDatabaseTests, CouchDBTestCase): scenarios = COUCH_SCENARIOS + def setUp(self): + test_backends.AllDatabaseTests.setUp(self) + # save db info because of test_close + self._server = self.db._server + self._dbname = self.db._dbname + def tearDown(self): - self.db.delete_database() + # if current test is `test_close` we have to use saved objects to + # delete the database because the close() method will have removed the + # references needed to do it using the CouchDatabase. 
+ if self.id() == \ + 'leap.soledad.common.tests.test_couch.CouchTests.' \ + 'test_close(couch)': + del(self._server[self._dbname]) + else: + self.db.delete_database() test_backends.AllDatabaseTests.tearDown(self) -- cgit v1.2.3 From ebb113849f26e91803f5c2017a8576fbd2de7033 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 19 Dec 2013 11:42:31 -0200 Subject: Add couch atomicity tests. --- .../tests/test_couch_operations_atomicity.py | 339 +++++++++++++++++++++ 1 file changed, 339 insertions(+) create mode 100644 common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py new file mode 100644 index 00000000..a0c473b1 --- /dev/null +++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py @@ -0,0 +1,339 @@ +# -*- coding: utf-8 -*- +# test_couch_operations_atomicity.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ + +""" +""" + +import os +import mock +import tempfile +import threading + +from leap.soledad.client import Soledad +from leap.soledad.common.couch import CouchDatabase, CouchServerState +from leap.soledad.common.tests.test_couch import CouchDBTestCase +from leap.soledad.common.tests.u1db_tests import TestCaseWithServer +from leap.soledad.common.tests.test_target import ( + make_token_soledad_app, + make_leap_document_for_test, + token_leap_sync_target, +) + + +REPEAT_TIMES = 20 + + +class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): + + @staticmethod + def make_app_after_state(state): + return make_token_soledad_app(state) + + make_document_for_test = make_leap_document_for_test + + sync_target = token_leap_sync_target + + def _soledad_instance(self, user='user-uuid', passphrase=u'123', + prefix='', + secrets_path=Soledad.STORAGE_SECRETS_FILE_NAME, + local_db_path='soledad.u1db', server_url='', + cert_file=None, auth_token=None, secret_id=None): + """ + Instantiate Soledad. + """ + + # this callback ensures we save a document which is sent to the shared + # db. + def _put_doc_side_effect(doc): + self._doc_put = doc + + # we need a mocked shared db or else Soledad will try to access the + # network to find if there are uploaded secrets. 
+ class MockSharedDB(object): + + get_doc = mock.Mock(return_value=None) + put_doc = mock.Mock(side_effect=_put_doc_side_effect) + lock = mock.Mock(return_value=('atoken', 300)) + unlock = mock.Mock() + + def __call__(self): + return self + + Soledad._shared_db = MockSharedDB() + return Soledad( + user, + passphrase, + secrets_path=os.path.join(self.tempdir, prefix, secrets_path), + local_db_path=os.path.join( + self.tempdir, prefix, local_db_path), + server_url=server_url, + cert_file=cert_file, + auth_token=auth_token, + secret_id=secret_id) + + def make_app(self): + self.request_state = CouchServerState(self._couch_url, 'shared', + 'tokens') + return self.make_app_after_state(self.request_state) + + def setUp(self): + TestCaseWithServer.setUp(self) + CouchDBTestCase.setUp(self) + self._couch_url = 'http://localhost:' + str(self.wrapper.port) + self.db = CouchDatabase( + self._couch_url, 'user-user-uuid', replica_uid='replica') + self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") + + def tearDown(self): + CouchDBTestCase.tearDown(self) + TestCaseWithServer.tearDown(self) + + # + # Sequential tests + # + + def test_correct_transaction_log_after_sequential_puts(self): + """ + Assert that the transaction_log increases accordingly with sequential + puts. 
+ """ + doc = self.db.create_doc({'ops': 0}) + ops = 1 + docs = [doc.doc_id] + for i in range(0, REPEAT_TIMES): + self.assertEqual( + i+1, len(self.db._get_transaction_log())) + doc.content['ops'] += 1 + self.db.put_doc(doc) + docs.append(doc.doc_id) + + # assert length of transaction_log + transaction_log = self.db._get_transaction_log() + self.assertEqual( + REPEAT_TIMES+1, len(transaction_log)) + + # assert that all entries in the log belong to the same doc + self.assertEqual(REPEAT_TIMES+1, len(docs)) + for doc_id in docs: + self.assertEqual( + REPEAT_TIMES+1, + len(filter(lambda t: t[0] == doc_id, transaction_log))) + + def test_correct_transaction_log_after_sequential_deletes(self): + """ + Assert that the transaction_log increases accordingly with sequential + puts and deletes. + """ + docs = [] + for i in range(0, REPEAT_TIMES): + doc = self.db.create_doc({'ops': 0}) + self.assertEqual( + 2*i+1, len(self.db._get_transaction_log())) + docs.append(doc.doc_id) + self.db.delete_doc(doc) + self.assertEqual( + 2*i+2, len(self.db._get_transaction_log())) + + # assert length of transaction_log + transaction_log = self.db._get_transaction_log() + self.assertEqual( + 2*REPEAT_TIMES, len(transaction_log)) + + # assert that each doc appears twice in the transaction_log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 2, + len(filter(lambda t: t[0] == doc_id, transaction_log))) + + def test_correct_sync_log_after_sequential_syncs(self): + """ + Assert that the sync_log increases accordingly with sequential syncs. 
+ """ + self.startServer() + sol = self._soledad_instance( + auth_token='auth-token', + server_url=self.getURL()) + + def _create_docs_and_sync(sol, syncs): + # create a lot of documents + for i in range(0, REPEAT_TIMES): + sol.create_doc({}) + # assert sizes of transaction and sync logs + self.assertEqual( + syncs*REPEAT_TIMES, + len(self.db._get_transaction_log())) + self.assertEqual( + 1 if syncs > 0 else 0, + len(self.db._database.view('syncs/log').rows)) + # sync to the remote db + sol.sync() + gen, docs = self.db.get_all_docs() + self.assertEqual((syncs+1)*REPEAT_TIMES, gen) + self.assertEqual((syncs+1)*REPEAT_TIMES, len(docs)) + # assert sizes of transaction and sync logs + self.assertEqual((syncs+1)*REPEAT_TIMES, + len(self.db._get_transaction_log())) + sync_log_rows = self.db._database.view('syncs/log').rows + sync_log = sync_log_rows[0].value + replica_uid = sync_log_rows[0].key + known_gen = sync_log['known_generation'] + known_trans_id = sync_log['known_transaction_id'] + # assert sync_log has exactly 1 row + self.assertEqual(1, len(sync_log_rows)) + # assert it has the correct replica_uid, gen and trans_id + self.assertEqual(sol._db._replica_uid, replica_uid) + sol_gen, sol_trans_id = sol._db._get_generation_info() + self.assertEqual(sol_gen, known_gen) + self.assertEqual(sol_trans_id, known_trans_id) + + _create_docs_and_sync(sol, 0) + _create_docs_and_sync(sol, 1) + + # + # Concurrency tests + # + + class _WorkerThread(threading.Thread): + + def __init__(self, params, run_method): + threading.Thread.__init__(self) + self._params = params + self._run_method = run_method + + def run(self): + self._run_method(self) + + def test_correct_transaction_log_after_concurrent_puts(self): + """ + Assert that the transaction_log increases accordingly with concurrent + puts. 
+ """ + pool = threading.BoundedSemaphore(value=1) + threads = [] + docs = [] + + def _run_method(self): + doc = self._params['db'].create_doc({}) + pool.acquire() + self._params['docs'].append(doc.doc_id) + pool.release() + + + for i in range(0, REPEAT_TIMES): + thread = self._WorkerThread( + {'docs': docs, 'db': self.db}, + _run_method) + thread.start() + threads.append(thread) + + for thread in threads: + thread.join() + + # assert length of transaction_log + transaction_log = self.db._get_transaction_log() + self.assertEqual( + REPEAT_TIMES, len(transaction_log)) + + # assert all documents are in the log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 1, + len(filter(lambda t: t[0] == doc_id, transaction_log))) + + def test_correct_transaction_log_after_concurrent_deletes(self): + """ + Assert that the transaction_log increases accordingly with concurrent + puts and deletes. + """ + threads = [] + docs = [] + pool = threading.BoundedSemaphore(value=1) + + # create/delete method that will be run concurrently + def _run_method(self): + doc = self._params['db'].create_doc({}) + pool.acquire() + docs.append(doc.doc_id) + pool.release() + self._params['db'].delete_doc(doc) + + # launch concurrent threads + for i in range(0, REPEAT_TIMES): + thread = self._WorkerThread({'db': self.db}, _run_method) + thread.start() + threads.append(thread) + + # wait for threads to finish + for thread in threads: + thread.join() + + # assert transaction log + transaction_log = self.db._get_transaction_log() + self.assertEqual( + 2*REPEAT_TIMES, len(transaction_log)) + # assert that each doc appears twice in the transaction_log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 2, + len(filter(lambda t: t[0] == doc_id, transaction_log))) + + def test_correct_sync_log_after_concurrent_puts_and_sync(self): + """ + Assert that the sync_log is correct after concurrent syncs. 
+ """ + threads = [] + docs = [] + pool = threading.BoundedSemaphore(value=1) + self.startServer() + sol = self._soledad_instance( + auth_token='auth-token', + server_url=self.getURL()) + + def _run_method(self): + # create a lot of documents + doc = self._params['sol'].create_doc({}) + pool.acquire() + docs.append(doc.doc_id) + pool.release() + + # launch threads to create documents in parallel + for i in range(0, REPEAT_TIMES): + thread = self._WorkerThread( + {'sol': sol, 'syncs': i}, + _run_method) + thread.start() + threads.append(thread) + + # wait for threads to finish + for thread in threads: + thread.join() + + # do the sync! + sol.sync() + + transaction_log = self.db._get_transaction_log() + self.assertEqual(REPEAT_TIMES, len(transaction_log)) + # assert all documents are in the remote log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 1, + len(filter(lambda t: t[0] == doc_id, transaction_log))) -- cgit v1.2.3