diff options
Diffstat (limited to 'common/src/leap/soledad/common')
15 files changed, 1325 insertions, 660 deletions
diff --git a/common/src/leap/soledad/common/README.txt b/common/src/leap/soledad/common/README.txt new file mode 100644 index 00000000..106efb5e --- /dev/null +++ b/common/src/leap/soledad/common/README.txt @@ -0,0 +1,79 @@ +Soledad common package +====================== + +This package contains Soledad bits used by both server and client. + +Couch U1DB Backend +------------------ + +U1DB backends rely on some atomic operations that modify documents contents +and metadata (conflicts, transaction ids and indexes). The only atomic +operation in Couch is a document put, so every u1db atomic operation has to be +mapped to a couch document put. + +The atomic operations in the U1DB SQLite reference backend implementation may +be identified by the use of a context manager to access the underlying +database. A listing of the methods involved in each atomic operation are +depiced below. The top-level elements correpond to the atomic operations that +have to be mapped, and items on deeper levels of the list have to be +implemented in a way that all changes will be pushed with just one operation. + + * _set_replica_uid + * put_doc: + * _get_doc + * _put_and_update_indexes + * insert/update the document + * insert into transaction log + * delete_doc + * _get_doc + * _put_and_update_indexes + * get_doc_conflicts + * _get_conflicts + * _set_replica_gen_and_trans_id + * _do_set_replica_gen_and_trans_id + * _put_doc_if_newer + * _get_doc + * _validate_source (**) + * _get_replica_gen_and_trans_id + * cases: + * is newer: + * _prune_conflicts (**) + * _has_conflicts + * _delete_conflicts + * _put_and_update_indexes + * same content as: + * _put_and_update_indexes + * conflicted: + * _force_doc_sync_conflict + * _prune_conflicts + * _add_conflict + * _put_and_update_indexes + * _do_set_replica_gen_and_trans_id + * resolve_doc + * _get_doc + * cases: + * doc is superseded + * _put_and_update_indexes + * else + * _add_conflict + * _delete_conflicts + * delete_index + * create_index + +Couch views and update functions are used in order to achieve atomicity on the +Couch backend. Transactions are stored in the `u1db_transactions` field of the +couch document. Document's content and conflicted versions are stored as couch +document attachments with names, respectivelly, `u1db_content` and +`u1db_conflicts`. + +A map of methods and couch query URI can be found on the `./ddocs/README.txt` +document. + +Notes: + + * Currently, the couch backend does not implement indexing, so what is + depicted as `_put_and_update_indexes` above will be found as `_put_doc` in + the backend. + + * Conflict updates are part of document put using couch update functions, + and as such are part of the same atomic operation as document put. diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 1396f4d7..3b0e042a 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -18,26 +18,34 @@ """A U1DB backend that uses CouchDB as its persistence layer.""" -import re +import os import simplejson as json -import socket +import sys +import time +import uuid import logging +import binascii +import socket -from u1db import errors -from u1db.sync import Synchronizer -from u1db.backends.inmemory import InMemoryIndex +from u1db.backends import CommonBackend, CommonSyncTarget from u1db.remote.server_state import ServerState -from u1db.errors import DatabaseDoesNotExist -from couchdb.client import Server, Document as CouchDocument +from u1db import ( + errors, + query_parser, + vectorclock, +) +from couchdb.client import ( + Server, + Document as CouchDocument, + _doc_resource, +) from couchdb.http import ResourceNotFound, Unauthorized from leap.soledad.common import USER_DB_PREFIX -from leap.soledad.common.objectstore import ( - ObjectStoreDatabase, - ObjectStoreSyncTarget, -) +from leap.soledad.common.document import SoledadDocument +from leap.soledad.common.ddocs import ensure_ddocs_on_remote_db logger = logging.getLogger(__name__) @@ -49,158 +57,125 @@ class InvalidURLError(Exception): """ -def persistent_class(cls): +class CouchDocument(SoledadDocument): """ - Decorator that modifies a class to ensure u1db metadata persists on - underlying storage. + This is the document used for maintaining the Couch backend. - @param cls: The class that will be modified. - @type cls: type + A CouchDocument can fetch and manipulate conflicts and also holds a + reference to the couch document revision. This data is used to ensure an + atomic and consistent update of the database. """ - def _create_persistent_method(old_method_name, key, load_method_name, - dump_method_name, store): - """ - Create a persistent method to replace C{old_method_name}. - - The new method will load C{key} using C{load_method_name} and stores - it using C{dump_method_name} depending on the value of C{store}. - """ - # get methods - old_method = getattr(cls, old_method_name) - load_method = getattr(cls, load_method_name) \ - if load_method_name is not None \ - else lambda self, data: setattr(self, key, data) - dump_method = getattr(cls, dump_method_name) \ - if dump_method_name is not None \ - else lambda self: getattr(self, key) - - def _new_method(self, *args, **kwargs): - # get u1db data from couch db - doc = self._get_doc('%s%s' % - (self.U1DB_DATA_DOC_ID_PREFIX, key)) - load_method(self, doc.content['content']) - # run old method - retval = old_method(self, *args, **kwargs) - # store u1db data on couch - if store: - doc.content = {'content': dump_method(self)} - self._put_doc(doc) - return retval - - return _new_method - - # ensure the class has a persistency map - if not hasattr(cls, 'PERSISTENCY_MAP'): - logger.error('Class %s has no PERSISTENCY_MAP attribute, skipping ' - 'persistent methods substitution.' % cls) - return cls - # replace old methods with new persistent ones - for key, ((load_method_name, dump_method_name), - persistent_methods) in cls.PERSISTENCY_MAP.iteritems(): - for (method_name, store) in persistent_methods: - setattr(cls, method_name, - _create_persistent_method( - method_name, - key, - load_method_name, - dump_method_name, - store)) - return cls - - -@persistent_class -class CouchDatabase(ObjectStoreDatabase): + def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False, + syncable=True): + """ + Container for handling a document that is stored in couch backend. + + :param doc_id: The unique document identifier. + :type doc_id: str + :param rev: The revision identifier of the document. + :type rev: str + :param json: The JSON string for this document. + :type json: str + :param has_conflicts: Boolean indicating if this document has conflicts + :type has_conflicts: bool + :param syncable: Should this document be synced with remote replicas? + :type syncable: bool + """ + SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts) + self._couch_rev = None + self._conflicts = None + self._modified_conflicts = False + + def ensure_fetch_conflicts(self, get_conflicts_fun): + """ + Ensure conflict data has been fetched from the server. + + :param get_conflicts_fun: A function which, given the document id and + the couch revision, return the conflicted + versions of the current document. + :type get_conflicts_fun: function + """ + if self._conflicts is None: + self._conflicts = get_conflicts_fun(self.doc_id, + couch_rev=self.couch_rev) + self.has_conflicts = len(self._conflicts) > 0 + + def get_conflicts(self): + """ + Get the conflicted versions of the document. + + :return: The conflicted versions of the document. + :rtype: [CouchDocument] + """ + return self._conflicts + + def add_conflict(self, doc): + """ + Add a conflict to this document. + + :param doc: The conflicted version to be added. + :type doc: CouchDocument + """ + if self._conflicts is None: + raise Exception("Run self.ensure_fetch_conflicts first!") + self._modified_conflicts = True + self._conflicts.append(doc) + self.has_conflicts = len(self._conflicts) > 0 + + def delete_conflicts(self, conflict_revs): + """ + Delete conflicted versions of this document. + + :param conflict_revs: The conflicted revisions to be deleted. + :type conflict_revs: [str] + """ + if self._conflicts is None: + raise Exception("Run self.ensure_fetch_conflicts first!") + conflicts_len = len(self._conflicts) + self._conflicts = filter( + lambda doc: doc.rev not in conflict_revs, + self._conflicts) + if len(self._conflicts) < conflicts_len: + self._modified_conflicts = True + self.has_conflicts = len(self._conflicts) > 0 + + def modified_conflicts(self): + """ + Return whether this document's conflicts have been modified. + + :return: Whether this document's conflicts have been modified. + :rtype: bool + """ + return self._conflicts is not None and \ + self._modified_conflicts is True + + def _get_couch_rev(self): + return self._couch_rev + + def _set_couch_rev(self, rev): + self._couch_rev = rev + + couch_rev = property(_get_couch_rev, _set_couch_rev) + + +class CouchDatabase(CommonBackend): """ - A U1DB backend that uses Couch as its persistence layer. + A U1DB implementation that uses CouchDB as its persistence layer. """ - U1DB_TRANSACTION_LOG_KEY = '_transaction_log' - U1DB_CONFLICTS_KEY = '_conflicts' - U1DB_OTHER_GENERATIONS_KEY = '_other_generations' - U1DB_INDEXES_KEY = '_indexes' - U1DB_REPLICA_UID_KEY = '_replica_uid' - - U1DB_DATA_KEYS = [ - U1DB_TRANSACTION_LOG_KEY, - U1DB_CONFLICTS_KEY, - U1DB_OTHER_GENERATIONS_KEY, - U1DB_INDEXES_KEY, - U1DB_REPLICA_UID_KEY, - ] - - COUCH_ID_KEY = '_id' - COUCH_REV_KEY = '_rev' - COUCH_U1DB_ATTACHMENT_KEY = 'u1db_json' - COUCH_U1DB_REV_KEY = 'u1db_rev' - - # the following map describes information about methods usage of - # properties that have to persist on the underlying database. The format - # of the map is assumed to be: - # - # { - # 'property_name': [ - # ('property_load_method_name', 'property_dump_method_name'), - # [('method_1_name', bool), - # ... - # ('method_N_name', bool)]], - # ... - # } - # - # where the booleans indicate if the property should be stored after - # each method execution (i.e. if the method alters the property). Property - # load/dump methods will be run after/before properties are read/written - # to the underlying db. - PERSISTENCY_MAP = { - U1DB_TRANSACTION_LOG_KEY: [ - ('_load_transaction_log_from_json', None), - [('_get_transaction_log', False), - ('_get_generation', False), - ('_get_generation_info', False), - ('_get_trans_id_for_gen', False), - ('whats_changed', False), - ('_put_and_update_indexes', True)]], - U1DB_CONFLICTS_KEY: [ - (None, None), - [('_has_conflicts', False), - ('get_doc_conflicts', False), - ('_prune_conflicts', False), - ('resolve_doc', False), - ('_replace_conflicts', True), - ('_force_doc_sync_conflict', True)]], - U1DB_OTHER_GENERATIONS_KEY: [ - ('_load_other_generations_from_json', None), - [('_get_replica_gen_and_trans_id', False), - ('_do_set_replica_gen_and_trans_id', True)]], - U1DB_INDEXES_KEY: [ - ('_load_indexes_from_json', '_dump_indexes_as_json'), - [('list_indexes', False), - ('get_from_index', False), - ('get_range_from_index', False), - ('get_index_keys', False), - ('_put_and_update_indexes', True), - ('create_index', True), - ('delete_index', True)]], - U1DB_REPLICA_UID_KEY: [ - (None, None), - [('_allocate_doc_rev', False), - ('_put_doc_if_newer', False), - ('_ensure_maximal_rev', False), - ('_prune_conflicts', False), - ('_set_replica_uid', True)]]} - @classmethod def open_database(cls, url, create): """ Open a U1DB database using CouchDB as backend. - @param url: the url of the database replica - @type url: str - @param create: should the replica be created if it does not exist? - @type create: bool + :param url: the url of the database replica + :type url: str + :param create: should the replica be created if it does not exist? + :type create: bool - @return: the database instance - @rtype: CouchDatabase + :return: the database instance + :rtype: CouchDatabase """ # get database from url m = re.match('(^https?://[^/]+)/(.+)$', url) @@ -221,293 +196,795 @@ class CouchDatabase(ObjectStoreDatabase): """ Create a new Couch data container. - @param url: the url of the couch database - @type url: str - @param dbname: the database name - @type dbname: str - @param replica_uid: an optional unique replica identifier - @type replica_uid: str - @param full_commit: turn on the X-Couch-Full-Commit header - @type full_commit: bool - @param session: an http.Session instance or None for a default session - @type session: http.Session + :param url: the url of the couch database + :type url: str + :param dbname: the database name + :type dbname: str + :param replica_uid: an optional unique replica identifier + :type replica_uid: str + :param full_commit: turn on the X-Couch-Full-Commit header + :type full_commit: bool + :param session: an http.Session instance or None for a default session + :type session: http.Session + :param ensure_ddocs: Ensure that the design docs exist on server. + :type ensure_ddocs: bool """ # save params self._url = url self._full_commit = full_commit self._session = session + self._factory = CouchDocument + self._real_replica_uid = None # configure couch self._server = Server(url=self._url, full_commit=self._full_commit, session=self._session) self._dbname = dbname - # this will ensure that transaction and sync logs exist and are - # up-to-date. try: self._database = self._server[self._dbname] except ResourceNotFound: self._server.create(self._dbname) self._database = self._server[self._dbname] - ObjectStoreDatabase.__init__(self, replica_uid=replica_uid) + self._initialize(replica_uid or uuid.uuid4().hex) + + def get_sync_target(self): + """ + Return a SyncTarget object, for another u1db to synchronize with. + + :return: The sync target. + :rtype: CouchSyncTarget + """ + return CouchSyncTarget(self) + + def delete_database(self): + """ + Delete a U1DB CouchDB database. + """ + del(self._server[self._dbname]) + + def close(self): + """ + Release any resources associated with this database. + + :return: True if db was succesfully closed. + :rtype: bool + """ + self._url = None + self._full_commit = None + self._session = None + self._server = None + self._database = None + return True + + def _set_replica_uid(self, replica_uid): + """ + Force the replica uid to be set. + + :param replica_uid: The new replica uid. + :type replica_uid: str + """ + try: + doc = self._database['u1db_config'] + except ResourceNotFound: + doc = { + '_id': 'u1db_config', + 'replica_uid': replica_uid, + } + self._database.save(doc) + + def _ensure_design_docs(self): + """ + Ensure that the design docs have been created. + """ + if self._is_initialized(): + return + self._initialize() + + def _set_replica_uid(self, replica_uid): + """Force the replica_uid to be set.""" + doc = self._database['u1db_config'] + doc['replica_uid'] = replica_uid + self._database.save(doc) + self._real_replica_uid = replica_uid + + def _get_replica_uid(self): + """ + Get the replica uid. + + :return: The replica uid. + :rtype: str + """ + if self._real_replica_uid is not None: + return self._real_replica_uid + doc = self._database['u1db_config'] + self._real_replica_uid = doc['replica_uid'] + return self._real_replica_uid + + _replica_uid = property(_get_replica_uid, _set_replica_uid) + + def _get_generation(self): + """ + Return the current generation. + + :return: The current generation. + :rtype: int + """ + # query a couch list function + res = self._database.resource( + '_design', 'transactions', '_list', 'generation', 'log') + response = res.get_json() + return response[2]['generation'] + + def _get_generation_info(self): + """ + Return the current generation. - #------------------------------------------------------------------------- - # methods from Database - #------------------------------------------------------------------------- + :return: A tuple containing the current generation and transaction id. + :rtype: (int, str) + """ + # query a couch list function + res = self._database.resource( + '_design', 'transactions', '_list', 'generation', 'log') + response = res.get_json() + return (response[2]['generation'], response[2]['transaction_id']) + + def _get_trans_id_for_gen(self, generation): + """ + Get the transaction id corresponding to a particular generation. + + :param generation: The generation for which to get the transaction id. + :type generation: int + + :return: The transaction id for C{generation}. + :rtype: str + + :raise InvalidGeneration: Raised when the generation does not exist. + """ + if generation == 0: + return '' + # query a couch list function + res = self._database.resource( + '_design', 'transactions', '_list', 'trans_id_for_gen', 'log') + response = res.get_json(gen=generation) + if response[2] == {}: + raise errors.InvalidGeneration + return response[2]['transaction_id'] + + def _get_transaction_log(self): + """ + This is only for the test suite, it is not part of the api. + + :return: The complete transaction log. + :rtype: [(str, str)] + """ + # query a couch view + res = self._database.resource( + '_design', 'transactions', '_view', 'log') + response = res.get_json() + return map(lambda row: (row['id'], row['value']), response[2]['rows']) def _get_doc(self, doc_id, check_for_conflicts=False): """ - Get just the document content, without fancy handling. + Extract the document from storage. + + This can return None if the document doesn't exist. + + :param doc_id: The unique document identifier + :type doc_id: str + :param check_for_conflicts: If set to False, then the conflict check + will be skipped. + :type check_for_conflicts: bool + + :return: The document. + :rtype: CouchDocument + """ + # get document with all attachments (u1db content and eventual + # conflicts) + try: + result = \ + self._database.resource(doc_id).get_json( + attachments=True)[2] + except ResourceNotFound: + return None + # restrict to u1db documents + if 'u1db_rev' not in result: + return None + doc = self._factory(doc_id, result['u1db_rev']) + # set contents or make tombstone + if '_attachments' not in result \ + or 'u1db_content' not in result['_attachments']: + doc.make_tombstone() + else: + doc.content = json.loads( + binascii.a2b_base64( + result['_attachments']['u1db_content']['data'])) + # determine if there are conflicts + if check_for_conflicts \ + and '_attachments' in result \ + and 'u1db_conflicts' in result['_attachments']: + doc.has_conflicts = True + # store couch revision + doc.couch_rev = result['_rev'] + return doc + + def get_doc(self, doc_id, include_deleted=False): + """ + Get the JSON string for the given document. - @param doc_id: The unique document identifier - @type doc_id: str - @param include_deleted: If set to True, deleted documents will be + :param doc_id: The unique document identifier + :type doc_id: str + :param include_deleted: If set to True, deleted documents will be returned with empty content. Otherwise asking for a deleted document will return None. - @type include_deleted: bool + :type include_deleted: bool - @return: a Document object. - @type: u1db.Document + :return: A document object. + :rtype: CouchDocument. """ - cdoc = self._database.get(doc_id) - if cdoc is None: + doc = self._get_doc(doc_id, check_for_conflicts=True) + if doc is None: + return None + if doc.is_tombstone() and not include_deleted: return None - has_conflicts = False - if check_for_conflicts: - has_conflicts = self._has_conflicts(doc_id) - doc = self._factory( - doc_id=doc_id, - rev=cdoc[self.COUCH_U1DB_REV_KEY], - has_conflicts=has_conflicts) - contents = self._database.get_attachment( - cdoc, - self.COUCH_U1DB_ATTACHMENT_KEY) - if contents: - doc.content = json.loads(contents.read()) - else: - doc.make_tombstone() return doc def get_all_docs(self, include_deleted=False): """ Get the JSON content for all documents in the database. - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise deleted documents will not - be included in the results. - @type include_deleted: bool + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise deleted + documents will not be included in the results. + :type include_deleted: bool - @return: (generation, [Document]) + :return: (generation, [CouchDocument]) The current generation of the database, followed by a list of all the documents in the database. - @rtype: tuple + :rtype: (int, [CouchDocument]) """ + generation = self._get_generation() results = [] - for doc_id in self._database: - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - continue - doc = self._get_doc(doc_id, check_for_conflicts=True) - if doc.content is None and not include_deleted: - continue - results.append(doc) + for row in self._database.view('_all_docs'): + doc = self.get_doc(row.id, include_deleted=include_deleted) + if doc is not None: + results.append(doc) return (generation, results) - def _put_doc(self, doc): + def _put_doc(self, old_doc, doc): """ - Update a document. + Put the document in the Couch backend database. - This is called everytime we just want to do a raw put on the db (i.e. - without index updates, document constraint checks, and conflict - checks). - - @param doc: The document to update. - @type doc: u1db.Document - - @return: The new revision identifier for the document. - @rtype: str - """ - # prepare couch's Document - cdoc = CouchDocument() - cdoc[self.COUCH_ID_KEY] = doc.doc_id - # we have to guarantee that couch's _rev is consistent - old_cdoc = self._database.get(doc.doc_id) - if old_cdoc is not None: - cdoc[self.COUCH_REV_KEY] = old_cdoc[self.COUCH_REV_KEY] - # store u1db's rev - cdoc[self.COUCH_U1DB_REV_KEY] = doc.rev - # save doc in db - self._database.save(cdoc) - # store u1db's content as json string - if not doc.is_tombstone(): - self._database.put_attachment( - cdoc, doc.get_json(), - filename=self.COUCH_U1DB_ATTACHMENT_KEY) - else: - self._database.delete_attachment( - cdoc, - self.COUCH_U1DB_ATTACHMENT_KEY) + :param old_doc: The old document version. + :type old_doc: CouchDocument + :param doc: The document to be put. + :type doc: CouchDocument - def get_sync_target(self): + :raise RevisionConflict: Raised when trying to update a document but + couch revisions mismatch. """ - Return a SyncTarget object, for another u1db to synchronize with. - - @return: The sync target. - @rtype: CouchSyncTarget + trans_id = self._allocate_transaction_id() + # encode content + content = doc.get_json() + if content is not None: + content = binascii.b2a_base64(content)[:-1] # exclude trailing \n + # encode conflicts + conflicts = None + update_conflicts = doc.modified_conflicts() + if update_conflicts is True: + if doc.has_conflicts: + conflicts = binascii.b2a_base64( + json.dumps( + map(lambda cdoc: (cdoc.rev, cdoc.content), + doc.get_conflicts())) + )[:-1] # exclude \n + # perform the request + resource = self._database.resource( + '_design', 'docs', '_update', 'put', doc.doc_id) + response = resource.put_json( + body={ + 'couch_rev': old_doc.couch_rev + if old_doc is not None else None, + 'u1db_rev': doc.rev, + 'content': content, + 'trans_id': trans_id, + 'conflicts': conflicts, + 'update_conflicts': update_conflicts, + }, + headers={'content-type': 'application/json'}) + # the document might have been updated in between, so we check for the + # return message + msg = response[2].read() + if msg == 'ok': + return + elif msg == 'revision conflict': + raise errors.RevisionConflict() + + def put_doc(self, doc): """ - return CouchSyncTarget(self) + Update a document. - def create_index(self, index_name, *index_expressions): - """ - Create a named index, which can then be queried for future lookups. + If the document currently has conflicts, put will fail. + If the database specifies a maximum document size and the document + exceeds it, put will fail and raise a DocumentTooBig exception. - @param index_name: A unique name which can be used as a key prefix. - @param index_expressions: Index expressions defining the index - information. - """ - if index_name in self._indexes: - if self._indexes[index_name]._definition == list( - index_expressions): - return - raise errors.IndexNameTakenError - index = InMemoryIndex(index_name, list(index_expressions)) - for doc_id in self._database: - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - continue # skip special files - doc = self._get_doc(doc_id) - if doc.content is not None: - index.add_json(doc_id, doc.get_json()) - self._indexes[index_name] = index + :param doc: A Document with new content. + :return: new_doc_rev - The new revision identifier for the document. + The Document object will also be updated. - def close(self): + :raise errors.InvalidDocId: Raised if the document's id is invalid. + :raise errors.DocumentTooBig: Raised if the document size is too big. + :raise errors.ConflictedDoc: Raised if the document has conflicts. """ - Release any resources associated with this database. - - @return: True if db was succesfully closed. - @rtype: bool + if doc.doc_id is None: + raise errors.InvalidDocId() + self._check_doc_id(doc.doc_id) + self._check_doc_size(doc) + old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if old_doc and old_doc.has_conflicts: + raise errors.ConflictedDoc() + if old_doc and doc.rev is None and old_doc.is_tombstone(): + new_rev = self._allocate_doc_rev(old_doc.rev) + else: + if old_doc is not None: + if old_doc.rev != doc.rev: + raise errors.RevisionConflict() + else: + if doc.rev is not None: + raise errors.RevisionConflict() + new_rev = self._allocate_doc_rev(doc.rev) + doc.rev = new_rev + self._put_doc(old_doc, doc) + return new_rev + + def whats_changed(self, old_generation=0): """ - # TODO: fix this method so the connection is properly closed and - # test_close (+tearDown, which deletes the db) works without problems. - self._url = None - self._full_commit = None - self._session = None - #self._server = None - self._database = None - return True - - def sync(self, url, creds=None, autocreate=True): + Return a list of documents that have changed since old_generation. + + :param old_generation: The generation of the database in the old + state. + :type old_generation: int + + :return: (generation, trans_id, [(doc_id, generation, trans_id),...]) + The current generation of the database, its associated + transaction id, and a list of of changed documents since + old_generation, represented by tuples with for each document + its doc_id and the generation and transaction id corresponding + to the last intervening change and sorted by generation (old + changes first) + :rtype: (int, str, [(str, int, str)]) """ - Synchronize documents with remote replica exposed at url. + # query a couch list function + res = self._database.resource( + '_design', 'transactions', '_list', 'whats_changed', 'log') + response = res.get_json(old_gen=old_generation) + results = map( + lambda row: + (row['generation'], row['doc_id'], row['transaction_id']), + response[2]['transactions']) + results.reverse() + cur_gen = old_generation + seen = set() + changes = [] + newest_trans_id = '' + for generation, doc_id, trans_id in results: + if doc_id not in seen: + changes.append((doc_id, generation, trans_id)) + seen.add(doc_id) + if changes: + cur_gen = changes[0][1] # max generation + newest_trans_id = changes[0][2] + changes.reverse() + else: + cur_gen, newest_trans_id = self._get_generation_info() - @param url: The url of the target replica to sync with. - @type url: str - @param creds: optional dictionary giving credentials. - to authorize the operation with the server. - @type creds: dict - @param autocreate: Ask the target to create the db if non-existent. - @type autocreate: bool + return cur_gen, newest_trans_id, changes - @return: The local generation before the synchronisation was performed. - @rtype: int + def delete_doc(self, doc): """ - return Synchronizer(self, CouchSyncTarget(url, creds=creds)).sync( - autocreate=autocreate) + Mark a document as deleted. + + Will abort if the current revision doesn't match doc.rev. + This will also set doc.content to None. - #------------------------------------------------------------------------- - # methods from ObjectStoreDatabase - #------------------------------------------------------------------------- + :param doc: The document to mark as deleted. + :type doc: CouchDocument. - def _init_u1db_data(self): + :raise errors.DocumentDoesNotExist: Raised if the document does not + exist. + :raise errors.RevisionConflict: Raised if the revisions do not match. + :raise errors.DocumentAlreadyDeleted: Raised if the document is + already deleted. + :raise errors.ConflictedDoc: Raised if the doc has conflicts. """ - Initialize u1db configuration data on backend storage. + old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if old_doc is None: + raise errors.DocumentDoesNotExist + if old_doc.rev != doc.rev: + raise errors.RevisionConflict() + if old_doc.is_tombstone(): + raise errors.DocumentAlreadyDeleted + if old_doc.has_conflicts: + raise errors.ConflictedDoc() + new_rev = self._allocate_doc_rev(doc.rev) + doc.rev = new_rev + doc.make_tombstone() + self._put_doc(old_doc, doc) + return new_rev + + def _get_conflicts(self, doc_id, couch_rev=None): + """ + Get the conflicted versions of a document. + + If the C{couch_rev} parameter is not None, conflicts for a specific + document's couch revision are returned. - A U1DB database needs to keep track of all database transactions, - document conflicts, the generation of other replicas it has seen, - indexes created by users and so on. + :param couch_rev: The couch document revision. + :type couch_rev: str - In this implementation, all this information is stored in special - documents stored in the underlying with doc_id prefix equal to - U1DB_DATA_DOC_ID_PREFIX. Those documents ids are reserved: put_doc(), - get_doc() and delete_doc() will not allow documents with a doc_id with - that prefix to be accessed or modified. + :return: A list of conflicted versions of the document. + :rtype: list """ - for key in self.U1DB_DATA_KEYS: - doc_id = '%s%s' % (self.U1DB_DATA_DOC_ID_PREFIX, key) - doc = self._get_doc(doc_id) - if doc is None: - doc = self._factory(doc_id) - doc.content = {'content': getattr(self, key)} - self._put_doc(doc) + # request conflicts attachment from server + params = {} + if couch_rev is not None: + params['rev'] = couch_rev # restric document's couch revision + resource = self._database.resource(doc_id, 'u1db_conflicts') + try: + response = resource.get_json(**params) + conflicts = [] + # build the conflicted versions + for doc_rev, content in json.loads(response[2].read()): + doc = self._factory(doc_id, doc_rev) + if content is None: + doc.make_tombstone() + else: + doc.content = content + conflicts.append(doc) + return conflicts + except ResourceNotFound: + return [] - #------------------------------------------------------------------------- - # Couch specific methods - #------------------------------------------------------------------------- + def get_doc_conflicts(self, doc_id): + """ + Get the list of conflicts for the given document. - INDEX_NAME_KEY = 'name' - INDEX_DEFINITION_KEY = 'definition' - INDEX_VALUES_KEY = 'values' + The order of the conflicts is such that the first entry is the value + that would be returned by "get_doc". - def delete_database(self): - """ - Delete a U1DB CouchDB database. + :return: A list of the document entries that are conflicted. + :rtype: [CouchDocument] """ - del(self._server[self._dbname]) + conflict_docs = self._get_conflicts(doc_id) + if len(conflict_docs) == 0: + return [] + this_doc = self._get_doc(doc_id, check_for_conflicts=True) + return [this_doc] + conflict_docs - def _dump_indexes_as_json(self): + def _get_replica_gen_and_trans_id(self, other_replica_uid): """ - Dump index definitions as JSON. + Return the last known generation and transaction id for the other db + replica. + + When you do a synchronization with another replica, the Database keeps + track of what generation the other database replica was at, and what + the associated transaction id was. This is used to determine what data + needs to be sent, and if two databases are claiming to be the same + replica. + + :param other_replica_uid: The identifier for the other replica. + :type other_replica_uid: str + + :return: A tuple containing the generation and transaction id we + encountered during synchronization. If we've never + synchronized with the replica, this is (0, ''). + :rtype: (int, str) """ - indexes = {} - for name, idx in self._indexes.iteritems(): - indexes[name] = {} - for attr in [self.INDEX_NAME_KEY, self.INDEX_DEFINITION_KEY, - self.INDEX_VALUES_KEY]: - indexes[name][attr] = getattr(idx, '_' + attr) - return indexes + # query a couch view + result = self._database.view('syncs/log') + if len(result[other_replica_uid].rows) == 0: + return (0, '') + return ( + result[other_replica_uid].rows[0]['value']['known_generation'], + result[other_replica_uid].rows[0]['value']['known_transaction_id'] + ) + + def _set_replica_gen_and_trans_id(self, other_replica_uid, + other_generation, other_transaction_id): + """ + Set the last-known generation and transaction id for the other + database replica. + + We have just performed some synchronization, and we want to track what + generation the other replica was at. See also + _get_replica_gen_and_trans_id. + + :param other_replica_uid: The U1DB identifier for the other replica. + :type other_replica_uid: str + :param other_generation: The generation number for the other replica. + :type other_generation: int + :param other_transaction_id: The transaction id associated with the + generation. + :type other_transaction_id: str + """ + self._do_set_replica_gen_and_trans_id( + other_replica_uid, other_generation, other_transaction_id) - def _load_indexes_from_json(self, indexes): + def _do_set_replica_gen_and_trans_id( + self, other_replica_uid, other_generation, other_transaction_id): + """ + Set the last-known generation and transaction id for the other + database replica. + + We have just performed some synchronization, and we want to track what + generation the other replica was at. See also + _get_replica_gen_and_trans_id. + + :param other_replica_uid: The U1DB identifier for the other replica. + :type other_replica_uid: str + :param other_generation: The generation number for the other replica. + :type other_generation: int + :param other_transaction_id: The transaction id associated with the + generation. + :type other_transaction_id: str + """ + # query a couch update function + res = self._database.resource( + '_design', 'syncs', '_update', 'put', 'u1db_sync_log') + res.put_json( + body={ + 'other_replica_uid': other_replica_uid, + 'other_generation': other_generation, + 'other_transaction_id': other_transaction_id, + }, + headers={'content-type': 'application/json'}) + + def _add_conflict(self, doc, my_doc_rev, my_content): + """ + Add a conflict to the document. + + Note that this method does not actually update the backend; rather, it + updates the CouchDocument object which will provide the conflict data + when the atomic document update is made. + + :param doc: The document to have conflicts added to. + :type doc: CouchDocument + :param my_doc_rev: The revision of the conflicted document. + :type my_doc_rev: str + :param my_content: The content of the conflicted document as a JSON + serialized string. + :type my_content: str """ - Load index definitions from stored JSON. + doc.ensure_fetch_conflicts(self._get_conflicts) + doc.add_conflict( + self._factory(doc_id=doc.doc_id, rev=my_doc_rev, + json=my_content)) - @param indexes: A JSON representation of indexes as - [('index-name', ['field', 'field2', ...]), ...]. - @type indexes: str + def _delete_conflicts(self, doc, conflict_revs): """ - self._indexes = {} - for name, idx_dict in indexes.iteritems(): - idx = InMemoryIndex(name, idx_dict[self.INDEX_DEFINITION_KEY]) - idx._values = idx_dict[self.INDEX_VALUES_KEY] - self._indexes[name] = idx + Delete the conflicted revisions from the list of conflicts of C{doc}. - def _load_transaction_log_from_json(self, transaction_log): + Note that thie method does not actually update the backed; rather, it + updates the CouchDocument object which will provide the conflict data + when the atomic document update is made. + + :param doc: The document to have conflicts deleted. + :type doc: CouchDocument + :param conflict_revs: A list of the revisions to be deleted. + :param conflict_revs: [str] """ - Load transaction log from stored JSON. + doc.ensure_fetch_conflicts(self._get_conflicts) + doc.delete_conflicts(conflict_revs) - @param transaction_log: A JSON representation of transaction_log as - [('generation', 'transaction_id'), ...]. - @type transaction_log: list + def _prune_conflicts(self, doc, doc_vcr): + """ + Prune conflicts that are older then the current document's revision, or + whose content match to the current document's content. + + :param doc: The document to have conflicts pruned. + :type doc: CouchDocument + :param doc_vcr: A vector clock representing the current document's + revision. + :type doc_vcr: u1db.vectorclock.VectorClock """ - self._transaction_log = [] - for gen, trans_id in transaction_log: - self._transaction_log.append((gen, trans_id)) + if doc.has_conflicts is True: + autoresolved = False + c_revs_to_prune = [] + for c_doc in doc.get_conflicts(): + c_vcr = vectorclock.VectorClockRev(c_doc.rev) + if doc_vcr.is_newer(c_vcr): + c_revs_to_prune.append(c_doc.rev) + elif doc.same_content_as(c_doc): + c_revs_to_prune.append(c_doc.rev) + doc_vcr.maximize(c_vcr) + autoresolved = True + if autoresolved: + doc_vcr.increment(self._replica_uid) + doc.rev = doc_vcr.as_str() + self._delete_conflicts(doc, c_revs_to_prune) + + def _force_doc_sync_conflict(self, doc): + """ + Add a conflict and force a document put. - def _load_other_generations_from_json(self, other_generations): + :param doc: The document to be put. + :type doc: CouchDocument """ - Load other generations from stored JSON. + my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev)) + self._add_conflict(doc, my_doc.rev, my_doc.get_json()) + doc.has_conflicts = True + self._put_doc(my_doc, doc) - @param other_generations: A JSON representation of other_generations - as {'replica_uid': ('generation', 'transaction_id'), ...}. - @type other_generations: dict + def resolve_doc(self, doc, conflicted_doc_revs): + """ + Mark a document as no longer conflicted. + + We take the list of revisions that the client knows about that it is + superseding. This may be a different list from the actual current + conflicts, in which case only those are removed as conflicted. This + may fail if the conflict list is significantly different from the + supplied information. (sync could have happened in the background from + the time you GET_DOC_CONFLICTS until the point where you RESOLVE) + + :param doc: A Document with the new content to be inserted. + :type doc: CouchDocument + :param conflicted_doc_revs: A list of revisions that the new content + supersedes. + :type conflicted_doc_revs: [str] """ - self._other_generations = {} - for replica_uid, [gen, trans_id] in other_generations.iteritems(): - self._other_generations[replica_uid] = (gen, trans_id) + cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + new_rev = self._ensure_maximal_rev(cur_doc.rev, + conflicted_doc_revs) + superseded_revs = set(conflicted_doc_revs) + doc.rev = new_rev + if cur_doc.rev in superseded_revs: + self._delete_conflicts(doc, superseded_revs) + self._put_doc(cur_doc, doc) + else: + self._add_conflict(doc, new_rev, doc.get_json()) + self._delete_conflicts(doc, superseded_revs) + # perform request to resolve document in server + resource = self._database.resource( + '_design', 'docs', '_update', 'resolve_doc', doc.doc_id) + conflicts = None + if doc.has_conflicts: + conflicts = binascii.b2a_base64( + json.dumps( + map(lambda cdoc: (cdoc.rev, cdoc.content), + doc.get_conflicts())) + )[:-1] # exclude \n + response = resource.put_json( + body={ + 'couch_rev': cur_doc.couch_rev, + 'conflicts': conflicts, + }, + headers={'content-type': 'application/json'}) + + def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, + replica_trans_id=''): + """ + Insert/update document into the database with a given revision. + + This api is used during synchronization operations. + + If a document would conflict and save_conflict is set to True, the + content will be selected as the 'current' content for doc.doc_id, + even though doc.rev doesn't supersede the currently stored revision. + The currently stored document will be added to the list of conflict + alternatives for the given doc_id. + + This forces the new content to be 'current' so that we get convergence + after synchronizing, even if people don't resolve conflicts. Users can + then notice that their content is out of date, update it, and + synchronize again. (The alternative is that users could synchronize and + think the data has propagated, but their local copy looks fine, and the + remote copy is never updated again.) + + :param doc: A document object + :type doc: CouchDocument + :param save_conflict: If this document is a conflict, do you want to + save it as a conflict, or just ignore it. + :type save_conflict: bool + :param replica_uid: A unique replica identifier. + :type replica_uid: str + :param replica_gen: The generation of the replica corresponding to the + this document. The replica arguments are optional, + but are used during synchronization. + :type replica_gen: int + :param replica_trans_id: The transaction_id associated with the + generation. + :type replica_trans_id: str + + :return: (state, at_gen) - If we don't have doc_id already, or if + doc_rev supersedes the existing document revision, then the + content will be inserted, and state is 'inserted'. If + doc_rev is less than or equal to the existing revision, then + the put is ignored and state is respecitvely 'superseded' or + 'converged'. If doc_rev is not strictly superseded or + supersedes, then state is 'conflicted'. The document will not + be inserted if save_conflict is False. For 'inserted' or + 'converged', at_gen is the insertion/current generation. + :rtype: (str, int) + """ + cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + # at this point, `doc` has arrived from the other syncing party, and + # we will decide what to do with it. + # First, we prepare the arriving doc to update couch database. + old_doc = doc + doc = self._factory(doc.doc_id, doc.rev, doc.get_json()) + if cur_doc is not None: + doc.couch_rev = cur_doc.couch_rev + # fetch conflicts because we will eventually manipulate them + doc.ensure_fetch_conflicts(self._get_conflicts) + # from now on, it works just like u1db sqlite backend + doc_vcr = vectorclock.VectorClockRev(doc.rev) + if cur_doc is None: + cur_vcr = vectorclock.VectorClockRev(None) + else: + cur_vcr = vectorclock.VectorClockRev(cur_doc.rev) + self._validate_source(replica_uid, replica_gen, replica_trans_id) + if doc_vcr.is_newer(cur_vcr): + rev = doc.rev + self._prune_conflicts(doc, doc_vcr) + if doc.rev != rev: + # conflicts have been autoresolved + state = 'superseded' + else: + state = 'inserted' + self._put_doc(cur_doc, doc) + elif doc.rev == cur_doc.rev: + # magical convergence + state = 'converged' + elif cur_vcr.is_newer(doc_vcr): + # Don't add this to seen_ids, because we have something newer, + # so we should send it back, and we should not generate a + # conflict + state = 'superseded' + elif cur_doc.same_content_as(doc): + # the documents have been edited to the same thing at both ends + doc_vcr.maximize(cur_vcr) + doc_vcr.increment(self._replica_uid) + doc.rev = doc_vcr.as_str() + self._put_doc(cur_doc, doc) + state = 'superseded' + else: + state = 'conflicted' + if save_conflict: + self._force_doc_sync_conflict(doc) + if replica_uid is not None and replica_gen is not None: + self._do_set_replica_gen_and_trans_id( + replica_uid, replica_gen, replica_trans_id) + # update info + old_doc.rev = doc.rev + if doc.is_tombstone(): + old_doc.is_tombstone() + else: + old_doc.content = doc.content + old_doc.has_conflicts = doc.has_conflicts + return state, self._get_generation() -class CouchSyncTarget(ObjectStoreSyncTarget): +class CouchSyncTarget(CommonSyncTarget): """ Functionality for using a CouchDatabase as a synchronization target. """ - pass + + def get_sync_info(self, source_replica_uid): + source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id( + source_replica_uid) + my_gen, my_trans_id = self._db._get_generation_info() + return ( + self._db._replica_uid, my_gen, my_trans_id, source_gen, + source_trans_id) + + def record_sync_info(self, source_replica_uid, source_replica_generation, + source_replica_transaction_id): + if self._trace_hook: + self._trace_hook('record_sync_info') + self._db._set_replica_gen_and_trans_id( + source_replica_uid, source_replica_generation, + source_replica_transaction_id) class NotEnoughCouchPermissions(Exception): @@ -527,12 +1004,12 @@ class CouchServerState(ServerState): """ Initialize the couch server state. - @param couch_url: The URL for the couch database. - @type couch_url: str - @param shared_db_name: The name of the shared database. - @type shared_db_name: str - @param tokens_db_name: The name of the tokens database. - @type tokens_db_name: str + :param couch_url: The URL for the couch database. + :type couch_url: str + :param shared_db_name: The name of the shared database. + :type shared_db_name: str + :param tokens_db_name: The name of the tokens database. + :type tokens_db_name: str """ self._couch_url = couch_url self._shared_db_name = shared_db_name @@ -563,12 +1040,12 @@ class CouchServerState(ServerState): couch library to ensure that Soledad Server can do everything it needs on the underlying couch database. - @param couch_url: The URL of the couch database. - @type couch_url: str + :param couch_url: The URL of the couch database. + :type couch_url: str @raise NotEnoughCouchPermissions: Raised in case there are not enough permissions to read/write/create the needed couch databases. - @rtype: bool + :rtype: bool """ def _open_couch_db(dbname): @@ -601,11 +1078,11 @@ class CouchServerState(ServerState): """ Open a couch database. - @param dbname: The name of the database to open. - @type dbname: str + :param dbname: The name of the database to open. + :type dbname: str - @return: The CouchDatabase object. - @rtype: CouchDatabase + :return: The CouchDatabase object. + :rtype: CouchDatabase """ # TODO: open couch return CouchDatabase.open_database( @@ -616,11 +1093,11 @@ class CouchServerState(ServerState): """ Ensure couch database exists. - @param dbname: The name of the database to ensure. - @type dbname: str + :param dbname: The name of the database to ensure. + :type dbname: str - @return: The CouchDatabase object and the replica uid. - @rtype: (CouchDatabase, str) + :return: The CouchDatabase object and the replica uid. + :rtype: (CouchDatabase, str) """ db = CouchDatabase.open_database( self._couch_url + '/' + dbname, @@ -631,8 +1108,8 @@ class CouchServerState(ServerState): """ Delete couch database. - @param dbname: The name of the database to delete. - @type dbname: str + :param dbname: The name of the database to delete. + :type dbname: str """ CouchDatabase.delete_database(self._couch_url + '/' + dbname) @@ -640,8 +1117,8 @@ class CouchServerState(ServerState): """ Set the couchdb URL - @param url: CouchDB URL - @type url: str + :param url: CouchDB URL + :type url: str """ self._couch_url = url @@ -649,7 +1126,7 @@ class CouchServerState(ServerState): """ Return CouchDB URL - @rtype: str + :rtype: str """ return self._couch_url diff --git a/common/src/leap/soledad/common/ddocs/README.txt b/common/src/leap/soledad/common/ddocs/README.txt new file mode 100644 index 00000000..37d89fbf --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/README.txt @@ -0,0 +1,29 @@ +The following table depicts the U1DB CouchDB backend method and the URI that +is queried to obtain/update data from/to the server. + + +----------------------------------+------------------------------------------------------------------+ + | u1db backend method | URI | + |----------------------------------+------------------------------------------------------------------| + | _get_generation | _design/transactions/_list/generation/log | + | _get_generation_info | _design/transactions/_list/generation/log | + | _get_trans_id_for_gen | _design/transactions/_list/trans_id_for_gen/log | + | _get_transaction_log | _design/transactions/_view/log | + | _get_doc (*) | _design/docs/_view/get?key=<doc_id> | + | _has_conflicts | _design/docs/_view/get?key=<doc_id> | + | get_all_docs | _design/docs/_view/get | + | _put_doc | _design/docs/_update/put/<doc_id> | + | _whats_changed | _design/transactions/_list/whats_changed/log?old_gen=<gen> | + | _get_conflicts (*) | _design/docs/_view/conflicts?key=<doc_id> | + | _get_replica_gen_and_trans_id | _design/syncs/_view/log?other_replica_uid=<uid> | + | _do_set_replica_gen_and_trans_id | _design/syncs/_update/put/u1db_sync_log | + | _add_conflict | _design/docs/_update/add_conflict/<doc_id> | + | _delete_conflicts | _design/docs/_update/delete_conflicts/<doc_id>?doc_rev=<doc_rev> | + | list_indexes | not implemented | + | _get_index_definition | not implemented | + | delete_index | not implemented | + | _get_indexed_fields | not implemented | + | _put_and_update_indexes | not implemented | + +----------------------------------+------------------------------------------------------------------+ + +(*) These methods also request CouchDB document attachments that store U1DB + document contents. diff --git a/common/src/leap/soledad/common/ddocs/__init__.py b/common/src/leap/soledad/common/ddocs/__init__.py new file mode 100644 index 00000000..c2f78e18 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/__init__.py @@ -0,0 +1,138 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +CouchDB U1DB backend design documents helper. +""" + + +from os import listdir +from os.path import realpath, dirname, isdir, join, isfile, basename +import json +import logging + + +from couchdb import Document as CouchDocument + + +logger = logging.getLogger(__name__) + + +# where to search for design docs definitions +prefix = dirname(realpath(__file__)) + + +def ensure_ddocs_on_remote_db(db, prefix=prefix): + """ + Ensure that the design documents in C{db} contain. + + :param db: The database in which to create/update the design docs. + :type db: couchdb.client.Server + :param prefix: Where to look for design documents definitions. + :type prefix: str + """ + ddocs = build_ddocs(prefix) + for ddoc_name, ddoc_content in ddocs.iteritems(): + ddoc_id = "_design/%s" % ddoc_name + ddoc = CouchDocument({'_id': ddoc_id}) + ddoc.update(ddoc_content) + # ensure revision if ddoc is already in db + doc = db.get(ddoc_id) + if doc is not None: + ddoc['_rev'] = doc.rev + db.save(ddoc) + + +def create_local_ddocs(prefix=prefix): + """ + Create local design docs based on content from subdirectories in + C{prefix}. + + :param create_local: Whether to create local .json files. + :type create_local: bool + """ + ddocs = build_ddocs(prefix) + for ddoc_name, ddoc_content in ddocs.iteritems(): + with open(join(prefix, '%s.json' % ddoc_name), 'w') as f: + f.write(json.dumps(ddoc_content, indent=4)) + + +def build_ddocs(prefix=prefix): + """ + Build design documents based on content from subdirectories in + C{prefix}. + + :param prefix: Where to look for design documents definitions. + :type prefix: str + + :return: A dictionary containing the design docs definitions. + :rtype: dict + """ + ddocs = {} + # design docs are represented by subdirectories in current directory + for ddoc in [f for f in listdir(prefix) if isdir(join(prefix, f))]: + logger.debug("Building %s.json ..." % ddoc) + + ddocs[ddoc] = {} + + for t in ['views', 'lists', 'updates']: + tdir = join(prefix, ddoc, t) + if not isdir(tdir): + logger.debug(" - no %s" % t) + else: + + ddocs[ddoc][t] = {} + + if t == 'views': # handle views (with map/reduce functions) + for view in [f for f in listdir(tdir) \ + if isdir(join(tdir, f))]: + logger.debug(" - view: %s" % view) + # look for map.js and reduce.js + mapfile = join(tdir, view, 'map.js') + reducefile = join(tdir, view, 'reduce.js') + mapfun = None + reducefun = None + try: + with open(mapfile) as f: + mapfun = f.read() + except IOError: + pass + try: + with open(reducefile) as f: + reducefun = f.read() + except IOError: + pass + ddocs[ddoc]['views'][view] = {} + + if mapfun is not None: + ddocs[ddoc]['views'][view]['map'] = mapfun + if reducefun is not None: + ddocs[ddoc]['views'][view]['reduce'] = reducefun + + else: # handle lists, updates, etc + for fun in [f for f in listdir(tdir) \ + if isfile(join(tdir, f))]: + logger.debug(" - %s: %s" % (t, fun)) + funfile = join(tdir, fun) + funname = basename(funfile).replace('.js', '') + try: + with open(funfile) as f: + ddocs[ddoc][t][funname] = f.read() + except IOError: + pass + return ddocs diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/put.js b/common/src/leap/soledad/common/ddocs/docs/updates/put.js new file mode 100644 index 00000000..5a4647de --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/docs/updates/put.js @@ -0,0 +1,64 @@ +function(doc, req){ + /* we expect to receive the following in `req.body`: + * { + * 'couch_rev': '<couch_rev>', + * 'u1db_rev': '<u1db_rev>', + * 'content': '<base64 encoded content>', + * 'trans_id': '<reansaction_id>' + * 'conflicts': '<base64 encoded conflicts>', + * 'update_conflicts': <boolean> + * } + */ + var body = JSON.parse(req.body); + + // create a new document document + if (!doc) { + doc = {} + doc['_id'] = req['id']; + } + // or fail if couch revisions do not match + else if (doc['_rev'] != body['couch_rev']) { + // of fail if revisions do not match + return [null, 'revision conflict'] + } + + // store u1db rev + doc.u1db_rev = body['u1db_rev']; + + // save content as attachment + if (body['content'] != null) { + // save u1db content as attachment + if (!doc._attachments) + doc._attachments = {}; + doc._attachments.u1db_content = { + content_type: "application/octet-stream", + data: body['content'] // should be base64 encoded + }; + } + // or delete the attachment if document is tombstone + else if (doc._attachments && + doc._attachments.u1db_content) + delete doc._attachments.u1db_content; + + // store the transaction id + if (!doc.u1db_transactions) + doc.u1db_transactions = []; + var d = new Date(); + doc.u1db_transactions.push([d.getTime(), body['trans_id']]); + + // save conflicts as attachment if they were sent + if (body['update_conflicts']) + if (body['conflicts'] != null) { + if (!doc._attachments) + doc._attachments = {}; + doc._attachments.u1db_conflicts = { + content_type: "application/octet-stream", + data: body['conflicts'] // should be base64 encoded + } + } else { + if(doc._attachments && doc._attachments.u1db_conflicts) + delete doc._attachments.u1db_conflicts + } + + return [doc, 'ok']; +} diff --git a/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js b/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js new file mode 100644 index 00000000..7ba66cf8 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/docs/updates/resolve_doc.js @@ -0,0 +1,39 @@ +function(doc, req){ + /* we expect to receive the following in `req.body`: + * { + * 'couch_rev': '<couch_rev>', + * 'conflicts': '<base64 encoded conflicts>', + * } + */ + var body = JSON.parse(req.body); + + // fail if no document was given + if (!doc) { + return [null, 'document does not exist'] + } + + // fail if couch revisions do not match + if (body['couch_rev'] != null + && doc['_rev'] != body['couch_rev']) { + return [null, 'revision conflict'] + } + + // fail if conflicts were not sent + if (body['conflicts'] == null) + return [null, 'missing conflicts'] + + // save conflicts as attachment if they were sent + if (body['conflicts'] != null) { + if (!doc._attachments) + doc._attachments = {}; + doc._attachments.u1db_conflicts = { + content_type: "application/octet-stream", + data: body['conflicts'] // should be base64 encoded + } + } + // or delete attachment if there are no conflicts + else if (doc._attachments && doc._attachments.u1db_conflicts) + delete doc._attachments.u1db_conflicts; + + return [doc, 'ok']; +} diff --git a/common/src/leap/soledad/common/ddocs/docs/views/get/map.js b/common/src/leap/soledad/common/ddocs/docs/views/get/map.js new file mode 100644 index 00000000..ae08d9e9 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/docs/views/get/map.js @@ -0,0 +1,20 @@ +function(doc) { + if (doc.u1db_rev) { + var is_tombstone = true; + var has_conflicts = false; + if (doc._attachments) { + if (doc._attachments.u1db_content) + is_tombstone = false; + if (doc._attachments.u1db_conflicts) + has_conflicts = true; + } + emit(doc._id, + { + "couch_rev": doc._rev, + "u1db_rev": doc.u1db_rev, + "is_tombstone": is_tombstone, + "has_conflicts": has_conflicts, + } + ); + } +} diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js new file mode 100644 index 00000000..722f695a --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js @@ -0,0 +1,22 @@ +function(doc, req){ + if (!doc) { + doc = {} + doc['_id'] = 'u1db_sync_log'; + doc['syncs'] = []; + } + body = JSON.parse(req.body); + // remove outdated info + doc['syncs'] = doc['syncs'].filter( + function (entry) { + return entry[0] != body['other_replica_uid']; + } + ); + // store u1db rev + doc['syncs'].push([ + body['other_replica_uid'], + body['other_generation'], + body['other_transaction_id'] + ]); + return [doc, 'ok']; +} + diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js new file mode 100644 index 00000000..a63c7cf4 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js @@ -0,0 +1,12 @@ +function(doc) { + if (doc._id == 'u1db_sync_log') { + if (doc.syncs) + doc.syncs.forEach(function (entry) { + emit(entry[0], + { + 'known_generation': entry[1], + 'known_transaction_id': entry[2] + }); + }); + } +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js b/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js new file mode 100644 index 00000000..dbdfff0d --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/lists/generation.js @@ -0,0 +1,20 @@ +function(head, req) { + var row; + var rows=[]; + // fetch all rows + while(row = getRow()) { + rows.push(row); + } + if (rows.length > 0) + send(JSON.stringify({ + "generation": rows.length, + "doc_id": rows[rows.length-1]['id'], + "transaction_id": rows[rows.length-1]['value'] + })); + else + send(JSON.stringify({ + "generation": 0, + "doc_id": "", + "transaction_id": "", + })); +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js b/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js new file mode 100644 index 00000000..2ec91794 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/lists/trans_id_for_gen.js @@ -0,0 +1,19 @@ +function(head, req) { + var row; + var rows=[]; + var i = 1; + var gen = 1; + if (req.query.gen) + gen = parseInt(req.query['gen']); + // fetch all rows + while(row = getRow()) + rows.push(row); + if (gen <= rows.length) + send(JSON.stringify({ + "generation": gen, + "doc_id": rows[gen-1]['id'], + "transaction_id": rows[gen-1]['value'], + })); + else + send('{}'); +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js b/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js new file mode 100644 index 00000000..b35cdf51 --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/lists/whats_changed.js @@ -0,0 +1,22 @@ +function(head, req) { + var row; + var gen = 1; + var old_gen = 0; + if (req.query.old_gen) + old_gen = parseInt(req.query['old_gen']); + send('{"transactions":[\n'); + // fetch all rows + while(row = getRow()) { + if (gen > old_gen) { + if (gen > old_gen+1) + send(',\n'); + send(JSON.stringify({ + "generation": gen, + "doc_id": row["id"], + "transaction_id": row["value"] + })); + } + gen++; + } + send('\n]}'); +} diff --git a/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js b/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js new file mode 100644 index 00000000..94ef63ca --- /dev/null +++ b/common/src/leap/soledad/common/ddocs/transactions/views/log/map.js @@ -0,0 +1,7 @@ +function(doc) { + if (doc.u1db_transactions) + doc.u1db_transactions.forEach(function(t) { + emit(t[0], // use timestamp as key so the results are ordered + t[1]); // value is the transaction_id + }); +} diff --git a/common/src/leap/soledad/common/objectstore.py b/common/src/leap/soledad/common/objectstore.py deleted file mode 100644 index 7aff3e32..00000000 --- a/common/src/leap/soledad/common/objectstore.py +++ /dev/null @@ -1,282 +0,0 @@ -# -*- coding: utf-8 -*- -# objectstore.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -""" -Abstract U1DB backend to handle storage using object stores (like CouchDB, for -example). - -This backend uses special documents to store all U1DB data (replica uid, -indexes, transaction logs and info about other dbs). The id of these documents -are reserved and have prefix equal to ObjectStore.U1DB_DATA_DOC_ID_PREFIX. - -Right now, this is only used by CouchDatabase backend, but can also be -extended to implement OpenStack or Amazon S3 storage, for example. - -See U1DB documentation for more information on how to use databases. -""" - - -from base64 import b64encode, b64decode - - -import uuid -import simplejson as json - - -from u1db import errors -from u1db.backends.inmemory import ( - InMemoryDatabase, - InMemorySyncTarget, -) - - -class ObjectStoreDatabase(InMemoryDatabase): - """ - A backend for storing u1db data in an object store. - """ - - U1DB_DATA_DOC_ID_PREFIX = 'u1db/' - - @classmethod - def open_database(cls, url, create, document_factory=None): - """ - Open a U1DB database using an object store as backend. - - @param url: the url of the database replica - @type url: str - @param create: should the replica be created if it does not exist? - @type create: bool - @param document_factory: A function that will be called with the same - parameters as Document.__init__. - @type document_factory: callable - - @return: the database instance - @rtype: CouchDatabase - """ - raise NotImplementedError(cls.open_database) - - def __init__(self, replica_uid=None, document_factory=None): - """ - Initialize the object store database. - - @param replica_uid: an optional unique replica identifier - @type replica_uid: str - @param document_factory: A function that will be called with the same - parameters as Document.__init__. - @type document_factory: callable - """ - InMemoryDatabase.__init__( - self, - replica_uid, - document_factory=document_factory) - if self._replica_uid is None: - self._replica_uid = uuid.uuid4().hex - self._init_u1db_data() - - def _init_u1db_data(self): - """ - Initialize u1db configuration data on backend storage. - - A U1DB database needs to keep track of all database transactions, - document conflicts, the generation of other replicas it has seen, - indexes created by users and so on. - - In this implementation, all this information is stored in special - documents stored in the couch db with id prefix equal to - U1DB_DATA_DOC_ID_PREFIX. Those documents ids are reserved: - put_doc(), get_doc() and delete_doc() will not allow documents with - a doc_id with that prefix to be accessed or modified. - """ - raise NotImplementedError(self._init_u1db_data) - - #------------------------------------------------------------------------- - # methods from Database - #------------------------------------------------------------------------- - - def put_doc(self, doc): - """ - Update a document. - - If the document currently has conflicts, put will fail. - If the database specifies a maximum document size and the document - exceeds it, put will fail and raise a DocumentTooBig exception. - - This method prevents from updating the document with doc_id equals to - self.U1DB_DATA_DOC_ID, which contains U1DB data. - - @param doc: A Document with new content. - @type doc: Document - - @return: new_doc_rev - The new revision identifier for the document. - The Document object will also be updated. - @rtype: str - """ - if doc.doc_id is not None and \ - doc.doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - raise errors.InvalidDocId() - return InMemoryDatabase.put_doc(self, doc) - - def _put_doc(self, doc): - """ - Update a document. - - This is called everytime we just want to do a raw put on the db (i.e. - without index updates, document constraint checks, and conflict - checks). - - @param doc: The document to update. - @type doc: u1db.Document - - @return: The new revision identifier for the document. - @rtype: str - """ - raise NotImplementedError(self._put_doc) - - def get_doc(self, doc_id, include_deleted=False): - """ - Get the JSON string for the given document. - - This method prevents from getting the document with doc_id equals to - self.U1DB_DATA_DOC_ID, which contains U1DB data. - - @param doc_id: The unique document identifier - @type doc_id: str - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise asking for a deleted - document will return None. - @type include_deleted: bool - - @return: a Document object. - @rtype: Document - """ - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - raise errors.InvalidDocId() - return InMemoryDatabase.get_doc(self, doc_id, include_deleted) - - def _get_doc(self, doc_id): - """ - Get just the document content, without fancy handling. - - @param doc_id: The unique document identifier - @type doc_id: str - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise asking for a deleted - document will return None. - @type include_deleted: bool - - @return: a Document object. - @type: u1db.Document - """ - raise NotImplementedError(self._get_doc) - - def get_all_docs(self, include_deleted=False): - """ - Get the JSON content for all documents in the database. - - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise deleted documents will not - be included in the results. - @type include_deleted: bool - - @return: (generation, [Document]) - The current generation of the database, followed by a list of all - the documents in the database. - @rtype: tuple - """ - generation = self._get_generation() - results = [] - for doc_id in self._database: - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - continue - doc = self._get_doc(doc_id, check_for_conflicts=True) - if doc.content is None and not include_deleted: - continue - results.append(doc) - return (generation, results) - - def delete_doc(self, doc): - """ - Mark a document as deleted. - - This method prevents from deleting the document with doc_id equals to - self.U1DB_DATA_DOC_ID, which contains U1DB data. - - @param doc: The document to mark as deleted. - @type doc: u1db.Document - - @return: The new revision id of the document. - @type: str - """ - if doc.doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - raise errors.InvalidDocId() - old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) - if old_doc is None: - raise errors.DocumentDoesNotExist - if old_doc.rev != doc.rev: - raise errors.RevisionConflict() - if old_doc.is_tombstone(): - raise errors.DocumentAlreadyDeleted - if old_doc.has_conflicts: - raise errors.ConflictedDoc() - new_rev = self._allocate_doc_rev(doc.rev) - doc.rev = new_rev - doc.make_tombstone() - self._put_and_update_indexes(old_doc, doc) - return new_rev - - # index-related methods - - def create_index(self, index_name, *index_expressions): - """ - Create a named index, which can then be queried for future lookups. - - See U1DB documentation for more information. - - @param index_name: A unique name which can be used as a key prefix. - @param index_expressions: Index expressions defining the index - information. - """ - raise NotImplementedError(self.create_index) - - #------------------------------------------------------------------------- - # implemented methods from CommonBackend - #------------------------------------------------------------------------- - - def _put_and_update_indexes(self, old_doc, doc): - """ - Update a document and all indexes related to it. - - @param old_doc: The old version of the document. - @type old_doc: u1db.Document - @param doc: The new version of the document. - @type doc: u1db.Document - """ - for index in self._indexes.itervalues(): - if old_doc is not None and not old_doc.is_tombstone(): - index.remove_json(old_doc.doc_id, old_doc.get_json()) - if not doc.is_tombstone(): - index.add_json(doc.doc_id, doc.get_json()) - trans_id = self._allocate_transaction_id() - self._put_doc(doc) - self._transaction_log.append((doc.doc_id, trans_id)) - - -class ObjectStoreSyncTarget(InMemorySyncTarget): - """ - Functionality for using an ObjectStore as a synchronization target. - """ diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index 42edf9fe..bdef4e0d 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -27,7 +27,6 @@ from base64 import b64decode from leap.common.files import mkdir_p -from leap.soledad.common.document import SoledadDocument from leap.soledad.common.tests import u1db_tests as tests from leap.soledad.common.tests.u1db_tests import test_backends from leap.soledad.common.tests.u1db_tests import test_sync @@ -188,7 +187,7 @@ def copy_couch_database_for_test(test, db): def make_document_for_test(test, doc_id, rev, content, has_conflicts=False): - return SoledadDocument(doc_id, rev, content, has_conflicts=has_conflicts) + return couch.CouchDocument(doc_id, rev, content, has_conflicts=has_conflicts) COUCH_SCENARIOS = [ |