diff options
Diffstat (limited to 'common/src/leap/soledad/common/couch.py')
-rw-r--r-- | common/src/leap/soledad/common/couch.py | 1229 |
1 files changed, 853 insertions, 376 deletions
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 1396f4d7..3b0e042a 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -18,26 +18,34 @@ """A U1DB backend that uses CouchDB as its persistence layer.""" -import re +import os import simplejson as json -import socket +import sys +import time +import uuid import logging +import binascii +import socket -from u1db import errors -from u1db.sync import Synchronizer -from u1db.backends.inmemory import InMemoryIndex +from u1db.backends import CommonBackend, CommonSyncTarget from u1db.remote.server_state import ServerState -from u1db.errors import DatabaseDoesNotExist -from couchdb.client import Server, Document as CouchDocument +from u1db import ( + errors, + query_parser, + vectorclock, +) +from couchdb.client import ( + Server, + Document as CouchDocument, + _doc_resource, +) from couchdb.http import ResourceNotFound, Unauthorized from leap.soledad.common import USER_DB_PREFIX -from leap.soledad.common.objectstore import ( - ObjectStoreDatabase, - ObjectStoreSyncTarget, -) +from leap.soledad.common.document import SoledadDocument +from leap.soledad.common.ddocs import ensure_ddocs_on_remote_db logger = logging.getLogger(__name__) @@ -49,158 +57,125 @@ class InvalidURLError(Exception): """ -def persistent_class(cls): +class CouchDocument(SoledadDocument): """ - Decorator that modifies a class to ensure u1db metadata persists on - underlying storage. + This is the document used for maintaining the Couch backend. - @param cls: The class that will be modified. - @type cls: type + A CouchDocument can fetch and manipulate conflicts and also holds a + reference to the couch document revision. This data is used to ensure an + atomic and consistent update of the database. """ - def _create_persistent_method(old_method_name, key, load_method_name, - dump_method_name, store): - """ - Create a persistent method to replace C{old_method_name}. - - The new method will load C{key} using C{load_method_name} and stores - it using C{dump_method_name} depending on the value of C{store}. - """ - # get methods - old_method = getattr(cls, old_method_name) - load_method = getattr(cls, load_method_name) \ - if load_method_name is not None \ - else lambda self, data: setattr(self, key, data) - dump_method = getattr(cls, dump_method_name) \ - if dump_method_name is not None \ - else lambda self: getattr(self, key) - - def _new_method(self, *args, **kwargs): - # get u1db data from couch db - doc = self._get_doc('%s%s' % - (self.U1DB_DATA_DOC_ID_PREFIX, key)) - load_method(self, doc.content['content']) - # run old method - retval = old_method(self, *args, **kwargs) - # store u1db data on couch - if store: - doc.content = {'content': dump_method(self)} - self._put_doc(doc) - return retval - - return _new_method - - # ensure the class has a persistency map - if not hasattr(cls, 'PERSISTENCY_MAP'): - logger.error('Class %s has no PERSISTENCY_MAP attribute, skipping ' - 'persistent methods substitution.' % cls) - return cls - # replace old methods with new persistent ones - for key, ((load_method_name, dump_method_name), - persistent_methods) in cls.PERSISTENCY_MAP.iteritems(): - for (method_name, store) in persistent_methods: - setattr(cls, method_name, - _create_persistent_method( - method_name, - key, - load_method_name, - dump_method_name, - store)) - return cls - - -@persistent_class -class CouchDatabase(ObjectStoreDatabase): + def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False, + syncable=True): + """ + Container for handling a document that is stored in couch backend. + + :param doc_id: The unique document identifier. + :type doc_id: str + :param rev: The revision identifier of the document. + :type rev: str + :param json: The JSON string for this document. + :type json: str + :param has_conflicts: Boolean indicating if this document has conflicts + :type has_conflicts: bool + :param syncable: Should this document be synced with remote replicas? + :type syncable: bool + """ + SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts) + self._couch_rev = None + self._conflicts = None + self._modified_conflicts = False + + def ensure_fetch_conflicts(self, get_conflicts_fun): + """ + Ensure conflict data has been fetched from the server. + + :param get_conflicts_fun: A function which, given the document id and + the couch revision, return the conflicted + versions of the current document. + :type get_conflicts_fun: function + """ + if self._conflicts is None: + self._conflicts = get_conflicts_fun(self.doc_id, + couch_rev=self.couch_rev) + self.has_conflicts = len(self._conflicts) > 0 + + def get_conflicts(self): + """ + Get the conflicted versions of the document. + + :return: The conflicted versions of the document. + :rtype: [CouchDocument] + """ + return self._conflicts + + def add_conflict(self, doc): + """ + Add a conflict to this document. + + :param doc: The conflicted version to be added. + :type doc: CouchDocument + """ + if self._conflicts is None: + raise Exception("Run self.ensure_fetch_conflicts first!") + self._modified_conflicts = True + self._conflicts.append(doc) + self.has_conflicts = len(self._conflicts) > 0 + + def delete_conflicts(self, conflict_revs): + """ + Delete conflicted versions of this document. + + :param conflict_revs: The conflicted revisions to be deleted. + :type conflict_revs: [str] + """ + if self._conflicts is None: + raise Exception("Run self.ensure_fetch_conflicts first!") + conflicts_len = len(self._conflicts) + self._conflicts = filter( + lambda doc: doc.rev not in conflict_revs, + self._conflicts) + if len(self._conflicts) < conflicts_len: + self._modified_conflicts = True + self.has_conflicts = len(self._conflicts) > 0 + + def modified_conflicts(self): + """ + Return whether this document's conflicts have been modified. + + :return: Whether this document's conflicts have been modified. + :rtype: bool + """ + return self._conflicts is not None and \ + self._modified_conflicts is True + + def _get_couch_rev(self): + return self._couch_rev + + def _set_couch_rev(self, rev): + self._couch_rev = rev + + couch_rev = property(_get_couch_rev, _set_couch_rev) + + +class CouchDatabase(CommonBackend): """ - A U1DB backend that uses Couch as its persistence layer. + A U1DB implementation that uses CouchDB as its persistence layer. """ - U1DB_TRANSACTION_LOG_KEY = '_transaction_log' - U1DB_CONFLICTS_KEY = '_conflicts' - U1DB_OTHER_GENERATIONS_KEY = '_other_generations' - U1DB_INDEXES_KEY = '_indexes' - U1DB_REPLICA_UID_KEY = '_replica_uid' - - U1DB_DATA_KEYS = [ - U1DB_TRANSACTION_LOG_KEY, - U1DB_CONFLICTS_KEY, - U1DB_OTHER_GENERATIONS_KEY, - U1DB_INDEXES_KEY, - U1DB_REPLICA_UID_KEY, - ] - - COUCH_ID_KEY = '_id' - COUCH_REV_KEY = '_rev' - COUCH_U1DB_ATTACHMENT_KEY = 'u1db_json' - COUCH_U1DB_REV_KEY = 'u1db_rev' - - # the following map describes information about methods usage of - # properties that have to persist on the underlying database. The format - # of the map is assumed to be: - # - # { - # 'property_name': [ - # ('property_load_method_name', 'property_dump_method_name'), - # [('method_1_name', bool), - # ... - # ('method_N_name', bool)]], - # ... - # } - # - # where the booleans indicate if the property should be stored after - # each method execution (i.e. if the method alters the property). Property - # load/dump methods will be run after/before properties are read/written - # to the underlying db. - PERSISTENCY_MAP = { - U1DB_TRANSACTION_LOG_KEY: [ - ('_load_transaction_log_from_json', None), - [('_get_transaction_log', False), - ('_get_generation', False), - ('_get_generation_info', False), - ('_get_trans_id_for_gen', False), - ('whats_changed', False), - ('_put_and_update_indexes', True)]], - U1DB_CONFLICTS_KEY: [ - (None, None), - [('_has_conflicts', False), - ('get_doc_conflicts', False), - ('_prune_conflicts', False), - ('resolve_doc', False), - ('_replace_conflicts', True), - ('_force_doc_sync_conflict', True)]], - U1DB_OTHER_GENERATIONS_KEY: [ - ('_load_other_generations_from_json', None), - [('_get_replica_gen_and_trans_id', False), - ('_do_set_replica_gen_and_trans_id', True)]], - U1DB_INDEXES_KEY: [ - ('_load_indexes_from_json', '_dump_indexes_as_json'), - [('list_indexes', False), - ('get_from_index', False), - ('get_range_from_index', False), - ('get_index_keys', False), - ('_put_and_update_indexes', True), - ('create_index', True), - ('delete_index', True)]], - U1DB_REPLICA_UID_KEY: [ - (None, None), - [('_allocate_doc_rev', False), - ('_put_doc_if_newer', False), - ('_ensure_maximal_rev', False), - ('_prune_conflicts', False), - ('_set_replica_uid', True)]]} - @classmethod def open_database(cls, url, create): """ Open a U1DB database using CouchDB as backend. - @param url: the url of the database replica - @type url: str - @param create: should the replica be created if it does not exist? - @type create: bool + :param url: the url of the database replica + :type url: str + :param create: should the replica be created if it does not exist? + :type create: bool - @return: the database instance - @rtype: CouchDatabase + :return: the database instance + :rtype: CouchDatabase """ # get database from url m = re.match('(^https?://[^/]+)/(.+)$', url) @@ -221,293 +196,795 @@ class CouchDatabase(ObjectStoreDatabase): """ Create a new Couch data container. - @param url: the url of the couch database - @type url: str - @param dbname: the database name - @type dbname: str - @param replica_uid: an optional unique replica identifier - @type replica_uid: str - @param full_commit: turn on the X-Couch-Full-Commit header - @type full_commit: bool - @param session: an http.Session instance or None for a default session - @type session: http.Session + :param url: the url of the couch database + :type url: str + :param dbname: the database name + :type dbname: str + :param replica_uid: an optional unique replica identifier + :type replica_uid: str + :param full_commit: turn on the X-Couch-Full-Commit header + :type full_commit: bool + :param session: an http.Session instance or None for a default session + :type session: http.Session + :param ensure_ddocs: Ensure that the design docs exist on server. + :type ensure_ddocs: bool """ # save params self._url = url self._full_commit = full_commit self._session = session + self._factory = CouchDocument + self._real_replica_uid = None # configure couch self._server = Server(url=self._url, full_commit=self._full_commit, session=self._session) self._dbname = dbname - # this will ensure that transaction and sync logs exist and are - # up-to-date. try: self._database = self._server[self._dbname] except ResourceNotFound: self._server.create(self._dbname) self._database = self._server[self._dbname] - ObjectStoreDatabase.__init__(self, replica_uid=replica_uid) + self._initialize(replica_uid or uuid.uuid4().hex) + + def get_sync_target(self): + """ + Return a SyncTarget object, for another u1db to synchronize with. + + :return: The sync target. + :rtype: CouchSyncTarget + """ + return CouchSyncTarget(self) + + def delete_database(self): + """ + Delete a U1DB CouchDB database. + """ + del(self._server[self._dbname]) + + def close(self): + """ + Release any resources associated with this database. + + :return: True if db was succesfully closed. + :rtype: bool + """ + self._url = None + self._full_commit = None + self._session = None + self._server = None + self._database = None + return True + + def _set_replica_uid(self, replica_uid): + """ + Force the replica uid to be set. + + :param replica_uid: The new replica uid. + :type replica_uid: str + """ + try: + doc = self._database['u1db_config'] + except ResourceNotFound: + doc = { + '_id': 'u1db_config', + 'replica_uid': replica_uid, + } + self._database.save(doc) + + def _ensure_design_docs(self): + """ + Ensure that the design docs have been created. + """ + if self._is_initialized(): + return + self._initialize() + + def _set_replica_uid(self, replica_uid): + """Force the replica_uid to be set.""" + doc = self._database['u1db_config'] + doc['replica_uid'] = replica_uid + self._database.save(doc) + self._real_replica_uid = replica_uid + + def _get_replica_uid(self): + """ + Get the replica uid. + + :return: The replica uid. + :rtype: str + """ + if self._real_replica_uid is not None: + return self._real_replica_uid + doc = self._database['u1db_config'] + self._real_replica_uid = doc['replica_uid'] + return self._real_replica_uid + + _replica_uid = property(_get_replica_uid, _set_replica_uid) + + def _get_generation(self): + """ + Return the current generation. + + :return: The current generation. + :rtype: int + """ + # query a couch list function + res = self._database.resource( + '_design', 'transactions', '_list', 'generation', 'log') + response = res.get_json() + return response[2]['generation'] + + def _get_generation_info(self): + """ + Return the current generation. - #------------------------------------------------------------------------- - # methods from Database - #------------------------------------------------------------------------- + :return: A tuple containing the current generation and transaction id. + :rtype: (int, str) + """ + # query a couch list function + res = self._database.resource( + '_design', 'transactions', '_list', 'generation', 'log') + response = res.get_json() + return (response[2]['generation'], response[2]['transaction_id']) + + def _get_trans_id_for_gen(self, generation): + """ + Get the transaction id corresponding to a particular generation. + + :param generation: The generation for which to get the transaction id. + :type generation: int + + :return: The transaction id for C{generation}. + :rtype: str + + :raise InvalidGeneration: Raised when the generation does not exist. + """ + if generation == 0: + return '' + # query a couch list function + res = self._database.resource( + '_design', 'transactions', '_list', 'trans_id_for_gen', 'log') + response = res.get_json(gen=generation) + if response[2] == {}: + raise errors.InvalidGeneration + return response[2]['transaction_id'] + + def _get_transaction_log(self): + """ + This is only for the test suite, it is not part of the api. + + :return: The complete transaction log. + :rtype: [(str, str)] + """ + # query a couch view + res = self._database.resource( + '_design', 'transactions', '_view', 'log') + response = res.get_json() + return map(lambda row: (row['id'], row['value']), response[2]['rows']) def _get_doc(self, doc_id, check_for_conflicts=False): """ - Get just the document content, without fancy handling. + Extract the document from storage. + + This can return None if the document doesn't exist. + + :param doc_id: The unique document identifier + :type doc_id: str + :param check_for_conflicts: If set to False, then the conflict check + will be skipped. + :type check_for_conflicts: bool + + :return: The document. + :rtype: CouchDocument + """ + # get document with all attachments (u1db content and eventual + # conflicts) + try: + result = \ + self._database.resource(doc_id).get_json( + attachments=True)[2] + except ResourceNotFound: + return None + # restrict to u1db documents + if 'u1db_rev' not in result: + return None + doc = self._factory(doc_id, result['u1db_rev']) + # set contents or make tombstone + if '_attachments' not in result \ + or 'u1db_content' not in result['_attachments']: + doc.make_tombstone() + else: + doc.content = json.loads( + binascii.a2b_base64( + result['_attachments']['u1db_content']['data'])) + # determine if there are conflicts + if check_for_conflicts \ + and '_attachments' in result \ + and 'u1db_conflicts' in result['_attachments']: + doc.has_conflicts = True + # store couch revision + doc.couch_rev = result['_rev'] + return doc + + def get_doc(self, doc_id, include_deleted=False): + """ + Get the JSON string for the given document. - @param doc_id: The unique document identifier - @type doc_id: str - @param include_deleted: If set to True, deleted documents will be + :param doc_id: The unique document identifier + :type doc_id: str + :param include_deleted: If set to True, deleted documents will be returned with empty content. Otherwise asking for a deleted document will return None. - @type include_deleted: bool + :type include_deleted: bool - @return: a Document object. - @type: u1db.Document + :return: A document object. + :rtype: CouchDocument. """ - cdoc = self._database.get(doc_id) - if cdoc is None: + doc = self._get_doc(doc_id, check_for_conflicts=True) + if doc is None: + return None + if doc.is_tombstone() and not include_deleted: return None - has_conflicts = False - if check_for_conflicts: - has_conflicts = self._has_conflicts(doc_id) - doc = self._factory( - doc_id=doc_id, - rev=cdoc[self.COUCH_U1DB_REV_KEY], - has_conflicts=has_conflicts) - contents = self._database.get_attachment( - cdoc, - self.COUCH_U1DB_ATTACHMENT_KEY) - if contents: - doc.content = json.loads(contents.read()) - else: - doc.make_tombstone() return doc def get_all_docs(self, include_deleted=False): """ Get the JSON content for all documents in the database. - @param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise deleted documents will not - be included in the results. - @type include_deleted: bool + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise deleted + documents will not be included in the results. + :type include_deleted: bool - @return: (generation, [Document]) + :return: (generation, [CouchDocument]) The current generation of the database, followed by a list of all the documents in the database. - @rtype: tuple + :rtype: (int, [CouchDocument]) """ + generation = self._get_generation() results = [] - for doc_id in self._database: - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - continue - doc = self._get_doc(doc_id, check_for_conflicts=True) - if doc.content is None and not include_deleted: - continue - results.append(doc) + for row in self._database.view('_all_docs'): + doc = self.get_doc(row.id, include_deleted=include_deleted) + if doc is not None: + results.append(doc) return (generation, results) - def _put_doc(self, doc): + def _put_doc(self, old_doc, doc): """ - Update a document. + Put the document in the Couch backend database. - This is called everytime we just want to do a raw put on the db (i.e. - without index updates, document constraint checks, and conflict - checks). - - @param doc: The document to update. - @type doc: u1db.Document - - @return: The new revision identifier for the document. - @rtype: str - """ - # prepare couch's Document - cdoc = CouchDocument() - cdoc[self.COUCH_ID_KEY] = doc.doc_id - # we have to guarantee that couch's _rev is consistent - old_cdoc = self._database.get(doc.doc_id) - if old_cdoc is not None: - cdoc[self.COUCH_REV_KEY] = old_cdoc[self.COUCH_REV_KEY] - # store u1db's rev - cdoc[self.COUCH_U1DB_REV_KEY] = doc.rev - # save doc in db - self._database.save(cdoc) - # store u1db's content as json string - if not doc.is_tombstone(): - self._database.put_attachment( - cdoc, doc.get_json(), - filename=self.COUCH_U1DB_ATTACHMENT_KEY) - else: - self._database.delete_attachment( - cdoc, - self.COUCH_U1DB_ATTACHMENT_KEY) + :param old_doc: The old document version. + :type old_doc: CouchDocument + :param doc: The document to be put. + :type doc: CouchDocument - def get_sync_target(self): + :raise RevisionConflict: Raised when trying to update a document but + couch revisions mismatch. """ - Return a SyncTarget object, for another u1db to synchronize with. - - @return: The sync target. - @rtype: CouchSyncTarget + trans_id = self._allocate_transaction_id() + # encode content + content = doc.get_json() + if content is not None: + content = binascii.b2a_base64(content)[:-1] # exclude trailing \n + # encode conflicts + conflicts = None + update_conflicts = doc.modified_conflicts() + if update_conflicts is True: + if doc.has_conflicts: + conflicts = binascii.b2a_base64( + json.dumps( + map(lambda cdoc: (cdoc.rev, cdoc.content), + doc.get_conflicts())) + )[:-1] # exclude \n + # perform the request + resource = self._database.resource( + '_design', 'docs', '_update', 'put', doc.doc_id) + response = resource.put_json( + body={ + 'couch_rev': old_doc.couch_rev + if old_doc is not None else None, + 'u1db_rev': doc.rev, + 'content': content, + 'trans_id': trans_id, + 'conflicts': conflicts, + 'update_conflicts': update_conflicts, + }, + headers={'content-type': 'application/json'}) + # the document might have been updated in between, so we check for the + # return message + msg = response[2].read() + if msg == 'ok': + return + elif msg == 'revision conflict': + raise errors.RevisionConflict() + + def put_doc(self, doc): """ - return CouchSyncTarget(self) + Update a document. - def create_index(self, index_name, *index_expressions): - """ - Create a named index, which can then be queried for future lookups. + If the document currently has conflicts, put will fail. + If the database specifies a maximum document size and the document + exceeds it, put will fail and raise a DocumentTooBig exception. - @param index_name: A unique name which can be used as a key prefix. - @param index_expressions: Index expressions defining the index - information. - """ - if index_name in self._indexes: - if self._indexes[index_name]._definition == list( - index_expressions): - return - raise errors.IndexNameTakenError - index = InMemoryIndex(index_name, list(index_expressions)) - for doc_id in self._database: - if doc_id.startswith(self.U1DB_DATA_DOC_ID_PREFIX): - continue # skip special files - doc = self._get_doc(doc_id) - if doc.content is not None: - index.add_json(doc_id, doc.get_json()) - self._indexes[index_name] = index + :param doc: A Document with new content. + :return: new_doc_rev - The new revision identifier for the document. + The Document object will also be updated. - def close(self): + :raise errors.InvalidDocId: Raised if the document's id is invalid. + :raise errors.DocumentTooBig: Raised if the document size is too big. + :raise errors.ConflictedDoc: Raised if the document has conflicts. """ - Release any resources associated with this database. - - @return: True if db was succesfully closed. - @rtype: bool + if doc.doc_id is None: + raise errors.InvalidDocId() + self._check_doc_id(doc.doc_id) + self._check_doc_size(doc) + old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if old_doc and old_doc.has_conflicts: + raise errors.ConflictedDoc() + if old_doc and doc.rev is None and old_doc.is_tombstone(): + new_rev = self._allocate_doc_rev(old_doc.rev) + else: + if old_doc is not None: + if old_doc.rev != doc.rev: + raise errors.RevisionConflict() + else: + if doc.rev is not None: + raise errors.RevisionConflict() + new_rev = self._allocate_doc_rev(doc.rev) + doc.rev = new_rev + self._put_doc(old_doc, doc) + return new_rev + + def whats_changed(self, old_generation=0): """ - # TODO: fix this method so the connection is properly closed and - # test_close (+tearDown, which deletes the db) works without problems. - self._url = None - self._full_commit = None - self._session = None - #self._server = None - self._database = None - return True - - def sync(self, url, creds=None, autocreate=True): + Return a list of documents that have changed since old_generation. + + :param old_generation: The generation of the database in the old + state. + :type old_generation: int + + :return: (generation, trans_id, [(doc_id, generation, trans_id),...]) + The current generation of the database, its associated + transaction id, and a list of of changed documents since + old_generation, represented by tuples with for each document + its doc_id and the generation and transaction id corresponding + to the last intervening change and sorted by generation (old + changes first) + :rtype: (int, str, [(str, int, str)]) """ - Synchronize documents with remote replica exposed at url. + # query a couch list function + res = self._database.resource( + '_design', 'transactions', '_list', 'whats_changed', 'log') + response = res.get_json(old_gen=old_generation) + results = map( + lambda row: + (row['generation'], row['doc_id'], row['transaction_id']), + response[2]['transactions']) + results.reverse() + cur_gen = old_generation + seen = set() + changes = [] + newest_trans_id = '' + for generation, doc_id, trans_id in results: + if doc_id not in seen: + changes.append((doc_id, generation, trans_id)) + seen.add(doc_id) + if changes: + cur_gen = changes[0][1] # max generation + newest_trans_id = changes[0][2] + changes.reverse() + else: + cur_gen, newest_trans_id = self._get_generation_info() - @param url: The url of the target replica to sync with. - @type url: str - @param creds: optional dictionary giving credentials. - to authorize the operation with the server. - @type creds: dict - @param autocreate: Ask the target to create the db if non-existent. - @type autocreate: bool + return cur_gen, newest_trans_id, changes - @return: The local generation before the synchronisation was performed. - @rtype: int + def delete_doc(self, doc): """ - return Synchronizer(self, CouchSyncTarget(url, creds=creds)).sync( - autocreate=autocreate) + Mark a document as deleted. + + Will abort if the current revision doesn't match doc.rev. + This will also set doc.content to None. - #------------------------------------------------------------------------- - # methods from ObjectStoreDatabase - #------------------------------------------------------------------------- + :param doc: The document to mark as deleted. + :type doc: CouchDocument. - def _init_u1db_data(self): + :raise errors.DocumentDoesNotExist: Raised if the document does not + exist. + :raise errors.RevisionConflict: Raised if the revisions do not match. + :raise errors.DocumentAlreadyDeleted: Raised if the document is + already deleted. + :raise errors.ConflictedDoc: Raised if the doc has conflicts. """ - Initialize u1db configuration data on backend storage. + old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if old_doc is None: + raise errors.DocumentDoesNotExist + if old_doc.rev != doc.rev: + raise errors.RevisionConflict() + if old_doc.is_tombstone(): + raise errors.DocumentAlreadyDeleted + if old_doc.has_conflicts: + raise errors.ConflictedDoc() + new_rev = self._allocate_doc_rev(doc.rev) + doc.rev = new_rev + doc.make_tombstone() + self._put_doc(old_doc, doc) + return new_rev + + def _get_conflicts(self, doc_id, couch_rev=None): + """ + Get the conflicted versions of a document. + + If the C{couch_rev} parameter is not None, conflicts for a specific + document's couch revision are returned. - A U1DB database needs to keep track of all database transactions, - document conflicts, the generation of other replicas it has seen, - indexes created by users and so on. + :param couch_rev: The couch document revision. + :type couch_rev: str - In this implementation, all this information is stored in special - documents stored in the underlying with doc_id prefix equal to - U1DB_DATA_DOC_ID_PREFIX. Those documents ids are reserved: put_doc(), - get_doc() and delete_doc() will not allow documents with a doc_id with - that prefix to be accessed or modified. + :return: A list of conflicted versions of the document. + :rtype: list """ - for key in self.U1DB_DATA_KEYS: - doc_id = '%s%s' % (self.U1DB_DATA_DOC_ID_PREFIX, key) - doc = self._get_doc(doc_id) - if doc is None: - doc = self._factory(doc_id) - doc.content = {'content': getattr(self, key)} - self._put_doc(doc) + # request conflicts attachment from server + params = {} + if couch_rev is not None: + params['rev'] = couch_rev # restric document's couch revision + resource = self._database.resource(doc_id, 'u1db_conflicts') + try: + response = resource.get_json(**params) + conflicts = [] + # build the conflicted versions + for doc_rev, content in json.loads(response[2].read()): + doc = self._factory(doc_id, doc_rev) + if content is None: + doc.make_tombstone() + else: + doc.content = content + conflicts.append(doc) + return conflicts + except ResourceNotFound: + return [] - #------------------------------------------------------------------------- - # Couch specific methods - #------------------------------------------------------------------------- + def get_doc_conflicts(self, doc_id): + """ + Get the list of conflicts for the given document. - INDEX_NAME_KEY = 'name' - INDEX_DEFINITION_KEY = 'definition' - INDEX_VALUES_KEY = 'values' + The order of the conflicts is such that the first entry is the value + that would be returned by "get_doc". - def delete_database(self): - """ - Delete a U1DB CouchDB database. + :return: A list of the document entries that are conflicted. + :rtype: [CouchDocument] """ - del(self._server[self._dbname]) + conflict_docs = self._get_conflicts(doc_id) + if len(conflict_docs) == 0: + return [] + this_doc = self._get_doc(doc_id, check_for_conflicts=True) + return [this_doc] + conflict_docs - def _dump_indexes_as_json(self): + def _get_replica_gen_and_trans_id(self, other_replica_uid): """ - Dump index definitions as JSON. + Return the last known generation and transaction id for the other db + replica. + + When you do a synchronization with another replica, the Database keeps + track of what generation the other database replica was at, and what + the associated transaction id was. This is used to determine what data + needs to be sent, and if two databases are claiming to be the same + replica. + + :param other_replica_uid: The identifier for the other replica. + :type other_replica_uid: str + + :return: A tuple containing the generation and transaction id we + encountered during synchronization. If we've never + synchronized with the replica, this is (0, ''). + :rtype: (int, str) """ - indexes = {} - for name, idx in self._indexes.iteritems(): - indexes[name] = {} - for attr in [self.INDEX_NAME_KEY, self.INDEX_DEFINITION_KEY, - self.INDEX_VALUES_KEY]: - indexes[name][attr] = getattr(idx, '_' + attr) - return indexes + # query a couch view + result = self._database.view('syncs/log') + if len(result[other_replica_uid].rows) == 0: + return (0, '') + return ( + result[other_replica_uid].rows[0]['value']['known_generation'], + result[other_replica_uid].rows[0]['value']['known_transaction_id'] + ) + + def _set_replica_gen_and_trans_id(self, other_replica_uid, + other_generation, other_transaction_id): + """ + Set the last-known generation and transaction id for the other + database replica. + + We have just performed some synchronization, and we want to track what + generation the other replica was at. See also + _get_replica_gen_and_trans_id. + + :param other_replica_uid: The U1DB identifier for the other replica. + :type other_replica_uid: str + :param other_generation: The generation number for the other replica. + :type other_generation: int + :param other_transaction_id: The transaction id associated with the + generation. + :type other_transaction_id: str + """ + self._do_set_replica_gen_and_trans_id( + other_replica_uid, other_generation, other_transaction_id) - def _load_indexes_from_json(self, indexes): + def _do_set_replica_gen_and_trans_id( + self, other_replica_uid, other_generation, other_transaction_id): + """ + Set the last-known generation and transaction id for the other + database replica. + + We have just performed some synchronization, and we want to track what + generation the other replica was at. See also + _get_replica_gen_and_trans_id. + + :param other_replica_uid: The U1DB identifier for the other replica. + :type other_replica_uid: str + :param other_generation: The generation number for the other replica. + :type other_generation: int + :param other_transaction_id: The transaction id associated with the + generation. + :type other_transaction_id: str + """ + # query a couch update function + res = self._database.resource( + '_design', 'syncs', '_update', 'put', 'u1db_sync_log') + res.put_json( + body={ + 'other_replica_uid': other_replica_uid, + 'other_generation': other_generation, + 'other_transaction_id': other_transaction_id, + }, + headers={'content-type': 'application/json'}) + + def _add_conflict(self, doc, my_doc_rev, my_content): + """ + Add a conflict to the document. + + Note that this method does not actually update the backend; rather, it + updates the CouchDocument object which will provide the conflict data + when the atomic document update is made. + + :param doc: The document to have conflicts added to. + :type doc: CouchDocument + :param my_doc_rev: The revision of the conflicted document. + :type my_doc_rev: str + :param my_content: The content of the conflicted document as a JSON + serialized string. + :type my_content: str """ - Load index definitions from stored JSON. + doc.ensure_fetch_conflicts(self._get_conflicts) + doc.add_conflict( + self._factory(doc_id=doc.doc_id, rev=my_doc_rev, + json=my_content)) - @param indexes: A JSON representation of indexes as - [('index-name', ['field', 'field2', ...]), ...]. - @type indexes: str + def _delete_conflicts(self, doc, conflict_revs): """ - self._indexes = {} - for name, idx_dict in indexes.iteritems(): - idx = InMemoryIndex(name, idx_dict[self.INDEX_DEFINITION_KEY]) - idx._values = idx_dict[self.INDEX_VALUES_KEY] - self._indexes[name] = idx + Delete the conflicted revisions from the list of conflicts of C{doc}. - def _load_transaction_log_from_json(self, transaction_log): + Note that thie method does not actually update the backed; rather, it + updates the CouchDocument object which will provide the conflict data + when the atomic document update is made. + + :param doc: The document to have conflicts deleted. + :type doc: CouchDocument + :param conflict_revs: A list of the revisions to be deleted. + :param conflict_revs: [str] """ - Load transaction log from stored JSON. + doc.ensure_fetch_conflicts(self._get_conflicts) + doc.delete_conflicts(conflict_revs) - @param transaction_log: A JSON representation of transaction_log as - [('generation', 'transaction_id'), ...]. - @type transaction_log: list + def _prune_conflicts(self, doc, doc_vcr): + """ + Prune conflicts that are older then the current document's revision, or + whose content match to the current document's content. + + :param doc: The document to have conflicts pruned. + :type doc: CouchDocument + :param doc_vcr: A vector clock representing the current document's + revision. + :type doc_vcr: u1db.vectorclock.VectorClock """ - self._transaction_log = [] - for gen, trans_id in transaction_log: - self._transaction_log.append((gen, trans_id)) + if doc.has_conflicts is True: + autoresolved = False + c_revs_to_prune = [] + for c_doc in doc.get_conflicts(): + c_vcr = vectorclock.VectorClockRev(c_doc.rev) + if doc_vcr.is_newer(c_vcr): + c_revs_to_prune.append(c_doc.rev) + elif doc.same_content_as(c_doc): + c_revs_to_prune.append(c_doc.rev) + doc_vcr.maximize(c_vcr) + autoresolved = True + if autoresolved: + doc_vcr.increment(self._replica_uid) + doc.rev = doc_vcr.as_str() + self._delete_conflicts(doc, c_revs_to_prune) + + def _force_doc_sync_conflict(self, doc): + """ + Add a conflict and force a document put. - def _load_other_generations_from_json(self, other_generations): + :param doc: The document to be put. + :type doc: CouchDocument """ - Load other generations from stored JSON. + my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev)) + self._add_conflict(doc, my_doc.rev, my_doc.get_json()) + doc.has_conflicts = True + self._put_doc(my_doc, doc) - @param other_generations: A JSON representation of other_generations - as {'replica_uid': ('generation', 'transaction_id'), ...}. - @type other_generations: dict + def resolve_doc(self, doc, conflicted_doc_revs): + """ + Mark a document as no longer conflicted. + + We take the list of revisions that the client knows about that it is + superseding. This may be a different list from the actual current + conflicts, in which case only those are removed as conflicted. This + may fail if the conflict list is significantly different from the + supplied information. (sync could have happened in the background from + the time you GET_DOC_CONFLICTS until the point where you RESOLVE) + + :param doc: A Document with the new content to be inserted. + :type doc: CouchDocument + :param conflicted_doc_revs: A list of revisions that the new content + supersedes. + :type conflicted_doc_revs: [str] """ - self._other_generations = {} - for replica_uid, [gen, trans_id] in other_generations.iteritems(): - self._other_generations[replica_uid] = (gen, trans_id) + cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + new_rev = self._ensure_maximal_rev(cur_doc.rev, + conflicted_doc_revs) + superseded_revs = set(conflicted_doc_revs) + doc.rev = new_rev + if cur_doc.rev in superseded_revs: + self._delete_conflicts(doc, superseded_revs) + self._put_doc(cur_doc, doc) + else: + self._add_conflict(doc, new_rev, doc.get_json()) + self._delete_conflicts(doc, superseded_revs) + # perform request to resolve document in server + resource = self._database.resource( + '_design', 'docs', '_update', 'resolve_doc', doc.doc_id) + conflicts = None + if doc.has_conflicts: + conflicts = binascii.b2a_base64( + json.dumps( + map(lambda cdoc: (cdoc.rev, cdoc.content), + doc.get_conflicts())) + )[:-1] # exclude \n + response = resource.put_json( + body={ + 'couch_rev': cur_doc.couch_rev, + 'conflicts': conflicts, + }, + headers={'content-type': 'application/json'}) + + def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, + replica_trans_id=''): + """ + Insert/update document into the database with a given revision. + + This api is used during synchronization operations. + + If a document would conflict and save_conflict is set to True, the + content will be selected as the 'current' content for doc.doc_id, + even though doc.rev doesn't supersede the currently stored revision. + The currently stored document will be added to the list of conflict + alternatives for the given doc_id. + + This forces the new content to be 'current' so that we get convergence + after synchronizing, even if people don't resolve conflicts. Users can + then notice that their content is out of date, update it, and + synchronize again. (The alternative is that users could synchronize and + think the data has propagated, but their local copy looks fine, and the + remote copy is never updated again.) + + :param doc: A document object + :type doc: CouchDocument + :param save_conflict: If this document is a conflict, do you want to + save it as a conflict, or just ignore it. + :type save_conflict: bool + :param replica_uid: A unique replica identifier. + :type replica_uid: str + :param replica_gen: The generation of the replica corresponding to the + this document. The replica arguments are optional, + but are used during synchronization. + :type replica_gen: int + :param replica_trans_id: The transaction_id associated with the + generation. + :type replica_trans_id: str + + :return: (state, at_gen) - If we don't have doc_id already, or if + doc_rev supersedes the existing document revision, then the + content will be inserted, and state is 'inserted'. If + doc_rev is less than or equal to the existing revision, then + the put is ignored and state is respecitvely 'superseded' or + 'converged'. If doc_rev is not strictly superseded or + supersedes, then state is 'conflicted'. The document will not + be inserted if save_conflict is False. For 'inserted' or + 'converged', at_gen is the insertion/current generation. + :rtype: (str, int) + """ + cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + # at this point, `doc` has arrived from the other syncing party, and + # we will decide what to do with it. + # First, we prepare the arriving doc to update couch database. + old_doc = doc + doc = self._factory(doc.doc_id, doc.rev, doc.get_json()) + if cur_doc is not None: + doc.couch_rev = cur_doc.couch_rev + # fetch conflicts because we will eventually manipulate them + doc.ensure_fetch_conflicts(self._get_conflicts) + # from now on, it works just like u1db sqlite backend + doc_vcr = vectorclock.VectorClockRev(doc.rev) + if cur_doc is None: + cur_vcr = vectorclock.VectorClockRev(None) + else: + cur_vcr = vectorclock.VectorClockRev(cur_doc.rev) + self._validate_source(replica_uid, replica_gen, replica_trans_id) + if doc_vcr.is_newer(cur_vcr): + rev = doc.rev + self._prune_conflicts(doc, doc_vcr) + if doc.rev != rev: + # conflicts have been autoresolved + state = 'superseded' + else: + state = 'inserted' + self._put_doc(cur_doc, doc) + elif doc.rev == cur_doc.rev: + # magical convergence + state = 'converged' + elif cur_vcr.is_newer(doc_vcr): + # Don't add this to seen_ids, because we have something newer, + # so we should send it back, and we should not generate a + # conflict + state = 'superseded' + elif cur_doc.same_content_as(doc): + # the documents have been edited to the same thing at both ends + doc_vcr.maximize(cur_vcr) + doc_vcr.increment(self._replica_uid) + doc.rev = doc_vcr.as_str() + self._put_doc(cur_doc, doc) + state = 'superseded' + else: + state = 'conflicted' + if save_conflict: + self._force_doc_sync_conflict(doc) + if replica_uid is not None and replica_gen is not None: + self._do_set_replica_gen_and_trans_id( + replica_uid, replica_gen, replica_trans_id) + # update info + old_doc.rev = doc.rev + if doc.is_tombstone(): + old_doc.is_tombstone() + else: + old_doc.content = doc.content + old_doc.has_conflicts = doc.has_conflicts + return state, self._get_generation() -class CouchSyncTarget(ObjectStoreSyncTarget): +class CouchSyncTarget(CommonSyncTarget): """ Functionality for using a CouchDatabase as a synchronization target. """ - pass + + def get_sync_info(self, source_replica_uid): + source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id( + source_replica_uid) + my_gen, my_trans_id = self._db._get_generation_info() + return ( + self._db._replica_uid, my_gen, my_trans_id, source_gen, + source_trans_id) + + def record_sync_info(self, source_replica_uid, source_replica_generation, + source_replica_transaction_id): + if self._trace_hook: + self._trace_hook('record_sync_info') + self._db._set_replica_gen_and_trans_id( + source_replica_uid, source_replica_generation, + source_replica_transaction_id) class NotEnoughCouchPermissions(Exception): @@ -527,12 +1004,12 @@ class CouchServerState(ServerState): """ Initialize the couch server state. - @param couch_url: The URL for the couch database. - @type couch_url: str - @param shared_db_name: The name of the shared database. - @type shared_db_name: str - @param tokens_db_name: The name of the tokens database. - @type tokens_db_name: str + :param couch_url: The URL for the couch database. + :type couch_url: str + :param shared_db_name: The name of the shared database. + :type shared_db_name: str + :param tokens_db_name: The name of the tokens database. + :type tokens_db_name: str """ self._couch_url = couch_url self._shared_db_name = shared_db_name @@ -563,12 +1040,12 @@ class CouchServerState(ServerState): couch library to ensure that Soledad Server can do everything it needs on the underlying couch database. - @param couch_url: The URL of the couch database. - @type couch_url: str + :param couch_url: The URL of the couch database. + :type couch_url: str @raise NotEnoughCouchPermissions: Raised in case there are not enough permissions to read/write/create the needed couch databases. - @rtype: bool + :rtype: bool """ def _open_couch_db(dbname): @@ -601,11 +1078,11 @@ class CouchServerState(ServerState): """ Open a couch database. - @param dbname: The name of the database to open. - @type dbname: str + :param dbname: The name of the database to open. + :type dbname: str - @return: The CouchDatabase object. - @rtype: CouchDatabase + :return: The CouchDatabase object. + :rtype: CouchDatabase """ # TODO: open couch return CouchDatabase.open_database( @@ -616,11 +1093,11 @@ class CouchServerState(ServerState): """ Ensure couch database exists. - @param dbname: The name of the database to ensure. - @type dbname: str + :param dbname: The name of the database to ensure. + :type dbname: str - @return: The CouchDatabase object and the replica uid. - @rtype: (CouchDatabase, str) + :return: The CouchDatabase object and the replica uid. + :rtype: (CouchDatabase, str) """ db = CouchDatabase.open_database( self._couch_url + '/' + dbname, @@ -631,8 +1108,8 @@ class CouchServerState(ServerState): """ Delete couch database. - @param dbname: The name of the database to delete. - @type dbname: str + :param dbname: The name of the database to delete. + :type dbname: str """ CouchDatabase.delete_database(self._couch_url + '/' + dbname) @@ -640,8 +1117,8 @@ class CouchServerState(ServerState): """ Set the couchdb URL - @param url: CouchDB URL - @type url: str + :param url: CouchDB URL + :type url: str """ self._couch_url = url @@ -649,7 +1126,7 @@ class CouchServerState(ServerState): """ Return CouchDB URL - @rtype: str + :rtype: str """ return self._couch_url |