diff options
Diffstat (limited to 'src/leap/soledad/common')
29 files changed, 6138 insertions, 0 deletions
diff --git a/src/leap/soledad/common/README.txt b/src/leap/soledad/common/README.txt new file mode 100644 index 00000000..0a252650 --- /dev/null +++ b/src/leap/soledad/common/README.txt @@ -0,0 +1,70 @@ +Soledad common package +====================== + +This package contains Soledad bits used by both server and client. + +Couch L2DB Backend +------------------ + +L2DB backends rely on some atomic operations that modify documents contents +and metadata (conflicts, transaction ids and indexes). The only atomic +operation in Couch is a document put, so every u1db atomic operation has to be +mapped to a couch document put. + +The atomic operations in the U1DB SQLite reference backend implementation may +be identified by the use of a context manager to access the underlying +database. A listing of the methods involved in each atomic operation are +depiced below. The top-level elements correpond to the atomic operations that +have to be mapped, and items on deeper levels of the list have to be +implemented in a way that all changes will be pushed with just one operation. + + * _set_replica_uid + * put_doc: + * _get_doc + * _put_and_update_indexes + * insert/update the document + * insert into transaction log + * delete_doc + * _get_doc + * _put_and_update_indexes + * get_doc_conflicts + * _get_conflicts + * _set_replica_gen_and_trans_id + * _do_set_replica_gen_and_trans_id + * _put_doc_if_newer + * _get_doc + * _validate_source (**) + * _get_replica_gen_and_trans_id + * cases: + * is newer: + * _prune_conflicts (**) + * _has_conflicts + * _delete_conflicts + * _put_and_update_indexes + * same content as: + * _put_and_update_indexes + * conflicted: + * _force_doc_sync_conflict + * _prune_conflicts + * _add_conflict + * _put_and_update_indexes + * _do_set_replica_gen_and_trans_id + * resolve_doc + * _get_doc + * cases: + * doc is superseded + * _put_and_update_indexes + * else + * _add_conflict + * _delete_conflicts + * delete_index + * create_index + +Notes: + + * Currently, the couch backend does not implement indexing, so what is + depicted as `_put_and_update_indexes` above will be found as `_put_doc` in + the backend. + + * Conflict updates are part of document put using couch update functions, + and as such are part of the same atomic operation as document put. diff --git a/src/leap/soledad/common/__init__.py b/src/leap/soledad/common/__init__.py new file mode 100644 index 00000000..4948ad20 --- /dev/null +++ b/src/leap/soledad/common/__init__.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from leap.common.check import leap_assert as soledad_assert +from leap.common.check import leap_assert_type as soledad_assert_type + +from ._version import get_versions + +""" +Soledad routines common to client and server. +""" + + +# +# Global constants +# + +SHARED_DB_NAME = 'shared' + + +# +# Global functions +# + +__version__ = get_versions()['version'] +del get_versions + + +__all__ = [ + "soledad_assert", + "soledad_assert_type", + "__version__", +] diff --git a/src/leap/soledad/common/backend.py b/src/leap/soledad/common/backend.py new file mode 100644 index 00000000..4a29ca87 --- /dev/null +++ b/src/leap/soledad/common/backend.py @@ -0,0 +1,642 @@ +# -*- coding: utf-8 -*- +# backend.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +"""A L2DB generic backend.""" + +import functools + +from leap.soledad.common.document import ServerDocument +from leap.soledad.common.l2db import vectorclock +from leap.soledad.common.l2db.errors import ( + RevisionConflict, + InvalidDocId, + ConflictedDoc, + DocumentDoesNotExist, + DocumentAlreadyDeleted, +) +from leap.soledad.common.l2db.backends import CommonBackend +from leap.soledad.common.l2db.backends import CommonSyncTarget + + +class SoledadBackend(CommonBackend): + BATCH_SUPPORT = False + + """ + A L2DB backend implementation. + """ + + def __init__(self, database, replica_uid=None): + """ + Create a new backend. + + :param database: the database implementation + :type database: Database + :param replica_uid: an optional unique replica identifier + :type replica_uid: str + """ + # save params + self._factory = ServerDocument + self._real_replica_uid = None + self._cache = None + self._dbname = database._dbname + self._database = database + self.batching = False + if replica_uid is not None: + self._set_replica_uid(replica_uid) + + def batch_start(self): + if not self.BATCH_SUPPORT: + return + self.batching = True + self.after_batch_callbacks = {} + self._database.batch_start() + if not self._cache: + # batching needs cache + self._cache = {} + self._get_generation() # warm up gen info + + def batch_end(self): + if not self.BATCH_SUPPORT: + return + self._database.batch_end() + self.batching = False + for name in self.after_batch_callbacks: + self.after_batch_callbacks[name]() + self.after_batch_callbacks = None + + @property + def cache(self): + if self._cache is not None: + return self._cache + else: + return {} + + def init_caching(self, cache): + """ + Start using cache by setting internal _cache attribute. + + :param cache: the cache instance, anything that behaves like a dict + :type cache: dict + """ + self._cache = cache + + def get_sync_target(self): + """ + Return a SyncTarget object, for another u1db to synchronize with. + + :return: The sync target. + :rtype: SoledadSyncTarget + """ + return SoledadSyncTarget(self) + + def delete_database(self): + """ + Delete a U1DB database. + """ + self._database.delete_database() + + def close(self): + """ + Release any resources associated with this database. + + :return: True if db was succesfully closed. + :rtype: bool + """ + self._database.close() + return True + + def __del__(self): + """ + Close the database upon garbage collection. + """ + self.close() + + def _set_replica_uid(self, replica_uid): + """ + Force the replica uid to be set. + + :param replica_uid: The new replica uid. + :type replica_uid: str + """ + self._database.set_replica_uid(replica_uid) + self._real_replica_uid = replica_uid + self.cache['replica_uid'] = self._real_replica_uid + + def _get_replica_uid(self): + """ + Get the replica uid. + + :return: The replica uid. + :rtype: str + """ + if self._real_replica_uid is not None: + self.cache['replica_uid'] = self._real_replica_uid + return self._real_replica_uid + if 'replica_uid' in self.cache: + return self.cache['replica_uid'] + self._real_replica_uid = self._database.get_replica_uid() + self._set_replica_uid(self._real_replica_uid) + return self._real_replica_uid + + _replica_uid = property(_get_replica_uid, _set_replica_uid) + + replica_uid = property(_get_replica_uid) + + def _get_generation(self): + """ + Return the current generation. + + :return: The current generation. + :rtype: int + + :raise SoledadError: Raised by database on operation failure + """ + return self._get_generation_info()[0] + + def _get_generation_info(self): + """ + Return the current generation. + + :return: A tuple containing the current generation and transaction id. + :rtype: (int, str) + + :raise SoledadError: Raised by database on operation failure + """ + cur_gen, newest_trans_id = self._database.get_generation_info() + return (cur_gen, newest_trans_id) + + def _get_trans_id_for_gen(self, generation): + """ + Get the transaction id corresponding to a particular generation. + + :param generation: The generation for which to get the transaction id. + :type generation: int + + :return: The transaction id for C{generation}. + :rtype: str + + :raise InvalidGeneration: Raised when the generation does not exist. + + """ + return self._database.get_trans_id_for_gen(generation) + + def _get_transaction_log(self): + """ + This is only for the test suite, it is not part of the api. + + :return: The complete transaction log. + :rtype: [(str, str)] + + """ + return self._database.get_transaction_log() + + def _get_doc(self, doc_id, check_for_conflicts=False): + """ + Extract the document from storage. + + This can return None if the document doesn't exist. + + :param doc_id: The unique document identifier + :type doc_id: str + :param check_for_conflicts: If set to False, then the conflict check + will be skipped. + :type check_for_conflicts: bool + + :return: The document. + :rtype: ServerDocument + """ + return self._database.get_doc(doc_id, check_for_conflicts) + + def get_doc(self, doc_id, include_deleted=False): + """ + Get the JSON string for the given document. + + :param doc_id: The unique document identifier + :type doc_id: str + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise asking for a deleted + document will return None. + :type include_deleted: bool + + :return: A document object. + :rtype: ServerDocument. + """ + doc = self._get_doc(doc_id, check_for_conflicts=True) + if doc is None: + return None + if doc.is_tombstone() and not include_deleted: + return None + return doc + + def get_all_docs(self, include_deleted=False): + """ + Get the JSON content for all documents in the database. + + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise deleted + documents will not be included in the results. + :type include_deleted: bool + + :return: (generation, [ServerDocument]) + The current generation of the database, followed by a list of all + the documents in the database. + :rtype: (int, [ServerDocument]) + """ + return self._database.get_all_docs(include_deleted) + + def _put_doc(self, old_doc, doc): + """ + Put the document in the backend database. + + Note that C{old_doc} must have been fetched with the parameter + C{check_for_conflicts} equal to True, so we can properly update the + new document using the conflict information from the old one. + + :param old_doc: The old document version. + :type old_doc: ServerDocument + :param doc: The document to be put. + :type doc: ServerDocument + """ + self._database.save_document(old_doc, doc, + self._allocate_transaction_id()) + + def put_doc(self, doc): + """ + Update a document. + + If the document currently has conflicts, put will fail. + If the database specifies a maximum document size and the document + exceeds it, put will fail and raise a DocumentTooBig exception. + + :param doc: A Document with new content. + :return: new_doc_rev - The new revision identifier for the document. + The Document object will also be updated. + + :raise InvalidDocId: Raised if the document's id is invalid. + :raise DocumentTooBig: Raised if the document size is too big. + :raise ConflictedDoc: Raised if the document has conflicts. + """ + if doc.doc_id is None: + raise InvalidDocId() + self._check_doc_id(doc.doc_id) + self._check_doc_size(doc) + old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if old_doc and old_doc.has_conflicts: + raise ConflictedDoc() + if old_doc and doc.rev is None and old_doc.is_tombstone(): + new_rev = self._allocate_doc_rev(old_doc.rev) + else: + if old_doc is not None: + if old_doc.rev != doc.rev: + raise RevisionConflict() + else: + if doc.rev is not None: + raise RevisionConflict() + new_rev = self._allocate_doc_rev(doc.rev) + doc.rev = new_rev + self._put_doc(old_doc, doc) + return new_rev + + def whats_changed(self, old_generation=0): + """ + Return a list of documents that have changed since old_generation. + + :param old_generation: The generation of the database in the old + state. + :type old_generation: int + + :return: (generation, trans_id, [(doc_id, generation, trans_id),...]) + The current generation of the database, its associated + transaction id, and a list of of changed documents since + old_generation, represented by tuples with for each document + its doc_id and the generation and transaction id corresponding + to the last intervening change and sorted by generation (old + changes first) + :rtype: (int, str, [(str, int, str)]) + """ + return self._database.whats_changed(old_generation) + + def delete_doc(self, doc): + """ + Mark a document as deleted. + + Will abort if the current revision doesn't match doc.rev. + This will also set doc.content to None. + + :param doc: The document to mark as deleted. + :type doc: ServerDocument. + + :raise DocumentDoesNotExist: Raised if the document does not + exist. + :raise RevisionConflict: Raised if the revisions do not match. + :raise DocumentAlreadyDeleted: Raised if the document is + already deleted. + :raise ConflictedDoc: Raised if the doc has conflicts. + """ + old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if old_doc is None: + raise DocumentDoesNotExist + if old_doc.rev != doc.rev: + raise RevisionConflict() + if old_doc.is_tombstone(): + raise DocumentAlreadyDeleted + if old_doc.has_conflicts: + raise ConflictedDoc() + new_rev = self._allocate_doc_rev(doc.rev) + doc.rev = new_rev + doc.make_tombstone() + self._put_doc(old_doc, doc) + return new_rev + + def get_doc_conflicts(self, doc_id): + """ + Get the conflicted versions of a document. + + :param doc_id: The document id. + :type doc_id: str + + :return: A list of conflicted versions of the document. + :rtype: list + """ + return self._database.get_doc_conflicts(doc_id) + + def _get_replica_gen_and_trans_id(self, other_replica_uid): + """ + Return the last known generation and transaction id for the other db + replica. + + When you do a synchronization with another replica, the Database keeps + track of what generation the other database replica was at, and what + the associated transaction id was. This is used to determine what data + needs to be sent, and if two databases are claiming to be the same + replica. + + :param other_replica_uid: The identifier for the other replica. + :type other_replica_uid: str + + :return: A tuple containing the generation and transaction id we + encountered during synchronization. If we've never + synchronized with the replica, this is (0, ''). + :rtype: (int, str) + """ + if other_replica_uid in self.cache: + return self.cache[other_replica_uid] + gen, trans_id = \ + self._database.get_replica_gen_and_trans_id(other_replica_uid) + self.cache[other_replica_uid] = (gen, trans_id) + return (gen, trans_id) + + def _set_replica_gen_and_trans_id(self, other_replica_uid, + other_generation, other_transaction_id): + """ + Set the last-known generation and transaction id for the other + database replica. + + We have just performed some synchronization, and we want to track what + generation the other replica was at. See also + _get_replica_gen_and_trans_id. + + :param other_replica_uid: The U1DB identifier for the other replica. + :type other_replica_uid: str + :param other_generation: The generation number for the other replica. + :type other_generation: int + :param other_transaction_id: The transaction id associated with the + generation. + :type other_transaction_id: str + """ + if other_replica_uid is not None and other_generation is not None: + self.cache[other_replica_uid] = (other_generation, + other_transaction_id) + self._database.set_replica_gen_and_trans_id(other_replica_uid, + other_generation, + other_transaction_id) + + def _do_set_replica_gen_and_trans_id( + self, other_replica_uid, other_generation, other_transaction_id): + """ + _put_doc_if_newer from super class is calling it. So we declare this. + + :param other_replica_uid: The U1DB identifier for the other replica. + :type other_replica_uid: str + :param other_generation: The generation number for the other replica. + :type other_generation: int + :param other_transaction_id: The transaction id associated with the + generation. + :type other_transaction_id: str + """ + args = [other_replica_uid, other_generation, other_transaction_id] + callback = functools.partial(self._set_replica_gen_and_trans_id, *args) + if self.batching: + self.after_batch_callbacks['set_source_info'] = callback + else: + callback() + + def _force_doc_sync_conflict(self, doc): + """ + Add a conflict and force a document put. + + :param doc: The document to be put. + :type doc: ServerDocument + """ + my_doc = self._get_doc(doc.doc_id) + self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev)) + doc.add_conflict(self._factory(doc.doc_id, my_doc.rev, + my_doc.get_json())) + doc.has_conflicts = True + self._put_doc(my_doc, doc) + + def resolve_doc(self, doc, conflicted_doc_revs): + """ + Mark a document as no longer conflicted. + + We take the list of revisions that the client knows about that it is + superseding. This may be a different list from the actual current + conflicts, in which case only those are removed as conflicted. This + may fail if the conflict list is significantly different from the + supplied information. (sync could have happened in the background from + the time you GET_DOC_CONFLICTS until the point where you RESOLVE) + + :param doc: A Document with the new content to be inserted. + :type doc: ServerDocument + :param conflicted_doc_revs: A list of revisions that the new content + supersedes. + :type conflicted_doc_revs: [str] + + :raise SoledadError: Raised by database on operation failure + """ + cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + new_rev = self._ensure_maximal_rev(cur_doc.rev, + conflicted_doc_revs) + superseded_revs = set(conflicted_doc_revs) + doc.rev = new_rev + # this backend stores conflicts as properties of the documents, so we + # have to copy these conflicts over to the document being updated. + if cur_doc.rev in superseded_revs: + # the newer doc version will supersede the one in the database, so + # we copy conflicts before updating the backend. + doc.set_conflicts(cur_doc.get_conflicts()) # copy conflicts over. + doc.delete_conflicts(superseded_revs) + self._put_doc(cur_doc, doc) + else: + # the newer doc version does not supersede the one in the + # database, so we will add a conflict to the database and copy + # those over to the document the user has in her hands. + cur_doc.add_conflict(doc) + cur_doc.delete_conflicts(superseded_revs) + self._put_doc(cur_doc, cur_doc) # just update conflicts + # backend has been updated with current conflicts, now copy them + # to the current document. + doc.set_conflicts(cur_doc.get_conflicts()) + + def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, + replica_trans_id='', number_of_docs=None, + doc_idx=None, sync_id=None): + """ + Insert/update document into the database with a given revision. + + This api is used during synchronization operations. + + If a document would conflict and save_conflict is set to True, the + content will be selected as the 'current' content for doc.doc_id, + even though doc.rev doesn't supersede the currently stored revision. + The currently stored document will be added to the list of conflict + alternatives for the given doc_id. + + This forces the new content to be 'current' so that we get convergence + after synchronizing, even if people don't resolve conflicts. Users can + then notice that their content is out of date, update it, and + synchronize again. (The alternative is that users could synchronize and + think the data has propagated, but their local copy looks fine, and the + remote copy is never updated again.) + + :param doc: A document object + :type doc: ServerDocument + :param save_conflict: If this document is a conflict, do you want to + save it as a conflict, or just ignore it. + :type save_conflict: bool + :param replica_uid: A unique replica identifier. + :type replica_uid: str + :param replica_gen: The generation of the replica corresponding to the + this document. The replica arguments are optional, + but are used during synchronization. + :type replica_gen: int + :param replica_trans_id: The transaction_id associated with the + generation. + :type replica_trans_id: str + :param number_of_docs: The total amount of documents sent on this sync + session. + :type number_of_docs: int + :param doc_idx: The index of the current document being sent. + :type doc_idx: int + :param sync_id: The id of the current sync session. + :type sync_id: str + + :return: (state, at_gen) - If we don't have doc_id already, or if + doc_rev supersedes the existing document revision, then the + content will be inserted, and state is 'inserted'. If + doc_rev is less than or equal to the existing revision, then + the put is ignored and state is respecitvely 'superseded' or + 'converged'. If doc_rev is not strictly superseded or + supersedes, then state is 'conflicted'. The document will not + be inserted if save_conflict is False. For 'inserted' or + 'converged', at_gen is the insertion/current generation. + :rtype: (str, int) + """ + if not isinstance(doc, ServerDocument): + doc = self._factory(doc.doc_id, doc.rev, doc.get_json()) + my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if my_doc: + doc.set_conflicts(my_doc.get_conflicts()) + return CommonBackend._put_doc_if_newer(self, doc, save_conflict, + replica_uid, replica_gen, + replica_trans_id) + + def _put_and_update_indexes(self, cur_doc, doc): + self._put_doc(cur_doc, doc) + + def get_docs(self, doc_ids, check_for_conflicts=True, + include_deleted=False, read_content=True): + """ + Get the JSON content for many documents. + + :param doc_ids: A list of document identifiers or None for all. + :type doc_ids: list + :param check_for_conflicts: If set to False, then the conflict check + will be skipped, and 'None' will be + returned instead of True/False. + :type check_for_conflicts: bool + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise deleted + documents will not be included in the results. + :return: iterable giving the Document object for each document id + in matching doc_ids order. + :rtype: iterable + """ + return self._database.get_docs(doc_ids, check_for_conflicts, + include_deleted, read_content) + + def _prune_conflicts(self, doc, doc_vcr): + """ + Prune conflicts that are older then the current document's revision, or + whose content match to the current document's content. + Originally in u1db.CommonBackend + + :param doc: The document to have conflicts pruned. + :type doc: ServerDocument + :param doc_vcr: A vector clock representing the current document's + revision. + :type doc_vcr: u1db.vectorclock.VectorClock + """ + if doc.has_conflicts: + autoresolved = False + c_revs_to_prune = [] + for c_doc in doc._conflicts: + c_vcr = vectorclock.VectorClockRev(c_doc.rev) + if doc_vcr.is_newer(c_vcr): + c_revs_to_prune.append(c_doc.rev) + elif doc.same_content_as(c_doc): + c_revs_to_prune.append(c_doc.rev) + doc_vcr.maximize(c_vcr) + autoresolved = True + if autoresolved: + doc_vcr.increment(self._replica_uid) + doc.rev = doc_vcr.as_str() + doc.delete_conflicts(c_revs_to_prune) + + +class SoledadSyncTarget(CommonSyncTarget): + + """ + Functionality for using a SoledadBackend as a synchronization target. + """ + + def get_sync_info(self, source_replica_uid): + source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id( + source_replica_uid) + my_gen, my_trans_id = self._db._get_generation_info() + return ( + self._db._replica_uid, my_gen, my_trans_id, source_gen, + source_trans_id) + + def record_sync_info(self, source_replica_uid, source_replica_generation, + source_replica_transaction_id): + if self._trace_hook: + self._trace_hook('record_sync_info') + self._db._set_replica_gen_and_trans_id( + source_replica_uid, source_replica_generation, + source_replica_transaction_id) diff --git a/src/leap/soledad/common/command.py b/src/leap/soledad/common/command.py new file mode 100644 index 00000000..66aa6b7a --- /dev/null +++ b/src/leap/soledad/common/command.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- +# command.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Utility to sanitize and run shell commands. +""" + + +import subprocess + + +def exec_validated_cmd(cmd, argument, validator=None): + """ + Executes cmd, validating argument with a validator function. + + :param cmd: command. + :type dbname: str + :param argument: argument. + :type argument: str + :param validator: optional function to validate argument + :type validator: function + + :return: exit code and stdout or stderr (if code != 0) + :rtype: (int, str) + """ + if validator and not validator(argument): + return 1, "invalid argument" + command = cmd.split(' ') + command.append(argument) + try: + process = subprocess.Popen(command, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + except OSError as e: + return 1, e + (out, err) = process.communicate() + code = process.wait() + if code is not 0: + return code, err + else: + return code, out diff --git a/src/leap/soledad/common/couch/__init__.py b/src/leap/soledad/common/couch/__init__.py new file mode 100644 index 00000000..2343e849 --- /dev/null +++ b/src/leap/soledad/common/couch/__init__.py @@ -0,0 +1,812 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +"""A U1DB backend that uses CouchDB as its persistence layer.""" + + +import json +import copy +import re +import uuid +import binascii + +from six import StringIO +from six.moves.urllib.parse import urljoin +from contextlib import contextmanager + + +from couchdb.client import Server, Database +from couchdb.http import ( + ResourceConflict, + ResourceNotFound, + Session, + urljoin as couch_urljoin, + Resource, +) +from leap.soledad.common.l2db.errors import ( + DatabaseDoesNotExist, + InvalidGeneration, + RevisionConflict, +) +from leap.soledad.common.l2db.remote import http_app + + +from .support import MultipartWriter +from leap.soledad.common.errors import InvalidURLError +from leap.soledad.common.document import ServerDocument +from leap.soledad.common.backend import SoledadBackend + + +COUCH_TIMEOUT = 120 # timeout for transfers between Soledad server and Couch + + +def list_users_dbs(couch_url): + """ + Retrieves a list with all databases that starts with 'user-' on CouchDB. + Those databases belongs to users. So, the list will contain all the + database names in the form of 'user-{uuid4}'. + + :param couch_url: The couch url with needed credentials + :type couch_url: str + + :return: The list of all database names from users. + :rtype: [str] + """ + with couch_server(couch_url) as server: + users = [dbname for dbname in server if dbname.startswith('user-')] + return users + + +# monkey-patch the u1db http app to use ServerDocument +http_app.Document = ServerDocument + + +@contextmanager +def couch_server(url): + """ + Provide a connection to a couch server and cleanup after use. + + For database creation and deletion we use an ephemeral connection to the + couch server. That connection has to be properly closed, so we provide it + as a context manager. + + :param url: The URL of the Couch server. + :type url: str + """ + session = Session(timeout=COUCH_TIMEOUT) + server = Server(url=url, full_commit=False, session=session) + yield server + + +def _get_gen_doc_id(gen): + return 'gen-%s' % str(gen).zfill(10) + + +GENERATION_KEY = 'gen' +TRANSACTION_ID_KEY = 'trans_id' +REPLICA_UID_KEY = 'replica_uid' +DOC_ID_KEY = 'doc_id' +SCHEMA_VERSION_KEY = 'schema_version' + +CONFIG_DOC_ID = '_local/config' +SYNC_DOC_ID_PREFIX = '_local/sync_' +SCHEMA_VERSION = 1 + + +class CouchDatabase(object): + """ + Holds CouchDB related code. + This class gives methods to encapsulate database operations and hide + CouchDB details from backend code. + """ + + @classmethod + def open_database(cls, url, create, replica_uid=None, + database_security=None): + """ + Open a U1DB database using CouchDB as backend. + + :param url: the url of the database replica + :type url: str + :param create: should the replica be created if it does not exist? + :type create: bool + :param replica_uid: an optional unique replica identifier + :type replica_uid: str + :param database_security: security rules as CouchDB security doc + :type database_security: dict + + :return: the database instance + :rtype: SoledadBackend + + :raise DatabaseDoesNotExist: Raised if database does not exist. + """ + # get database from url + m = re.match('(^https?://[^/]+)/(.+)$', url) + if not m: + raise InvalidURLError + url = m.group(1) + dbname = m.group(2) + with couch_server(url) as server: + if dbname not in server: + if create: + server.create(dbname) + else: + raise DatabaseDoesNotExist() + db = cls(url, dbname, ensure_security=create, + database_security=database_security) + return SoledadBackend( + db, replica_uid=replica_uid) + + def __init__(self, url, dbname, ensure_security=False, + database_security=None): + """ + :param url: Couch server URL with necessary credentials + :type url: string + :param dbname: Couch database name + :type dbname: string + :param ensure_security: will PUT a _security ddoc if set + :type ensure_security: bool + :param database_security: security rules as CouchDB security doc + :type database_security: dict + """ + self._session = Session(timeout=COUCH_TIMEOUT) + self._url = url + self._dbname = dbname + self._database = self.get_couch_database(url, dbname) + self.batching = False + self.batch_generation = None + self.batch_docs = {} + if ensure_security: + self.ensure_security_ddoc(database_security) + + def batch_start(self): + self.batching = True + self.batch_generation = self.get_generation_info() + ids = set(row.id for row in self._database.view('_all_docs')) + self.batched_ids = ids + + def batch_end(self): + self.batching = False + self.batch_generation = None + self.__perform_batch() + + def get_couch_database(self, url, dbname): + """ + Generate a couchdb.Database instance given a url and dbname. + + :param url: CouchDB's server url with credentials + :type url: str + :param dbname: Database name + :type dbname: str + + :return: couch library database instance + :rtype: couchdb.Database + + :raise DatabaseDoesNotExist: Raised if database does not exist. + """ + try: + return Database( + urljoin(url, dbname), + self._session) + except ResourceNotFound: + raise DatabaseDoesNotExist() + + def ensure_security_ddoc(self, security_config=None): + """ + Make sure that only soledad user is able to access this database as + an unprivileged member, meaning that administration access will + be forbidden even inside an user database. + The goal is to make sure that only the lowest access level is given + to the unprivileged CouchDB user set on the server process. + This is achieved by creating a _security design document, see: + http://docs.couchdb.org/en/latest/api/database/security.html + + :param security_config: security configuration parsed from conf file + :type security_config: dict + """ + security_config = security_config or {} + security = self._database.resource.get_json('_security')[2] + security['members'] = {'names': [], 'roles': []} + security['members']['names'] = security_config.get('members', + ['soledad']) + security['members']['roles'] = security_config.get('members_roles', []) + security['admins'] = {'names': [], 'roles': []} + security['admins']['names'] = security_config.get('admins', []) + security['admins']['roles'] = security_config.get('admins_roles', []) + self._database.resource.put_json('_security', body=security) + + def delete_database(self): + """ + Delete a U1DB CouchDB database. + """ + with couch_server(self._url) as server: + del(server[self._dbname]) + + def set_replica_uid(self, replica_uid): + """ + Force the replica uid to be set. + + :param replica_uid: The new replica uid. + :type replica_uid: str + """ + try: + # set on existent config document + doc = self._database[CONFIG_DOC_ID] + doc[REPLICA_UID_KEY] = replica_uid + except ResourceNotFound: + # or create the config document + doc = { + '_id': CONFIG_DOC_ID, + REPLICA_UID_KEY: replica_uid, + SCHEMA_VERSION_KEY: SCHEMA_VERSION, + } + self._database.save(doc) + + def get_replica_uid(self): + """ + Get the replica uid. + + :return: The replica uid. + :rtype: str + """ + try: + # grab replica_uid from server + doc = self._database[CONFIG_DOC_ID] + replica_uid = doc[REPLICA_UID_KEY] + return replica_uid + except ResourceNotFound: + # create a unique replica_uid + replica_uid = uuid.uuid4().hex + self.set_replica_uid(replica_uid) + return replica_uid + + def close(self): + self._database = None + + def get_all_docs(self, include_deleted=False): + """ + Get the JSON content for all documents in the database. + + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise deleted + documents will not be included in the results. + :type include_deleted: bool + + :return: (generation, [ServerDocument]) + The current generation of the database, followed by a list of all + the documents in the database. + :rtype: (int, [ServerDocument]) + """ + + generation, _ = self.get_generation_info() + results = list( + self.get_docs(None, True, include_deleted)) + return (generation, results) + + def get_docs(self, doc_ids, check_for_conflicts=True, + include_deleted=False, read_content=True): + """ + Get the JSON content for many documents. + + Use couch's `_all_docs` view to get the documents indicated in + `doc_ids`, + + :param doc_ids: A list of document identifiers or None for all. + :type doc_ids: list + :param check_for_conflicts: If set to False, then the conflict check + will be skipped, and 'None' will be + returned instead of True/False. + :type check_for_conflicts: bool + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise deleted + documents will not be included in the results. + + :return: iterable giving the Document object for each document id + in matching doc_ids order. + :rtype: iterable + """ + params = {'include_docs': 'true', 'attachments': 'false'} + if doc_ids is not None: + params['keys'] = doc_ids + view = self._database.view("_all_docs", **params) + for row in view.rows: + result = copy.deepcopy(row['doc']) + for file_name in result.get('_attachments', {}).keys(): + data = self._database.get_attachment(result, file_name) + if data: + if read_content: + data = data.read() + result['_attachments'][file_name] = {'data': data} + doc = self.__parse_doc_from_couch( + result, result['_id'], + check_for_conflicts=check_for_conflicts, decode=False) + # filter out non-u1db or deleted documents + if not doc or (not include_deleted and doc.is_tombstone()): + continue + yield doc + + def get_doc(self, doc_id, check_for_conflicts=False): + """ + Extract the document from storage. + + This can return None if the document doesn't exist. + + :param doc_id: The unique document identifier + :type doc_id: str + :param check_for_conflicts: If set to False, then the conflict check + will be skipped. + :type check_for_conflicts: bool + + :return: The document. + :rtype: ServerDocument + """ + doc_from_batch = self.__check_batch_before_get(doc_id) + if doc_from_batch: + return doc_from_batch + if self.batching and doc_id not in self.batched_ids: + return None + if doc_id not in self._database: + return None + # get document with all attachments (u1db content and eventual + # conflicts) + result = self.json_from_resource([doc_id], attachments=True) + return self.__parse_doc_from_couch(result, doc_id, check_for_conflicts) + + def __check_batch_before_get(self, doc_id): + """ + If doc_id is staged for batching, then we need to commit the batch + before going ahead. This avoids consistency problems, like trying to + get a document that isn't persisted and processing like it is missing. + + :param doc_id: The unique document identifier + :type doc_id: str + """ + if doc_id in self.batch_docs: + couch_doc = self.batch_docs[doc_id] + rev = self.__perform_batch(doc_id) + couch_doc['_rev'] = rev + self.batched_ids.add(doc_id) + return self.__parse_doc_from_couch(couch_doc, doc_id, True) + return None + + def __perform_batch(self, doc_id=None): + status = self._database.update(self.batch_docs.values()) + rev = None + for ok, stored_doc_id, rev_or_error in status: + if not ok: + error = rev_or_error + if type(error) is ResourceConflict: + raise RevisionConflict + raise error + elif doc_id == stored_doc_id: + rev = rev_or_error + self.batch_docs.clear() + return rev + + def __parse_doc_from_couch(self, result, doc_id, + check_for_conflicts=False, decode=True): + # restrict to u1db documents + if 'u1db_rev' not in result: + return None + doc = ServerDocument(doc_id, result['u1db_rev']) + # set contents or make tombstone + if '_attachments' not in result \ + or 'u1db_content' not in result['_attachments']: + doc.make_tombstone() + elif decode: + doc.content = json.loads( + binascii.a2b_base64( + result['_attachments']['u1db_content']['data'])) + else: + doc._json = result['_attachments']['u1db_content']['data'] + # determine if there are conflicts + if check_for_conflicts \ + and '_attachments' in result \ + and 'u1db_conflicts' in result['_attachments']: + if decode: + conflicts = binascii.a2b_base64( + result['_attachments']['u1db_conflicts']['data']) + else: + conflicts = result['_attachments']['u1db_conflicts']['data'] + conflicts = json.loads(conflicts) + doc.set_conflicts(self._build_conflicts(doc.doc_id, conflicts)) + # store couch revision + doc.couch_rev = result['_rev'] + return doc + + def _build_conflicts(self, doc_id, attached_conflicts): + """ + Build the conflicted documents list from the conflicts attachment + fetched from a couch document. + + :param attached_conflicts: The document's conflicts as fetched from a + couch document attachment. + :type attached_conflicts: dict + """ + conflicts = [] + for doc_rev, content in attached_conflicts: + doc = ServerDocument(doc_id, doc_rev) + if content is None: + doc.make_tombstone() + else: + doc.content = content + conflicts.append(doc) + return conflicts + + def get_trans_id_for_gen(self, generation): + """ + Get the transaction id corresponding to a particular generation. + + :param generation: The generation for which to get the transaction id. + :type generation: int + + :return: The transaction id for C{generation}. + :rtype: str + + :raise InvalidGeneration: Raised when the generation does not exist. + """ + if generation == 0: + return '' + log = self._get_transaction_log(start=generation, end=generation) + if not log: + raise InvalidGeneration + _, _, trans_id = log[0] + return trans_id + + def get_replica_gen_and_trans_id(self, other_replica_uid): + """ + Return the last known generation and transaction id for the other db + replica. + + When you do a synchronization with another replica, the Database keeps + track of what generation the other database replica was at, and what + the associated transaction id was. This is used to determine what data + needs to be sent, and if two databases are claiming to be the same + replica. + + :param other_replica_uid: The identifier for the other replica. + :type other_replica_uid: str + + :return: A tuple containing the generation and transaction id we + encountered during synchronization. If we've never + synchronized with the replica, this is (0, ''). + :rtype: (int, str) + """ + doc_id = '%s%s' % (SYNC_DOC_ID_PREFIX, other_replica_uid) + try: + doc = self._database[doc_id] + except ResourceNotFound: + doc = { + '_id': doc_id, + GENERATION_KEY: 0, + REPLICA_UID_KEY: str(other_replica_uid), + TRANSACTION_ID_KEY: '', + } + self._database.save(doc) + gen, trans_id = doc[GENERATION_KEY], doc[TRANSACTION_ID_KEY] + return gen, trans_id + + def get_doc_conflicts(self, doc_id, couch_rev=None): + """ + Get the conflicted versions of a document. + + If the C{couch_rev} parameter is not None, conflicts for a specific + document's couch revision are returned. + + :param couch_rev: The couch document revision. + :type couch_rev: str + + :return: A list of conflicted versions of the document. + :rtype: list + """ + # request conflicts attachment from server + params = {} + conflicts = [] + if couch_rev is not None: + params['rev'] = couch_rev # restric document's couch revision + else: + # TODO: move into resource logic! + first_entry = self.get_doc(doc_id, check_for_conflicts=True) + conflicts.append(first_entry) + + try: + response = self.json_from_resource([doc_id, 'u1db_conflicts'], + **params) + return conflicts + self._build_conflicts( + doc_id, json.loads(response.read())) + except ResourceNotFound: + return [] + + def set_replica_gen_and_trans_id( + self, other_replica_uid, other_generation, other_transaction_id): + """ + Set the last-known generation and transaction id for the other + database replica. + + We have just performed some synchronization, and we want to track what + generation the other replica was at. See also + get_replica_gen_and_trans_id. + + :param other_replica_uid: The U1DB identifier for the other replica. + :type other_replica_uid: str + :param other_generation: The generation number for the other replica. + :type other_generation: int + :param other_transaction_id: The transaction id associated with the + generation. + :type other_transaction_id: str + """ + doc_id = '%s%s' % (SYNC_DOC_ID_PREFIX, other_replica_uid) + try: + doc = self._database[doc_id] + except ResourceNotFound: + doc = {'_id': doc_id} + doc[GENERATION_KEY] = other_generation + doc[TRANSACTION_ID_KEY] = other_transaction_id + self._database.save(doc) + + def get_transaction_log(self): + """ + This is only for the test suite, it is not part of the api. + + :return: The complete transaction log. + :rtype: [(str, str)] + """ + log = self._get_transaction_log() + return map(lambda i: (i[1], i[2]), log) + + def _get_gen_docs( + self, start=0, end=9999999999, descending=None, limit=None): + params = {} + if descending: + params['descending'] = 'true' + # honor couch way of traversing the view tree in reverse order + start, end = end, start + params['startkey'] = _get_gen_doc_id(start) + params['endkey'] = _get_gen_doc_id(end) + params['include_docs'] = 'true' + if limit: + params['limit'] = limit + view = self._database.view("_all_docs", **params) + return view.rows + + def _get_transaction_log(self, start=0, end=9999999999): + # get current gen and trans_id + rows = self._get_gen_docs(start=start, end=end) + log = [] + for row in rows: + doc = row['doc'] + log.append(( + doc[GENERATION_KEY], + doc[DOC_ID_KEY], + doc[TRANSACTION_ID_KEY])) + return log + + def whats_changed(self, old_generation=0): + """ + Return a list of documents that have changed since old_generation. + + :param old_generation: The generation of the database in the old + state. + :type old_generation: int + + :return: (generation, trans_id, [(doc_id, generation, trans_id),...]) + The current generation of the database, its associated + transaction id, and a list of of changed documents since + old_generation, represented by tuples with for each document + its doc_id and the generation and transaction id corresponding + to the last intervening change and sorted by generation (old + changes first) + :rtype: (int, str, [(str, int, str)]) + """ + changes = [] + cur_generation, last_trans_id = self.get_generation_info() + relevant_tail = self._get_transaction_log(start=old_generation + 1) + seen = set() + for generation, doc_id, trans_id in reversed(relevant_tail): + if doc_id not in seen: + changes.append((doc_id, generation, trans_id)) + seen.add(doc_id) + changes.reverse() + return (cur_generation, last_trans_id, changes) + + def get_generation_info(self): + """ + Return the current generation. + + :return: A tuple containing the current generation and transaction id. + :rtype: (int, str) + """ + if self.batching and self.batch_generation: + return self.batch_generation + rows = self._get_gen_docs(descending=True, limit=1) + if not rows: + return 0, '' + gen_doc = rows.pop()['doc'] + return gen_doc[GENERATION_KEY], gen_doc[TRANSACTION_ID_KEY] + + def json_from_resource(self, doc_path, **kwargs): + """ + Get a resource from it's path and gets a doc's JSON using provided + parameters. + + :param doc_path: The path to resource. + :type doc_path: [str] + + :return: The request's data parsed from JSON to a dict. + :rtype: dict + """ + if doc_path is not None: + resource = self._database.resource(*doc_path) + else: + resource = self._database.resource() + _, _, data = resource.get_json(**kwargs) + return data + + def _allocate_new_generation(self, doc_id, transaction_id, save=True): + """ + Allocate a new generation number for a document modification. + + We need to allocate a new generation to this document modification by + creating a new gen doc. In order to avoid concurrent database updates + from allocating the same new generation, we will try to create the + document until we succeed, meaning that no other piece of code holds + the same generation number as ours. + + The loop below would only be executed more than once if: + + 1. there's more than one thread trying to modify the user's database, + and + + 2. the execution of getting the current generation and saving the gen + doc different threads get interleaved (one of them will succeed + and the others will fail and try again). + + Number 1 only happens when more than one user device is syncing at the + same time. Number 2 depends on not-so-frequent coincidence of + code execution. + + Also, in the race between threads for a generation number there's + always one thread that wins. so if there are N threads in the race, the + expected number of repetitions of the loop for each thread would be + N/2. If N is equal to the number of devices that the user has, the + number of possible repetitions of the loop should always be low. + """ + while True: + try: + # add the gen document + gen, _ = self.get_generation_info() + new_gen = gen + 1 + gen_doc = { + '_id': _get_gen_doc_id(new_gen), + GENERATION_KEY: new_gen, + DOC_ID_KEY: doc_id, + TRANSACTION_ID_KEY: transaction_id, + } + if save: + self._database.save(gen_doc) + break # succeeded allocating a new generation, proceed + except ResourceConflict: + pass # try again! + return gen_doc + + def save_document(self, old_doc, doc, transaction_id): + """ + Put the document in the Couch backend database. + + Note that C{old_doc} must have been fetched with the parameter + C{check_for_conflicts} equal to True, so we can properly update the + new document using the conflict information from the old one. + + :param old_doc: The old document version. + :type old_doc: ServerDocument + :param doc: The document to be put. + :type doc: ServerDocument + + :raise RevisionConflict: Raised when trying to update a document but + couch revisions mismatch. + """ + attachments = {} # we save content and conflicts as attachments + parts = [] # and we put it using couch's multipart PUT + # save content as attachment + if doc.is_tombstone() is False: + content = doc.get_json() + attachments['u1db_content'] = { + 'follows': True, + 'content_type': 'application/octet-stream', + 'length': len(content), + } + parts.append(content) + + # save conflicts as attachment + if doc.has_conflicts is True: + conflicts = json.dumps( + map(lambda cdoc: (cdoc.rev, cdoc.content), + doc.get_conflicts())) + attachments['u1db_conflicts'] = { + 'follows': True, + 'content_type': 'application/octet-stream', + 'length': len(conflicts), + } + parts.append(conflicts) + + # build the couch document + couch_doc = { + '_id': doc.doc_id, + 'u1db_rev': doc.rev, + '_attachments': attachments, + } + # if we are updating a doc we have to add the couch doc revision + if old_doc is not None and hasattr(old_doc, 'couch_rev'): + couch_doc['_rev'] = old_doc.couch_rev + # prepare the multipart PUT + if not self.batching: + buf = StringIO() + envelope = MultipartWriter(buf) + # the order in which attachments are described inside the + # serialization of the couch document must match the order in + # which they are actually written in the multipart structure. + # Because of that, we use `sorted_keys=True` in the json + # serialization (so "u1db_conflicts" comes before + # "u1db_content" on the couch document attachments + # description), and also reverse the order of the parts before + # writing them, so the "conflict" part is written before the + # "content" part. + envelope.add( + 'application/json', + json.dumps(couch_doc, sort_keys=True)) + parts.reverse() + for part in parts: + envelope.add('application/octet-stream', part) + envelope.close() + # try to save and fail if there's a revision conflict + try: + resource = self._new_resource() + resource.put_json( + doc.doc_id, body=str(buf.getvalue()), + headers=envelope.headers) + except ResourceConflict: + raise RevisionConflict() + self._allocate_new_generation(doc.doc_id, transaction_id) + else: + for name, attachment in attachments.items(): + del attachment['follows'] + del attachment['length'] + index = 0 if name is 'u1db_content' else 1 + attachment['data'] = binascii.b2a_base64( + parts[index]).strip() + couch_doc['_attachments'] = attachments + gen_doc = self._allocate_new_generation( + doc.doc_id, transaction_id, save=False) + self.batch_docs[doc.doc_id] = couch_doc + self.batch_docs[gen_doc['_id']] = gen_doc + last_gen, last_trans_id = self.batch_generation + self.batch_generation = (last_gen + 1, transaction_id) + + def _new_resource(self, *path): + """ + Return a new resource for accessing a couch database. + + :return: A resource for accessing a couch database. + :rtype: couchdb.http.Resource + """ + # Workaround for: https://leap.se/code/issues/5448 + url = couch_urljoin(self._database.resource.url, *path) + resource = Resource(url, Session(timeout=COUCH_TIMEOUT)) + resource.credentials = self._database.resource.credentials + resource.headers = self._database.resource.headers.copy() + return resource diff --git a/src/leap/soledad/common/couch/state.py b/src/leap/soledad/common/couch/state.py new file mode 100644 index 00000000..8cbe0934 --- /dev/null +++ b/src/leap/soledad/common/couch/state.py @@ -0,0 +1,158 @@ +# -*- coding: utf-8 -*- +# state.py +# Copyright (C) 2015,2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Server state using CouchDatabase as backend. +""" +import couchdb +import re + +from six.moves.urllib.parse import urljoin + +from leap.soledad.common.log import getLogger +from leap.soledad.common.couch import CouchDatabase +from leap.soledad.common.couch import CONFIG_DOC_ID +from leap.soledad.common.couch import SCHEMA_VERSION +from leap.soledad.common.couch import SCHEMA_VERSION_KEY +from leap.soledad.common.command import exec_validated_cmd +from leap.soledad.common.l2db.remote.server_state import ServerState +from leap.soledad.common.l2db.errors import Unauthorized +from leap.soledad.common.errors import WrongCouchSchemaVersionError +from leap.soledad.common.errors import MissingCouchConfigDocumentError + + +logger = getLogger(__name__) + + +def is_db_name_valid(name): + """ + Validate a user database using a regular expression. + + :param name: database name. + :type name: str + + :return: boolean for name vailidity + :rtype: bool + """ + db_name_regex = "^user-[a-f0-9]+$" + return re.match(db_name_regex, name) is not None + + +class CouchServerState(ServerState): + + """ + Inteface of the WSGI server with the CouchDB backend. + """ + + def __init__(self, couch_url, create_cmd=None, + check_schema_versions=False): + """ + Initialize the couch server state. + + :param couch_url: The URL for the couch database. + :type couch_url: str + :param create_cmd: Command to be executed for user db creation. It will + receive a properly sanitized parameter with user db + name and should access CouchDB with necessary + privileges, which server lacks for security reasons. + :type create_cmd: str + :param check_schema_versions: Whether to check couch schema version of + user dbs. Set to False as this is only + intended to run once during start-up. + :type check_schema_versions: bool + """ + self.couch_url = couch_url + self.create_cmd = create_cmd + if check_schema_versions: + self._check_schema_versions() + + def _check_schema_versions(self): + """ + Check that all user databases use the correct couch schema. + """ + server = couchdb.client.Server(self.couch_url) + for dbname in server: + if not dbname.startswith('user-'): + continue + db = server[dbname] + + # if there are documents, ensure that a config doc exists + config_doc = db.get(CONFIG_DOC_ID) + if config_doc: + if config_doc[SCHEMA_VERSION_KEY] != SCHEMA_VERSION: + logger.error( + "Unsupported database schema in database %s" % dbname) + raise WrongCouchSchemaVersionError(dbname) + else: + result = db.view('_all_docs', limit=1) + if result.total_rows != 0: + logger.error( + "Missing couch config document in database %s" + % dbname) + raise MissingCouchConfigDocumentError(dbname) + + def open_database(self, dbname): + """ + Open a couch database. + + :param dbname: The name of the database to open. + :type dbname: str + + :return: The SoledadBackend object. + :rtype: SoledadBackend + """ + url = urljoin(self.couch_url, dbname) + db = CouchDatabase.open_database(url, create=False) + return db + + def ensure_database(self, dbname): + """ + Ensure couch database exists. + + :param dbname: The name of the database to ensure. + :type dbname: str + + :raise Unauthorized: If disabled or other error was raised. + + :return: The SoledadBackend object and its replica_uid. + :rtype: (SoledadBackend, str) + """ + if not self.create_cmd: + raise Unauthorized() + else: + code, out = exec_validated_cmd(self.create_cmd, dbname, + validator=is_db_name_valid) + if code is not 0: + logger.error(""" + Error while creating database (%s) with (%s) command. + Output: %s + Exit code: %d + """ % (dbname, self.create_cmd, out, code)) + raise Unauthorized() + db = self.open_database(dbname) + return db, db.replica_uid + + def delete_database(self, dbname): + """ + Delete couch database. + + :param dbname: The name of the database to delete. + :type dbname: str + + :raise Unauthorized: Always, because Soledad server is not allowed to + delete databases. + """ + raise Unauthorized() diff --git a/src/leap/soledad/common/couch/support.py b/src/leap/soledad/common/couch/support.py new file mode 100644 index 00000000..bfc4fef6 --- /dev/null +++ b/src/leap/soledad/common/couch/support.py @@ -0,0 +1,115 @@ +# -*- coding: utf-8 -*- +# support.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import sys + + +""" +Monkey patches and temporary code that may be removed with version changes. +""" + + +# for bigcouch +# TODO: Remove if bigcouch support is dropped +class MultipartWriter(object): + + """ + A multipart writer adapted from python-couchdb's one so we can PUT + documents using couch's multipart PUT. + + This stripped down version does not allow for nested structures, and + contains only the essential things we need to PUT SoledadDocuments to the + couch backend. Also, please note that this is a patch. The couchdb lib has + another implementation that works fine with CouchDB 1.6, but removing this + now will break compatibility with bigcouch. + """ + + CRLF = '\r\n' + + def __init__(self, fileobj, headers=None, boundary=None): + """ + Initialize the multipart writer. + """ + self.fileobj = fileobj + if boundary is None: + boundary = self._make_boundary() + self._boundary = boundary + self._build_headers('related', headers) + + def add(self, mimetype, content, headers={}): + """ + Add a part to the multipart stream. + """ + self.fileobj.write('--') + self.fileobj.write(self._boundary) + self.fileobj.write(self.CRLF) + headers['Content-Type'] = mimetype + self._write_headers(headers) + if content: + # XXX: throw an exception if a boundary appears in the content?? + self.fileobj.write(content) + self.fileobj.write(self.CRLF) + + def close(self): + """ + Close the multipart stream. + """ + self.fileobj.write('--') + self.fileobj.write(self._boundary) + # be careful not to have anything after '--', otherwise old couch + # versions (including bigcouch) will fail. + self.fileobj.write('--') + + def _make_boundary(self): + """ + Create a boundary to discern multi parts. + """ + try: + from uuid import uuid4 + return '==' + uuid4().hex + '==' + except ImportError: + from random import randrange + token = randrange(sys.maxint) + format = '%%0%dd' % len(repr(sys.maxint - 1)) + return '===============' + (format % token) + '==' + + def _write_headers(self, headers): + """ + Write a part header in the buffer stream. + """ + if headers: + for name in sorted(headers.keys()): + value = headers[name] + self.fileobj.write(name) + self.fileobj.write(': ') + self.fileobj.write(value) + self.fileobj.write(self.CRLF) + self.fileobj.write(self.CRLF) + + def _build_headers(self, subtype, headers): + """ + Build the main headers of the multipart stream. + + This is here so we can send headers separete from content using + python-couchdb API. + """ + self.headers = {} + self.headers['Content-Type'] = 'multipart/%s; boundary="%s"' % \ + (subtype, self._boundary) + if headers: + for name in sorted(headers.keys()): + value = headers[name] + self.headers[name] = value diff --git a/src/leap/soledad/common/crypto.py b/src/leap/soledad/common/crypto.py new file mode 100644 index 00000000..c13c4aa7 --- /dev/null +++ b/src/leap/soledad/common/crypto.py @@ -0,0 +1,98 @@ +# -*- coding: utf-8 -*- +# crypto.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Soledad common crypto bits. +""" + + +# +# Encryption schemes used for encryption. +# + +class EncryptionSchemes(object): + + """ + Representation of encryption schemes used to encrypt documents. + """ + + NONE = 'none' + SYMKEY = 'symkey' + PUBKEY = 'pubkey' + + +class UnknownEncryptionSchemeError(Exception): + + """ + Raised when trying to decrypt from unknown encryption schemes. + """ + pass + + +class EncryptionMethods(object): + + """ + Representation of encryption methods that can be used. + """ + + AES_256_CTR = 'aes-256-ctr' + + +class UnknownEncryptionMethodError(Exception): + + """ + Raised when trying to encrypt/decrypt with unknown method. + """ + pass + + +class MacMethods(object): + + """ + Representation of MAC methods used to authenticate document's contents. + """ + + HMAC = 'hmac' + + +class UnknownMacMethodError(Exception): + + """ + Raised when trying to authenticate document's content with unknown MAC + mehtod. + """ + pass + + +class WrongMacError(Exception): + + """ + Raised when failing to authenticate document's contents based on MAC. + """ + + +# +# Crypto utilities for a SoledadDocument. +# + +ENC_JSON_KEY = '_enc_json' +ENC_SCHEME_KEY = '_enc_scheme' +ENC_METHOD_KEY = '_enc_method' +ENC_IV_KEY = '_enc_iv' +MAC_KEY = '_mac' +MAC_METHOD_KEY = '_mac_method' diff --git a/src/leap/soledad/common/document.py b/src/leap/soledad/common/document.py new file mode 100644 index 00000000..6c26a29f --- /dev/null +++ b/src/leap/soledad/common/document.py @@ -0,0 +1,180 @@ +# -*- coding: utf-8 -*- +# document.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +A Soledad Document is an l2db.Document with lasers. +""" + + +from .l2db import Document + + +# +# SoledadDocument +# + +class SoledadDocument(Document): + + """ + Encryptable and syncable document. + + LEAP Documents can be flagged as syncable or not, so the replicas + might not sync every document. + """ + + def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False, + syncable=True): + """ + Container for handling an encryptable document. + + @param doc_id: The unique document identifier. + @type doc_id: str + @param rev: The revision identifier of the document. + @type rev: str + @param json: The JSON string for this document. + @type json: str + @param has_conflicts: Boolean indicating if this document has conflicts + @type has_conflicts: bool + @param syncable: Should this document be synced with remote replicas? + @type syncable: bool + """ + Document.__init__(self, doc_id, rev, json, has_conflicts) + self._syncable = syncable + + def _get_syncable(self): + """ + Return whether this document is syncable. + + @return: Is this document syncable? + @rtype: bool + """ + return self._syncable + + def _set_syncable(self, syncable=True): + """ + Determine if this document should be synced with remote replicas. + + @param syncable: Should this document be synced with remote replicas? + @type syncable: bool + """ + self._syncable = syncable + + syncable = property( + _get_syncable, + _set_syncable, + doc="Determine if document should be synced with server." + ) + + def _get_rev(self): + """ + Get the document revision. + + Returning the revision as string solves the following exception in + Twisted web: + exceptions.TypeError: Can only pass-through bytes on Python 2 + + @return: The document revision. + @rtype: str + """ + if self._rev is None: + return None + return str(self._rev) + + def _set_rev(self, rev): + """ + Set document revision. + + @param rev: The new document revision. + @type rev: bytes + """ + self._rev = rev + + rev = property( + _get_rev, + _set_rev, + doc="Wrapper to ensure `doc.rev` is always returned as bytes.") + + +class ServerDocument(SoledadDocument): + """ + This is the document used by server to hold conflicts and transactions + on a database. + + The goal is to ensure an atomic and consistent update of the database. + """ + + def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False): + """ + Container for handling a document that stored on server. + + :param doc_id: The unique document identifier. + :type doc_id: str + :param rev: The revision identifier of the document. + :type rev: str + :param json: The JSON string for this document. + :type json: str + :param has_conflicts: Boolean indicating if this document has conflicts + :type has_conflicts: bool + """ + SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts) + self._conflicts = None + + def get_conflicts(self): + """ + Get the conflicted versions of the document. + + :return: The conflicted versions of the document. + :rtype: [ServerDocument] + """ + return self._conflicts or [] + + def set_conflicts(self, conflicts): + """ + Set the conflicted versions of the document. + + :param conflicts: The conflicted versions of the document. + :type conflicts: list + """ + self._conflicts = conflicts + self.has_conflicts = len(self._conflicts) > 0 + + def add_conflict(self, doc): + """ + Add a conflict to this document. + + :param doc: The conflicted version to be added. + :type doc: Document + """ + if self._conflicts is None: + raise Exception("Fetch conflicts first!") + self._conflicts.append(doc) + self.has_conflicts = len(self._conflicts) > 0 + + def delete_conflicts(self, conflict_revs): + """ + Delete conflicted versions of this document. + + :param conflict_revs: The conflicted revisions to be deleted. + :type conflict_revs: [str] + """ + if self._conflicts is None: + raise Exception("Fetch conflicts first!") + self._conflicts = filter( + lambda doc: doc.rev not in conflict_revs, + self._conflicts) + self.has_conflicts = len(self._conflicts) > 0 diff --git a/src/leap/soledad/common/errors.py b/src/leap/soledad/common/errors.py new file mode 100644 index 00000000..d543a3de --- /dev/null +++ b/src/leap/soledad/common/errors.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +# errors.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Soledad errors. +""" + +from .l2db import errors +from .l2db.remote import http_errors + + +def register_exception(cls): + """ + A small decorator that registers exceptions in u1db maps. + """ + # update u1db "wire description to status" and "wire description to + # exception" maps. + http_errors.wire_description_to_status.update({ + cls.wire_description: cls.status}) + errors.wire_description_to_exc.update({ + cls.wire_description: cls}) + # do not modify the exception + return cls + + +class SoledadError(errors.U1DBError): + + """ + Base Soledad HTTP errors. + """ + pass + + +# +# Authorization errors +# + + +class DatabaseAccessError(Exception): + pass + + +@register_exception +class InvalidAuthTokenError(errors.Unauthorized): + + """ + Exception raised when failing to get authorization for some action because + the provided token either does not exist in the tokens database, has a + distinct structure from the expected one, or is associated with a user + with a distinct uuid than the one provided by the client. + """ + + wire_descrition = "invalid auth token" + status = 401 + + +# +# SoledadBackend errors +# u1db error statuses also have to be updated +http_errors.ERROR_STATUSES = set( + http_errors.wire_description_to_status.values()) + + +class InvalidURLError(Exception): + """ + Exception raised when Soledad encounters a malformed URL. + """ + + +class BackendNotReadyError(SoledadError): + """ + Generic exception raised when the backend is not ready to dispatch a client + request. + """ + wire_description = "backend not ready" + status = 500 + + +class WrongCouchSchemaVersionError(SoledadError): + """ + Raised in case there is a user database with wrong couch schema version. + """ + + +class MissingCouchConfigDocumentError(SoledadError): + """ + Raised if a database has documents but lacks the couch config document. + """ diff --git a/src/leap/soledad/common/l2db/__init__.py b/src/leap/soledad/common/l2db/__init__.py new file mode 100644 index 00000000..43d61b1d --- /dev/null +++ b/src/leap/soledad/common/l2db/__init__.py @@ -0,0 +1,694 @@ +# Copyright 2011 Canonical Ltd. +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see <http://www.gnu.org/licenses/>. + +"""L2DB""" + +import json + +from leap.soledad.common.l2db.errors import InvalidJSON, InvalidContent + +__version_info__ = (13, 9) +__version__ = '.'.join(map(lambda x: '%02d' % x, __version_info__)) + + +def open(path, create, document_factory=None): + """Open a database at the given location. + + Will raise u1db.errors.DatabaseDoesNotExist if create=False and the + database does not already exist. + + :param path: The filesystem path for the database to open. + :param create: True/False, should the database be created if it doesn't + already exist? + :param document_factory: A function that will be called with the same + parameters as Document.__init__. + :return: An instance of Database. + """ + from leap.soledad.client._db import sqlite + return sqlite.SQLiteDatabase.open_database( + path, create=create, document_factory=document_factory) + + +# constraints on database names (relevant for remote access, as regex) +DBNAME_CONSTRAINTS = r"[a-zA-Z0-9][a-zA-Z0-9.-]*" + +# constraints on doc ids (as regex) +# (no slashes, and no characters outside the ascii range) +DOC_ID_CONSTRAINTS = r"[a-zA-Z0-9.%_-]+" + + +class Database(object): + """A JSON Document data store. + + This data store can be synchronized with other u1db.Database instances. + """ + + def set_document_factory(self, factory): + """Set the document factory that will be used to create objects to be + returned as documents by the database. + + :param factory: A function that returns an object which at minimum must + satisfy the same interface as does the class DocumentBase. + Subclassing that class is the easiest way to create such + a function. + """ + raise NotImplementedError(self.set_document_factory) + + def set_document_size_limit(self, limit): + """Set the maximum allowed document size for this database. + + :param limit: Maximum allowed document size in bytes. + """ + raise NotImplementedError(self.set_document_size_limit) + + def whats_changed(self, old_generation=0): + """Return a list of documents that have changed since old_generation. + This allows APPS to only store a db generation before going + 'offline', and then when coming back online they can use this + data to update whatever extra data they are storing. + + :param old_generation: The generation of the database in the old + state. + :return: (generation, trans_id, [(doc_id, generation, trans_id),...]) + The current generation of the database, its associated transaction + id, and a list of of changed documents since old_generation, + represented by tuples with for each document its doc_id and the + generation and transaction id corresponding to the last intervening + change and sorted by generation (old changes first) + """ + raise NotImplementedError(self.whats_changed) + + def get_doc(self, doc_id, include_deleted=False): + """Get the JSON string for the given document. + + :param doc_id: The unique document identifier + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise asking for a deleted + document will return None. + :return: a Document object. + """ + raise NotImplementedError(self.get_doc) + + def get_docs(self, doc_ids, check_for_conflicts=True, + include_deleted=False): + """Get the JSON content for many documents. + + :param doc_ids: A list of document identifiers. + :param check_for_conflicts: If set to False, then the conflict check + will be skipped, and 'None' will be returned instead of True/False. + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise deleted documents will not + be included in the results. + :return: iterable giving the Document object for each document id + in matching doc_ids order. + """ + raise NotImplementedError(self.get_docs) + + def get_all_docs(self, include_deleted=False): + """Get the JSON content for all documents in the database. + + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise deleted documents will not + be included in the results. + :return: (generation, [Document]) + The current generation of the database, followed by a list of all + the documents in the database. + """ + raise NotImplementedError(self.get_all_docs) + + def create_doc(self, content, doc_id=None): + """Create a new document. + + You can optionally specify the document identifier, but the document + must not already exist. See 'put_doc' if you want to override an + existing document. + If the database specifies a maximum document size and the document + exceeds it, create will fail and raise a DocumentTooBig exception. + + :param content: A Python dictionary. + :param doc_id: An optional identifier specifying the document id. + :return: Document + """ + raise NotImplementedError(self.create_doc) + + def create_doc_from_json(self, json, doc_id=None): + """Create a new document. + + You can optionally specify the document identifier, but the document + must not already exist. See 'put_doc' if you want to override an + existing document. + If the database specifies a maximum document size and the document + exceeds it, create will fail and raise a DocumentTooBig exception. + + :param json: The JSON document string + :param doc_id: An optional identifier specifying the document id. + :return: Document + """ + raise NotImplementedError(self.create_doc_from_json) + + def put_doc(self, doc): + """Update a document. + If the document currently has conflicts, put will fail. + If the database specifies a maximum document size and the document + exceeds it, put will fail and raise a DocumentTooBig exception. + + :param doc: A Document with new content. + :return: new_doc_rev - The new revision identifier for the document. + The Document object will also be updated. + """ + raise NotImplementedError(self.put_doc) + + def delete_doc(self, doc): + """Mark a document as deleted. + Will abort if the current revision doesn't match doc.rev. + This will also set doc.content to None. + """ + raise NotImplementedError(self.delete_doc) + + def create_index(self, index_name, *index_expressions): + """Create an named index, which can then be queried for future lookups. + Creating an index which already exists is not an error, and is cheap. + Creating an index which does not match the index_expressions of the + existing index is an error. + Creating an index will block until the expressions have been evaluated + and the index generated. + + :param index_name: A unique name which can be used as a key prefix + :param index_expressions: index expressions defining the index + information. + + Examples: + + "fieldname", or "fieldname.subfieldname" to index alphabetically + sorted on the contents of a field. + + "number(fieldname, width)", "lower(fieldname)" + """ + raise NotImplementedError(self.create_index) + + def delete_index(self, index_name): + """Remove a named index. + + :param index_name: The name of the index we are removing + """ + raise NotImplementedError(self.delete_index) + + def list_indexes(self): + """List the definitions of all known indexes. + + :return: A list of [('index-name', ['field', 'field2'])] definitions. + """ + raise NotImplementedError(self.list_indexes) + + def get_from_index(self, index_name, *key_values): + """Return documents that match the keys supplied. + + You must supply exactly the same number of values as have been defined + in the index. It is possible to do a prefix match by using '*' to + indicate a wildcard match. You can only supply '*' to trailing entries, + (eg 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.) + It is also possible to append a '*' to the last supplied value (eg + 'val*', '*', '*' or 'val', 'val*', '*', but not 'val*', 'val', '*') + + :param index_name: The index to query + :param key_values: values to match. eg, if you have + an index with 3 fields then you would have: + get_from_index(index_name, val1, val2, val3) + :return: List of [Document] + """ + raise NotImplementedError(self.get_from_index) + + def get_range_from_index(self, index_name, start_value, end_value): + """Return documents that fall within the specified range. + + Both ends of the range are inclusive. For both start_value and + end_value, one must supply exactly the same number of values as have + been defined in the index, or pass None. In case of a single column + index, a string is accepted as an alternative for a tuple with a single + value. It is possible to do a prefix match by using '*' to indicate + a wildcard match. You can only supply '*' to trailing entries, (eg + 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.) It is also + possible to append a '*' to the last supplied value (eg 'val*', '*', + '*' or 'val', 'val*', '*', but not 'val*', 'val', '*') + + :param index_name: The index to query + :param start_values: tuples of values that define the lower bound of + the range. eg, if you have an index with 3 fields then you would + have: (val1, val2, val3) + :param end_values: tuples of values that define the upper bound of the + range. eg, if you have an index with 3 fields then you would have: + (val1, val2, val3) + :return: List of [Document] + """ + raise NotImplementedError(self.get_range_from_index) + + def get_index_keys(self, index_name): + """Return all keys under which documents are indexed in this index. + + :param index_name: The index to query + :return: [] A list of tuples of indexed keys. + """ + raise NotImplementedError(self.get_index_keys) + + def get_doc_conflicts(self, doc_id): + """Get the list of conflicts for the given document. + + The order of the conflicts is such that the first entry is the value + that would be returned by "get_doc". + + :return: [doc] A list of the Document entries that are conflicted. + """ + raise NotImplementedError(self.get_doc_conflicts) + + def resolve_doc(self, doc, conflicted_doc_revs): + """Mark a document as no longer conflicted. + + We take the list of revisions that the client knows about that it is + superseding. This may be a different list from the actual current + conflicts, in which case only those are removed as conflicted. This + may fail if the conflict list is significantly different from the + supplied information. (sync could have happened in the background from + the time you GET_DOC_CONFLICTS until the point where you RESOLVE) + + :param doc: A Document with the new content to be inserted. + :param conflicted_doc_revs: A list of revisions that the new content + supersedes. + """ + raise NotImplementedError(self.resolve_doc) + + def get_sync_target(self): + """Return a SyncTarget object, for another u1db to synchronize with. + + :return: An instance of SyncTarget. + """ + raise NotImplementedError(self.get_sync_target) + + def close(self): + """Release any resources associated with this database.""" + raise NotImplementedError(self.close) + + def sync(self, url, creds=None, autocreate=True): + """Synchronize documents with remote replica exposed at url. + + :param url: the url of the target replica to sync with. + :param creds: optional dictionary giving credentials + to authorize the operation with the server. For using OAuth + the form of creds is: + {'oauth': { + 'consumer_key': ..., + 'consumer_secret': ..., + 'token_key': ..., + 'token_secret': ... + }} + :param autocreate: ask the target to create the db if non-existent. + :return: local_gen_before_sync The local generation before the + synchronisation was performed. This is useful to pass into + whatschanged, if an application wants to know which documents were + affected by a synchronisation. + """ + from u1db.sync import Synchronizer + from u1db.remote.http_target import HTTPSyncTarget + return Synchronizer(self, HTTPSyncTarget(url, creds=creds)).sync( + autocreate=autocreate) + + def _get_replica_gen_and_trans_id(self, other_replica_uid): + """Return the last known generation and transaction id for the other db + replica. + + When you do a synchronization with another replica, the Database keeps + track of what generation the other database replica was at, and what + the associated transaction id was. This is used to determine what data + needs to be sent, and if two databases are claiming to be the same + replica. + + :param other_replica_uid: The identifier for the other replica. + :return: (gen, trans_id) The generation and transaction id we + encountered during synchronization. If we've never synchronized + with the replica, this is (0, ''). + """ + raise NotImplementedError(self._get_replica_gen_and_trans_id) + + def _set_replica_gen_and_trans_id(self, other_replica_uid, + other_generation, other_transaction_id): + """Set the last-known generation and transaction id for the other + database replica. + + We have just performed some synchronization, and we want to track what + generation the other replica was at. See also + _get_replica_gen_and_trans_id. + :param other_replica_uid: The U1DB identifier for the other replica. + :param other_generation: The generation number for the other replica. + :param other_transaction_id: The transaction id associated with the + generation. + """ + raise NotImplementedError(self._set_replica_gen_and_trans_id) + + def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, + replica_trans_id=''): + """Insert/update document into the database with a given revision. + + This api is used during synchronization operations. + + If a document would conflict and save_conflict is set to True, the + content will be selected as the 'current' content for doc.doc_id, + even though doc.rev doesn't supersede the currently stored revision. + The currently stored document will be added to the list of conflict + alternatives for the given doc_id. + + This forces the new content to be 'current' so that we get convergence + after synchronizing, even if people don't resolve conflicts. Users can + then notice that their content is out of date, update it, and + synchronize again. (The alternative is that users could synchronize and + think the data has propagated, but their local copy looks fine, and the + remote copy is never updated again.) + + :param doc: A Document object + :param save_conflict: If this document is a conflict, do you want to + save it as a conflict, or just ignore it. + :param replica_uid: A unique replica identifier. + :param replica_gen: The generation of the replica corresponding to the + this document. The replica arguments are optional, but are used + during synchronization. + :param replica_trans_id: The transaction_id associated with the + generation. + :return: (state, at_gen) - If we don't have doc_id already, + or if doc_rev supersedes the existing document revision, + then the content will be inserted, and state is 'inserted'. + If doc_rev is less than or equal to the existing revision, + then the put is ignored and state is respecitvely 'superseded' + or 'converged'. + If doc_rev is not strictly superseded or supersedes, then + state is 'conflicted'. The document will not be inserted if + save_conflict is False. + For 'inserted' or 'converged', at_gen is the insertion/current + generation. + """ + raise NotImplementedError(self._put_doc_if_newer) + + +class DocumentBase(object): + """Container for handling a single document. + + :ivar doc_id: Unique identifier for this document. + :ivar rev: The revision identifier of the document. + :ivar json_string: The JSON string for this document. + :ivar has_conflicts: Boolean indicating if this document has conflicts + """ + + def __init__(self, doc_id, rev, json_string, has_conflicts=False): + self.doc_id = doc_id + self.rev = rev + if json_string is not None: + try: + value = json.loads(json_string) + except ValueError: + raise InvalidJSON + if not isinstance(value, dict): + raise InvalidJSON + self._json = json_string + self.has_conflicts = has_conflicts + + def same_content_as(self, other): + """Compare the content of two documents.""" + if self._json: + c1 = json.loads(self._json) + else: + c1 = None + if other._json: + c2 = json.loads(other._json) + else: + c2 = None + return c1 == c2 + + def __repr__(self): + if self.has_conflicts: + extra = ', conflicted' + else: + extra = '' + return '%s(%s, %s%s, %r)' % (self.__class__.__name__, self.doc_id, + self.rev, extra, self.get_json()) + + def __hash__(self): + raise NotImplementedError(self.__hash__) + + def __eq__(self, other): + if not isinstance(other, Document): + return NotImplemented + return ( + self.doc_id == other.doc_id and self.rev == other.rev and + self.same_content_as(other) and self.has_conflicts == + other.has_conflicts) + + def __lt__(self, other): + """This is meant for testing, not part of the official api. + + It is implemented so that sorted([Document, Document]) can be used. + It doesn't imply that users would want their documents to be sorted in + this order. + """ + # Since this is just for testing, we don't worry about comparing + # against things that aren't a Document. + return ((self.doc_id, self.rev, self.get_json()) < + (other.doc_id, other.rev, other.get_json())) + + def get_json(self): + """Get the json serialization of this document.""" + if self._json is not None: + return self._json + return None + + def get_size(self): + """Calculate the total size of the document.""" + size = 0 + json = self.get_json() + if json: + size += len(json) + if self.rev: + size += len(self.rev) + if self.doc_id: + size += len(self.doc_id) + return size + + def set_json(self, json_string): + """Set the json serialization of this document.""" + if json_string is not None: + try: + value = json.loads(json_string) + except ValueError: + raise InvalidJSON + if not isinstance(value, dict): + raise InvalidJSON + self._json = json_string + + def make_tombstone(self): + """Make this document into a tombstone.""" + self._json = None + + def is_tombstone(self): + """Return True if the document is a tombstone, False otherwise.""" + if self._json is not None: + return False + return True + + +class Document(DocumentBase): + """Container for handling a single document. + + :ivar doc_id: Unique identifier for this document. + :ivar rev: The revision identifier of the document. + :ivar json: The JSON string for this document. + :ivar has_conflicts: Boolean indicating if this document has conflicts + """ + + # The following part of the API is optional: no implementation is forced to + # have it but if the language supports dictionaries/hashtables, it makes + # Documents a lot more user friendly. + + def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False): + # TODO: We convert the json in the superclass to check its validity so + # we might as well set _content here directly since the price is + # already being paid. + super(Document, self).__init__(doc_id, rev, json, has_conflicts) + self._content = None + + def same_content_as(self, other): + """Compare the content of two documents.""" + if self._json: + c1 = json.loads(self._json) + else: + c1 = self._content + if other._json: + c2 = json.loads(other._json) + else: + c2 = other._content + return c1 == c2 + + def get_json(self): + """Get the json serialization of this document.""" + json_string = super(Document, self).get_json() + if json_string is not None: + return json_string + if self._content is not None: + return json.dumps(self._content) + return None + + def set_json(self, json): + """Set the json serialization of this document.""" + self._content = None + super(Document, self).set_json(json) + + def make_tombstone(self): + """Make this document into a tombstone.""" + self._content = None + super(Document, self).make_tombstone() + + def is_tombstone(self): + """Return True if the document is a tombstone, False otherwise.""" + if self._content is not None: + return False + return super(Document, self).is_tombstone() + + def _get_content(self): + """Get the dictionary representing this document.""" + if self._json is not None: + self._content = json.loads(self._json) + self._json = None + if self._content is not None: + return self._content + return None + + def _set_content(self, content): + """Set the dictionary representing this document.""" + try: + tmp = json.dumps(content) + except TypeError: + raise InvalidContent( + "Can not be converted to JSON: %r" % (content,)) + if not tmp.startswith('{'): + raise InvalidContent( + "Can not be converted to a JSON object: %r." % (content,)) + # We might as well store the JSON at this point since we did the work + # of encoding it, and it doesn't lose any information. + self._json = tmp + self._content = None + + content = property( + _get_content, _set_content, doc="Content of the Document.") + + # End of optional part. + + +class SyncTarget(object): + """Functionality for using a Database as a synchronization target.""" + + def get_sync_info(self, source_replica_uid): + """Return information about known state. + + Return the replica_uid and the current database generation of this + database, and the last-seen database generation for source_replica_uid + + :param source_replica_uid: Another replica which we might have + synchronized with in the past. + :return: (target_replica_uid, target_replica_generation, + target_trans_id, source_replica_last_known_generation, + source_replica_last_known_transaction_id) + """ + raise NotImplementedError(self.get_sync_info) + + def record_sync_info(self, source_replica_uid, source_replica_generation, + source_replica_transaction_id): + """Record tip information for another replica. + + After sync_exchange has been processed, the caller will have + received new content from this replica. This call allows the + source replica instigating the sync to inform us what their + generation became after applying the documents we returned. + + This is used to allow future sync operations to not need to repeat data + that we just talked about. It also means that if this is called at the + wrong time, there can be database records that will never be + synchronized. + + :param source_replica_uid: The identifier for the source replica. + :param source_replica_generation: + The database generation for the source replica. + :param source_replica_transaction_id: The transaction id associated + with the source replica generation. + """ + raise NotImplementedError(self.record_sync_info) + + def sync_exchange(self, docs_by_generation, source_replica_uid, + last_known_generation, last_known_trans_id, + return_doc_cb, ensure_callback=None): + """Incorporate the documents sent from the source replica. + + This is not meant to be called by client code directly, but is used as + part of sync(). + + This adds docs to the local store, and determines documents that need + to be returned to the source replica. + + Documents must be supplied in docs_by_generation paired with + the generation of their latest change in order from the oldest + change to the newest, that means from the oldest generation to + the newest. + + Documents are also returned paired with the generation of + their latest change in order from the oldest change to the + newest. + + :param docs_by_generation: A list of [(Document, generation, + transaction_id)] tuples indicating documents which should be + updated on this replica paired with the generation and transaction + id of their latest change. + :param source_replica_uid: The source replica's identifier + :param last_known_generation: The last generation that the source + replica knows about this target replica + :param last_known_trans_id: The last transaction id that the source + replica knows about this target replica + :param: return_doc_cb(doc, gen): is a callback + used to return documents to the source replica, it will + be invoked in turn with Documents that have changed since + last_known_generation together with the generation of + their last change. + :param: ensure_callback(replica_uid): if set the target may create + the target db if not yet existent, the callback can then + be used to inform of the created db replica uid. + :return: new_generation - After applying docs_by_generation, this is + the current generation for this replica + """ + raise NotImplementedError(self.sync_exchange) + + def _set_trace_hook(self, cb): + """Set a callback that will be invoked to trace database actions. + + The callback will be passed a string indicating the current state, and + the sync target object. Implementations do not have to implement this + api, it is used by the test suite. + + :param cb: A callable that takes cb(state) + """ + raise NotImplementedError(self._set_trace_hook) + + def _set_trace_hook_shallow(self, cb): + """Set a callback that will be invoked to trace database actions. + + Similar to _set_trace_hook, for implementations that don't offer + state changes from the inner working of sync_exchange(). + + :param cb: A callable that takes cb(state) + """ + self._set_trace_hook(cb) diff --git a/src/leap/soledad/common/l2db/backends/__init__.py b/src/leap/soledad/common/l2db/backends/__init__.py new file mode 100644 index 00000000..c731c3d3 --- /dev/null +++ b/src/leap/soledad/common/l2db/backends/__init__.py @@ -0,0 +1,204 @@ +# Copyright 2011 Canonical Ltd. +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see <http://www.gnu.org/licenses/>. + +"""Abstract classes and common implementations for the backends.""" + +import re +import json +import uuid + +from leap.soledad.common import l2db +from leap.soledad.common.l2db import sync as l2db_sync +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.vectorclock import VectorClockRev + + +check_doc_id_re = re.compile("^" + l2db.DOC_ID_CONSTRAINTS + "$", re.UNICODE) + + +class CommonSyncTarget(l2db_sync.LocalSyncTarget): + pass + + +class CommonBackend(l2db.Database): + + document_size_limit = 0 + + def _allocate_doc_id(self): + """Generate a unique identifier for this document.""" + return 'D-' + uuid.uuid4().hex # 'D-' stands for document + + def _allocate_transaction_id(self): + return 'T-' + uuid.uuid4().hex # 'T-' stands for transaction + + def _allocate_doc_rev(self, old_doc_rev): + vcr = VectorClockRev(old_doc_rev) + vcr.increment(self._replica_uid) + return vcr.as_str() + + def _check_doc_id(self, doc_id): + if not check_doc_id_re.match(doc_id): + raise errors.InvalidDocId() + + def _check_doc_size(self, doc): + if not self.document_size_limit: + return + if doc.get_size() > self.document_size_limit: + raise errors.DocumentTooBig + + def _get_generation(self): + """Return the current generation. + + """ + raise NotImplementedError(self._get_generation) + + def _get_generation_info(self): + """Return the current generation and transaction id. + + """ + raise NotImplementedError(self._get_generation_info) + + def _get_doc(self, doc_id, check_for_conflicts=False): + """Extract the document from storage. + + This can return None if the document doesn't exist. + """ + raise NotImplementedError(self._get_doc) + + def _has_conflicts(self, doc_id): + """Return True if the doc has conflicts, False otherwise.""" + raise NotImplementedError(self._has_conflicts) + + def create_doc(self, content, doc_id=None): + if not isinstance(content, dict): + raise errors.InvalidContent + json_string = json.dumps(content) + return self.create_doc_from_json(json_string, doc_id) + + def create_doc_from_json(self, json, doc_id=None): + if doc_id is None: + doc_id = self._allocate_doc_id() + doc = self._factory(doc_id, None, json) + self.put_doc(doc) + return doc + + def _get_transaction_log(self): + """This is only for the test suite, it is not part of the api.""" + raise NotImplementedError(self._get_transaction_log) + + def _put_and_update_indexes(self, doc_id, old_doc, new_rev, content): + raise NotImplementedError(self._put_and_update_indexes) + + def get_docs(self, doc_ids, check_for_conflicts=True, + include_deleted=False): + for doc_id in doc_ids: + doc = self._get_doc( + doc_id, check_for_conflicts=check_for_conflicts) + if doc.is_tombstone() and not include_deleted: + continue + yield doc + + def _get_trans_id_for_gen(self, generation): + """Get the transaction id corresponding to a particular generation. + + Raises an InvalidGeneration when the generation does not exist. + + """ + raise NotImplementedError(self._get_trans_id_for_gen) + + def validate_gen_and_trans_id(self, generation, trans_id): + """Validate the generation and transaction id. + + Raises an InvalidGeneration when the generation does not exist, and an + InvalidTransactionId when it does but with a different transaction id. + + """ + if generation == 0: + return + known_trans_id = self._get_trans_id_for_gen(generation) + if known_trans_id != trans_id: + raise errors.InvalidTransactionId + + def _validate_source(self, other_replica_uid, other_generation, + other_transaction_id): + """Validate the new generation and transaction id. + + other_generation must be greater than what we have stored for this + replica, *or* it must be the same and the transaction_id must be the + same as well. + """ + (old_generation, + old_transaction_id) = self._get_replica_gen_and_trans_id( + other_replica_uid) + if other_generation < old_generation: + raise errors.InvalidGeneration + if other_generation > old_generation: + return + if other_transaction_id == old_transaction_id: + return + raise errors.InvalidTransactionId + + def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, + replica_trans_id=''): + cur_doc = self._get_doc(doc.doc_id) + doc_vcr = VectorClockRev(doc.rev) + if cur_doc is None: + cur_vcr = VectorClockRev(None) + else: + cur_vcr = VectorClockRev(cur_doc.rev) + self._validate_source(replica_uid, replica_gen, replica_trans_id) + if doc_vcr.is_newer(cur_vcr): + rev = doc.rev + self._prune_conflicts(doc, doc_vcr) + if doc.rev != rev: + # conflicts have been autoresolved + state = 'superseded' + else: + state = 'inserted' + self._put_and_update_indexes(cur_doc, doc) + elif doc.rev == cur_doc.rev: + # magical convergence + state = 'converged' + elif cur_vcr.is_newer(doc_vcr): + # Don't add this to seen_ids, because we have something newer, + # so we should send it back, and we should not generate a + # conflict + state = 'superseded' + elif cur_doc.same_content_as(doc): + # the documents have been edited to the same thing at both ends + doc_vcr.maximize(cur_vcr) + doc_vcr.increment(self._replica_uid) + doc.rev = doc_vcr.as_str() + self._put_and_update_indexes(cur_doc, doc) + state = 'superseded' + else: + state = 'conflicted' + if save_conflict: + self._force_doc_sync_conflict(doc) + if replica_uid is not None and replica_gen is not None: + self._do_set_replica_gen_and_trans_id( + replica_uid, replica_gen, replica_trans_id) + return state, self._get_generation() + + def _ensure_maximal_rev(self, cur_rev, extra_revs): + vcr = VectorClockRev(cur_rev) + for rev in extra_revs: + vcr.maximize(VectorClockRev(rev)) + vcr.increment(self._replica_uid) + return vcr.as_str() + + def set_document_size_limit(self, limit): + self.document_size_limit = limit diff --git a/src/leap/soledad/common/l2db/backends/inmemory.py b/src/leap/soledad/common/l2db/backends/inmemory.py new file mode 100644 index 00000000..6fd251af --- /dev/null +++ b/src/leap/soledad/common/l2db/backends/inmemory.py @@ -0,0 +1,466 @@ +# Copyright 2011 Canonical Ltd. +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see <http://www.gnu.org/licenses/>. + +"""The in-memory Database class for U1DB.""" + +import json + +from leap.soledad.common.l2db import ( + Document, errors, + query_parser, vectorclock) +from leap.soledad.common.l2db.backends import CommonBackend, CommonSyncTarget + + +def get_prefix(value): + key_prefix = '\x01'.join(value) + return key_prefix.rstrip('*') + + +class InMemoryDatabase(CommonBackend): + """A database that only stores the data internally.""" + + def __init__(self, replica_uid, document_factory=None): + self._transaction_log = [] + self._docs = {} + # Map from doc_id => [(doc_rev, doc)] conflicts beyond 'winner' + self._conflicts = {} + self._other_generations = {} + self._indexes = {} + self._replica_uid = replica_uid + self._factory = document_factory or Document + + def _set_replica_uid(self, replica_uid): + """Force the replica_uid to be set.""" + self._replica_uid = replica_uid + + def set_document_factory(self, factory): + self._factory = factory + + def close(self): + # This is a no-op, We don't want to free the data because one client + # may be closing it, while another wants to inspect the results. + pass + + def _get_replica_gen_and_trans_id(self, other_replica_uid): + return self._other_generations.get(other_replica_uid, (0, '')) + + def _set_replica_gen_and_trans_id(self, other_replica_uid, + other_generation, other_transaction_id): + self._do_set_replica_gen_and_trans_id( + other_replica_uid, other_generation, other_transaction_id) + + def _do_set_replica_gen_and_trans_id(self, other_replica_uid, + other_generation, + other_transaction_id): + # TODO: to handle race conditions, we may want to check if the current + # value is greater than this new value. + self._other_generations[other_replica_uid] = (other_generation, + other_transaction_id) + + def get_sync_target(self): + return InMemorySyncTarget(self) + + def _get_transaction_log(self): + # snapshot! + return self._transaction_log[:] + + def _get_generation(self): + return len(self._transaction_log) + + def _get_generation_info(self): + if not self._transaction_log: + return 0, '' + return len(self._transaction_log), self._transaction_log[-1][1] + + def _get_trans_id_for_gen(self, generation): + if generation == 0: + return '' + if generation > len(self._transaction_log): + raise errors.InvalidGeneration + return self._transaction_log[generation - 1][1] + + def put_doc(self, doc): + if doc.doc_id is None: + raise errors.InvalidDocId() + self._check_doc_id(doc.doc_id) + self._check_doc_size(doc) + old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if old_doc and old_doc.has_conflicts: + raise errors.ConflictedDoc() + if old_doc and doc.rev is None and old_doc.is_tombstone(): + new_rev = self._allocate_doc_rev(old_doc.rev) + else: + if old_doc is not None: + if old_doc.rev != doc.rev: + raise errors.RevisionConflict() + else: + if doc.rev is not None: + raise errors.RevisionConflict() + new_rev = self._allocate_doc_rev(doc.rev) + doc.rev = new_rev + self._put_and_update_indexes(old_doc, doc) + return new_rev + + def _put_and_update_indexes(self, old_doc, doc): + for index in self._indexes.itervalues(): + if old_doc is not None and not old_doc.is_tombstone(): + index.remove_json(old_doc.doc_id, old_doc.get_json()) + if not doc.is_tombstone(): + index.add_json(doc.doc_id, doc.get_json()) + trans_id = self._allocate_transaction_id() + self._docs[doc.doc_id] = (doc.rev, doc.get_json()) + self._transaction_log.append((doc.doc_id, trans_id)) + + def _get_doc(self, doc_id, check_for_conflicts=False): + try: + doc_rev, content = self._docs[doc_id] + except KeyError: + return None + doc = self._factory(doc_id, doc_rev, content) + if check_for_conflicts: + doc.has_conflicts = (doc.doc_id in self._conflicts) + return doc + + def _has_conflicts(self, doc_id): + return doc_id in self._conflicts + + def get_doc(self, doc_id, include_deleted=False): + doc = self._get_doc(doc_id, check_for_conflicts=True) + if doc is None: + return None + if doc.is_tombstone() and not include_deleted: + return None + return doc + + def get_all_docs(self, include_deleted=False): + """Return all documents in the database.""" + generation = self._get_generation() + results = [] + for doc_id, (doc_rev, content) in self._docs.items(): + if content is None and not include_deleted: + continue + doc = self._factory(doc_id, doc_rev, content) + doc.has_conflicts = self._has_conflicts(doc_id) + results.append(doc) + return (generation, results) + + def get_doc_conflicts(self, doc_id): + if doc_id not in self._conflicts: + return [] + result = [self._get_doc(doc_id)] + result[0].has_conflicts = True + result.extend([self._factory(doc_id, rev, content) + for rev, content in self._conflicts[doc_id]]) + return result + + def _replace_conflicts(self, doc, conflicts): + if not conflicts: + del self._conflicts[doc.doc_id] + else: + self._conflicts[doc.doc_id] = conflicts + doc.has_conflicts = bool(conflicts) + + def _prune_conflicts(self, doc, doc_vcr): + if self._has_conflicts(doc.doc_id): + autoresolved = False + remaining_conflicts = [] + cur_conflicts = self._conflicts[doc.doc_id] + for c_rev, c_doc in cur_conflicts: + c_vcr = vectorclock.VectorClockRev(c_rev) + if doc_vcr.is_newer(c_vcr): + continue + if doc.same_content_as(Document(doc.doc_id, c_rev, c_doc)): + doc_vcr.maximize(c_vcr) + autoresolved = True + continue + remaining_conflicts.append((c_rev, c_doc)) + if autoresolved: + doc_vcr.increment(self._replica_uid) + doc.rev = doc_vcr.as_str() + self._replace_conflicts(doc, remaining_conflicts) + + def resolve_doc(self, doc, conflicted_doc_revs): + cur_doc = self._get_doc(doc.doc_id) + if cur_doc is None: + cur_rev = None + else: + cur_rev = cur_doc.rev + new_rev = self._ensure_maximal_rev(cur_rev, conflicted_doc_revs) + superseded_revs = set(conflicted_doc_revs) + remaining_conflicts = [] + cur_conflicts = self._conflicts[doc.doc_id] + for c_rev, c_doc in cur_conflicts: + if c_rev in superseded_revs: + continue + remaining_conflicts.append((c_rev, c_doc)) + doc.rev = new_rev + if cur_rev in superseded_revs: + self._put_and_update_indexes(cur_doc, doc) + else: + remaining_conflicts.append((new_rev, doc.get_json())) + self._replace_conflicts(doc, remaining_conflicts) + + def delete_doc(self, doc): + if doc.doc_id not in self._docs: + raise errors.DocumentDoesNotExist + if self._docs[doc.doc_id][1] in ('null', None): + raise errors.DocumentAlreadyDeleted + doc.make_tombstone() + self.put_doc(doc) + + def create_index(self, index_name, *index_expressions): + if index_name in self._indexes: + if self._indexes[index_name]._definition == list( + index_expressions): + return + raise errors.IndexNameTakenError + index = InMemoryIndex(index_name, list(index_expressions)) + for doc_id, (doc_rev, doc) in self._docs.iteritems(): + if doc is not None: + index.add_json(doc_id, doc) + self._indexes[index_name] = index + + def delete_index(self, index_name): + try: + del self._indexes[index_name] + except KeyError: + pass + + def list_indexes(self): + definitions = [] + for idx in self._indexes.itervalues(): + definitions.append((idx._name, idx._definition)) + return definitions + + def get_from_index(self, index_name, *key_values): + try: + index = self._indexes[index_name] + except KeyError: + raise errors.IndexDoesNotExist + doc_ids = index.lookup(key_values) + result = [] + for doc_id in doc_ids: + result.append(self._get_doc(doc_id, check_for_conflicts=True)) + return result + + def get_range_from_index(self, index_name, start_value=None, + end_value=None): + """Return all documents with key values in the specified range.""" + try: + index = self._indexes[index_name] + except KeyError: + raise errors.IndexDoesNotExist + if isinstance(start_value, basestring): + start_value = (start_value,) + if isinstance(end_value, basestring): + end_value = (end_value,) + doc_ids = index.lookup_range(start_value, end_value) + result = [] + for doc_id in doc_ids: + result.append(self._get_doc(doc_id, check_for_conflicts=True)) + return result + + def get_index_keys(self, index_name): + try: + index = self._indexes[index_name] + except KeyError: + raise errors.IndexDoesNotExist + keys = index.keys() + # XXX inefficiency warning + return list(set([tuple(key.split('\x01')) for key in keys])) + + def whats_changed(self, old_generation=0): + changes = [] + relevant_tail = self._transaction_log[old_generation:] + # We don't use len(self._transaction_log) because _transaction_log may + # get mutated by a concurrent operation. + cur_generation = old_generation + len(relevant_tail) + last_trans_id = '' + if relevant_tail: + last_trans_id = relevant_tail[-1][1] + elif self._transaction_log: + last_trans_id = self._transaction_log[-1][1] + seen = set() + generation = cur_generation + for doc_id, trans_id in reversed(relevant_tail): + if doc_id not in seen: + changes.append((doc_id, generation, trans_id)) + seen.add(doc_id) + generation -= 1 + changes.reverse() + return (cur_generation, last_trans_id, changes) + + def _force_doc_sync_conflict(self, doc): + my_doc = self._get_doc(doc.doc_id) + self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev)) + self._conflicts.setdefault(doc.doc_id, []).append( + (my_doc.rev, my_doc.get_json())) + doc.has_conflicts = True + self._put_and_update_indexes(my_doc, doc) + + +class InMemoryIndex(object): + """Interface for managing an Index.""" + + def __init__(self, index_name, index_definition): + self._name = index_name + self._definition = index_definition + self._values = {} + parser = query_parser.Parser() + self._getters = parser.parse_all(self._definition) + + def evaluate_json(self, doc): + """Determine the 'key' after applying this index to the doc.""" + raw = json.loads(doc) + return self.evaluate(raw) + + def evaluate(self, obj): + """Evaluate a dict object, applying this definition.""" + all_rows = [[]] + for getter in self._getters: + new_rows = [] + keys = getter.get(obj) + if not keys: + return [] + for key in keys: + new_rows.extend([row + [key] for row in all_rows]) + all_rows = new_rows + all_rows = ['\x01'.join(row) for row in all_rows] + return all_rows + + def add_json(self, doc_id, doc): + """Add this json doc to the index.""" + keys = self.evaluate_json(doc) + if not keys: + return + for key in keys: + self._values.setdefault(key, []).append(doc_id) + + def remove_json(self, doc_id, doc): + """Remove this json doc from the index.""" + keys = self.evaluate_json(doc) + if keys: + for key in keys: + doc_ids = self._values[key] + doc_ids.remove(doc_id) + if not doc_ids: + del self._values[key] + + def _find_non_wildcards(self, values): + """Check if this should be a wildcard match. + + Further, this will raise an exception if the syntax is improperly + defined. + + :return: The offset of the last value we need to match against. + """ + if len(values) != len(self._definition): + raise errors.InvalidValueForIndex() + is_wildcard = False + last = 0 + for idx, val in enumerate(values): + if val.endswith('*'): + if val != '*': + # We have an 'x*' style wildcard + if is_wildcard: + # We were already in wildcard mode, so this is invalid + raise errors.InvalidGlobbing + last = idx + 1 + is_wildcard = True + else: + if is_wildcard: + # We were in wildcard mode, we can't follow that with + # non-wildcard + raise errors.InvalidGlobbing + last = idx + 1 + if not is_wildcard: + return -1 + return last + + def lookup(self, values): + """Find docs that match the values.""" + last = self._find_non_wildcards(values) + if last == -1: + return self._lookup_exact(values) + else: + return self._lookup_prefix(values[:last]) + + def lookup_range(self, start_values, end_values): + """Find docs within the range.""" + # TODO: Wildly inefficient, which is unlikely to be a problem for the + # inmemory implementation. + if start_values: + self._find_non_wildcards(start_values) + start_values = get_prefix(start_values) + if end_values: + if self._find_non_wildcards(end_values) == -1: + exact = True + else: + exact = False + end_values = get_prefix(end_values) + found = [] + for key, doc_ids in sorted(self._values.iteritems()): + if start_values and start_values > key: + continue + if end_values and end_values < key: + if exact: + break + else: + if not key.startswith(end_values): + break + found.extend(doc_ids) + return found + + def keys(self): + """Find the indexed keys.""" + return self._values.keys() + + def _lookup_prefix(self, value): + """Find docs that match the prefix string in values.""" + # TODO: We need a different data structure to make prefix style fast, + # some sort of sorted list would work, but a plain dict doesn't. + key_prefix = get_prefix(value) + all_doc_ids = [] + for key, doc_ids in sorted(self._values.iteritems()): + if key.startswith(key_prefix): + all_doc_ids.extend(doc_ids) + return all_doc_ids + + def _lookup_exact(self, value): + """Find docs that match exactly.""" + key = '\x01'.join(value) + if key in self._values: + return self._values[key] + return () + + +class InMemorySyncTarget(CommonSyncTarget): + + def get_sync_info(self, source_replica_uid): + source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id( + source_replica_uid) + my_gen, my_trans_id = self._db._get_generation_info() + return ( + self._db._replica_uid, my_gen, my_trans_id, source_gen, + source_trans_id) + + def record_sync_info(self, source_replica_uid, source_replica_generation, + source_transaction_id): + if self._trace_hook: + self._trace_hook('record_sync_info') + self._db._set_replica_gen_and_trans_id( + source_replica_uid, source_replica_generation, + source_transaction_id) diff --git a/src/leap/soledad/common/l2db/errors.py b/src/leap/soledad/common/l2db/errors.py new file mode 100644 index 00000000..b502fc2d --- /dev/null +++ b/src/leap/soledad/common/l2db/errors.py @@ -0,0 +1,194 @@ +# Copyright 2011-2012 Canonical Ltd. +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see <http://www.gnu.org/licenses/>. + +"""A list of errors that u1db can raise.""" + + +class U1DBError(Exception): + """Generic base class for U1DB errors.""" + + # description/tag for identifying the error during transmission (http,...) + wire_description = "error" + + def __init__(self, message=None): + self.message = message + + +class RevisionConflict(U1DBError): + """The document revisions supplied does not match the current version.""" + + wire_description = "revision conflict" + + +class InvalidJSON(U1DBError): + """Content was not valid json.""" + + +class InvalidContent(U1DBError): + """Content was not a python dictionary.""" + + +class InvalidDocId(U1DBError): + """A document was requested with an invalid document identifier.""" + + wire_description = "invalid document id" + + +class MissingDocIds(U1DBError): + """Needs document ids.""" + + wire_description = "missing document ids" + + +class DocumentTooBig(U1DBError): + """Document exceeds the maximum document size for this database.""" + + wire_description = "document too big" + + +class UserQuotaExceeded(U1DBError): + """Document exceeds the maximum document size for this database.""" + + wire_description = "user quota exceeded" + + +class SubscriptionNeeded(U1DBError): + """User needs a subscription to be able to use this replica..""" + + wire_description = "user needs subscription" + + +class InvalidTransactionId(U1DBError): + """Invalid transaction for generation.""" + + wire_description = "invalid transaction id" + + +class InvalidGeneration(U1DBError): + """Generation was previously synced with a different transaction id.""" + + wire_description = "invalid generation" + + +class InvalidReplicaUID(U1DBError): + """Attempting to sync a database with itself.""" + + wire_description = "invalid replica uid" + + +class ConflictedDoc(U1DBError): + """The document is conflicted, you must call resolve before put()""" + + +class InvalidValueForIndex(U1DBError): + """The values supplied does not match the index definition.""" + + +class InvalidGlobbing(U1DBError): + """Raised if wildcard matches are not strictly at the tail of the request. + """ + + +class DocumentDoesNotExist(U1DBError): + """The document does not exist.""" + + wire_description = "document does not exist" + + +class DocumentAlreadyDeleted(U1DBError): + """The document was already deleted.""" + + wire_description = "document already deleted" + + +class DatabaseDoesNotExist(U1DBError): + """The database does not exist.""" + + wire_description = "database does not exist" + + +class IndexNameTakenError(U1DBError): + """The given index name is already taken.""" + + +class IndexDefinitionParseError(U1DBError): + """The index definition cannot be parsed.""" + + +class IndexDoesNotExist(U1DBError): + """No index of that name exists.""" + + +class Unauthorized(U1DBError): + """Request wasn't authorized properly.""" + + wire_description = "unauthorized" + + +class HTTPError(U1DBError): + """Unspecific HTTP errror.""" + + wire_description = None + + def __init__(self, status, message=None, headers={}): + self.status = status + self.message = message + self.headers = headers + + def __str__(self): + if not self.message: + return "HTTPError(%d)" % self.status + else: + return "HTTPError(%d, %r)" % (self.status, self.message) + + +class Unavailable(HTTPError): + """Server not available not serve request.""" + + wire_description = "unavailable" + + def __init__(self, message=None, headers={}): + super(Unavailable, self).__init__(503, message, headers) + + def __str__(self): + if not self.message: + return "Unavailable()" + else: + return "Unavailable(%r)" % self.message + + +class BrokenSyncStream(U1DBError): + """Unterminated or otherwise broken sync exchange stream.""" + + wire_description = None + + +class UnknownAuthMethod(U1DBError): + """Unknown auhorization method.""" + + wire_description = None + + +# mapping wire (transimission) descriptions/tags for errors to the exceptions +wire_description_to_exc = dict( + (x.wire_description, x) for x in globals().values() + if getattr(x, 'wire_description', None) not in (None, "error")) +wire_description_to_exc["error"] = U1DBError + + +# +# wire error descriptions not corresponding to an exception +DOCUMENT_DELETED = "document deleted" diff --git a/src/leap/soledad/common/l2db/query_parser.py b/src/leap/soledad/common/l2db/query_parser.py new file mode 100644 index 00000000..15a9ac80 --- /dev/null +++ b/src/leap/soledad/common/l2db/query_parser.py @@ -0,0 +1,371 @@ +# Copyright 2011 Canonical Ltd. +# Copyright 2016 LEAP Encryption Access Project +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see <http://www.gnu.org/licenses/>. +""" +Code for parsing Index definitions. +""" + +import re + +from leap.soledad.common.l2db import errors + + +class Getter(object): + """Get values from a document based on a specification.""" + + def get(self, raw_doc): + """Get a value from the document. + + :param raw_doc: a python dictionary to get the value from. + :return: A list of values that match the description. + """ + raise NotImplementedError(self.get) + + +class StaticGetter(Getter): + """A getter that returns a defined value (independent of the doc).""" + + def __init__(self, value): + """Create a StaticGetter. + + :param value: the value to return when get is called. + """ + if value is None: + self.value = [] + elif isinstance(value, list): + self.value = value + else: + self.value = [value] + + def get(self, raw_doc): + return self.value + + +def extract_field(raw_doc, subfields, index=0): + if not isinstance(raw_doc, dict): + return [] + val = raw_doc.get(subfields[index]) + if val is None: + return [] + if index < len(subfields) - 1: + if isinstance(val, list): + results = [] + for item in val: + results.extend(extract_field(item, subfields, index + 1)) + return results + if isinstance(val, dict): + return extract_field(val, subfields, index + 1) + return [] + if isinstance(val, dict): + return [] + if isinstance(val, list): + # Strip anything in the list that isn't a simple type + return [v for v in val if not isinstance(v, (dict, list))] + return [val] + + +class ExtractField(Getter): + """Extract a field from the document.""" + + def __init__(self, field): + """Create an ExtractField object. + + When a document is passed to get() this will return a value + from the document based on the field specifier passed to + the constructor. + + None will be returned if the field is nonexistant, or refers to an + object, rather than a simple type or list of simple types. + + :param field: a specifier for the field to return. + This is either a field name, or a dotted field name. + """ + self.field = field.split('.') + + def get(self, raw_doc): + return extract_field(raw_doc, self.field) + + +class Transformation(Getter): + """A transformation on a value from another Getter.""" + + name = None + arity = 1 + args = ['expression'] + + def __init__(self, inner): + """Create a transformation. + + :param inner: the argument(s) to the transformation. + """ + self.inner = inner + + def get(self, raw_doc): + inner_values = self.inner.get(raw_doc) + assert isinstance(inner_values, list),\ + 'get() should always return a list' + return self.transform(inner_values) + + def transform(self, values): + """Transform the values. + + This should be implemented by subclasses to transform the + value when get() is called. + + :param values: the values from the other Getter + :return: the transformed values. + """ + raise NotImplementedError(self.transform) + + +class Lower(Transformation): + """Lowercase a string. + + This transformation will return None for non-string inputs. However, + it will lowercase any strings in a list, dropping any elements + that are not strings. + """ + + name = "lower" + + def _can_transform(self, val): + return isinstance(val, basestring) + + def transform(self, values): + if not values: + return [] + return [val.lower() for val in values if self._can_transform(val)] + + +class Number(Transformation): + """Convert an integer to a zero padded string. + + This transformation will return None for non-integer inputs. However, it + will transform any integers in a list, dropping any elements that are not + integers. + """ + + name = 'number' + arity = 2 + args = ['expression', int] + + def __init__(self, inner, number): + super(Number, self).__init__(inner) + self.padding = "%%0%sd" % number + + def _can_transform(self, val): + return isinstance(val, int) and not isinstance(val, bool) + + def transform(self, values): + """Transform any integers in values into zero padded strings.""" + if not values: + return [] + return [self.padding % (v,) for v in values if self._can_transform(v)] + + +class Bool(Transformation): + """Convert bool to string.""" + + name = "bool" + args = ['expression'] + + def _can_transform(self, val): + return isinstance(val, bool) + + def transform(self, values): + """Transform any booleans in values into strings.""" + if not values: + return [] + return [('1' if v else '0') for v in values if self._can_transform(v)] + + +class SplitWords(Transformation): + """Split a string on whitespace. + + This Getter will return [] for non-string inputs. It will however + split any strings in an input list, discarding any elements that + are not strings. + """ + + name = "split_words" + + def _can_transform(self, val): + return isinstance(val, basestring) + + def transform(self, values): + if not values: + return [] + result = set() + for value in values: + if self._can_transform(value): + for word in value.split(): + result.add(word) + return list(result) + + +class Combine(Transformation): + """Combine multiple expressions into a single index.""" + + name = "combine" + # variable number of args + arity = -1 + + def __init__(self, *inner): + super(Combine, self).__init__(inner) + + def get(self, raw_doc): + inner_values = [] + for inner in self.inner: + inner_values.extend(inner.get(raw_doc)) + return self.transform(inner_values) + + def transform(self, values): + return values + + +class IsNull(Transformation): + """Indicate whether the input is None. + + This Getter returns a bool indicating whether the input is nil. + """ + + name = "is_null" + + def transform(self, values): + return [len(values) == 0] + + +def check_fieldname(fieldname): + if fieldname.endswith('.'): + raise errors.IndexDefinitionParseError( + "Fieldname cannot end in '.':%s^" % (fieldname,)) + + +class Parser(object): + """Parse an index expression into a sequence of transformations.""" + + _transformations = {} + _delimiters = re.compile("\(|\)|,") + + def __init__(self): + self._tokens = [] + + def _set_expression(self, expression): + self._open_parens = 0 + self._tokens = [] + expression = expression.strip() + while expression: + delimiter = self._delimiters.search(expression) + if delimiter: + idx = delimiter.start() + if idx == 0: + result, expression = (expression[:1], expression[1:]) + self._tokens.append(result) + else: + result, expression = (expression[:idx], expression[idx:]) + result = result.strip() + if result: + self._tokens.append(result) + else: + expression = expression.strip() + if expression: + self._tokens.append(expression) + expression = None + + def _get_token(self): + if self._tokens: + return self._tokens.pop(0) + + def _peek_token(self): + if self._tokens: + return self._tokens[0] + + @staticmethod + def _to_getter(term): + if isinstance(term, Getter): + return term + check_fieldname(term) + return ExtractField(term) + + def _parse_op(self, op_name): + self._get_token() # '(' + op = self._transformations.get(op_name, None) + if op is None: + raise errors.IndexDefinitionParseError( + "Unknown operation: %s" % op_name) + args = [] + while True: + args.append(self._parse_term()) + sep = self._get_token() + if sep == ')': + break + if sep != ',': + raise errors.IndexDefinitionParseError( + "Unexpected token '%s' in parentheses." % (sep,)) + parsed = [] + for i, arg in enumerate(args): + arg_type = op.args[i % len(op.args)] + if arg_type == 'expression': + inner = self._to_getter(arg) + else: + try: + inner = arg_type(arg) + except ValueError as e: + raise errors.IndexDefinitionParseError( + "Invalid value %r for argument type %r " + "(%r)." % (arg, arg_type, e)) + parsed.append(inner) + return op(*parsed) + + def _parse_term(self): + term = self._get_token() + if term is None: + raise errors.IndexDefinitionParseError( + "Unexpected end of index definition.") + if term in (',', ')', '('): + raise errors.IndexDefinitionParseError( + "Unexpected token '%s' at start of expression." % (term,)) + next_token = self._peek_token() + if next_token == '(': + return self._parse_op(term) + return term + + def parse(self, expression): + self._set_expression(expression) + term = self._to_getter(self._parse_term()) + if self._peek_token(): + raise errors.IndexDefinitionParseError( + "Unexpected token '%s' after end of expression." + % (self._peek_token(),)) + return term + + def parse_all(self, fields): + return [self.parse(field) for field in fields] + + @classmethod + def register_transormation(cls, transform): + assert transform.name not in cls._transformations, ( + "Transform %s already registered for %s" + % (transform.name, cls._transformations[transform.name])) + cls._transformations[transform.name] = transform + + +Parser.register_transormation(SplitWords) +Parser.register_transormation(Lower) +Parser.register_transormation(Number) +Parser.register_transormation(Bool) +Parser.register_transormation(IsNull) +Parser.register_transormation(Combine) diff --git a/src/leap/soledad/common/l2db/remote/__init__.py b/src/leap/soledad/common/l2db/remote/__init__.py new file mode 100644 index 00000000..3f32e381 --- /dev/null +++ b/src/leap/soledad/common/l2db/remote/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2011 Canonical Ltd. +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see <http://www.gnu.org/licenses/>. diff --git a/src/leap/soledad/common/l2db/remote/http_app.py b/src/leap/soledad/common/l2db/remote/http_app.py new file mode 100644 index 00000000..a4eddb36 --- /dev/null +++ b/src/leap/soledad/common/l2db/remote/http_app.py @@ -0,0 +1,660 @@ +# Copyright 2011-2012 Canonical Ltd. +# Copyright 2016 LEAP Encryption Access Project +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see <http://www.gnu.org/licenses/>. + +""" +HTTP Application exposing U1DB. +""" +# TODO -- deprecate, use twisted/txaio. + +import functools +import six.moves.http_client as httplib +import inspect +import json +import sys +import six.moves.urllib.parse as urlparse + +import routes.mapper + +from leap.soledad.common.l2db import ( + __version__ as _u1db_version, + DBNAME_CONSTRAINTS, Document, + errors, sync) +from leap.soledad.common.l2db.remote import http_errors, utils + + +def parse_bool(expression): + """Parse boolean querystring parameter.""" + if expression == 'true': + return True + return False + + +def parse_list(expression): + if not expression: + return [] + return [t.strip() for t in expression.split(',')] + + +def none_or_str(expression): + if expression is None: + return None + return str(expression) + + +class BadRequest(Exception): + """Bad request.""" + + +class _FencedReader(object): + """Read and get lines from a file but not past a given length.""" + + MAXCHUNK = 8192 + + def __init__(self, rfile, total, max_entry_size): + self.rfile = rfile + self.remaining = total + self.max_entry_size = max_entry_size + self._kept = None + + def read_chunk(self, atmost): + if self._kept is not None: + # ignore atmost, kept data should be a subchunk anyway + kept, self._kept = self._kept, None + return kept + if self.remaining == 0: + return '' + data = self.rfile.read(min(self.remaining, atmost)) + self.remaining -= len(data) + return data + + def getline(self): + line_parts = [] + size = 0 + while True: + chunk = self.read_chunk(self.MAXCHUNK) + if chunk == '': + break + nl = chunk.find("\n") + if nl != -1: + size += nl + 1 + if size > self.max_entry_size: + raise BadRequest + line_parts.append(chunk[:nl + 1]) + rest = chunk[nl + 1:] + self._kept = rest or None + break + else: + size += len(chunk) + if size > self.max_entry_size: + raise BadRequest + line_parts.append(chunk) + return ''.join(line_parts) + + +def http_method(**control): + """Decoration for handling of query arguments and content for a HTTP + method. + + args and content here are the query arguments and body of the incoming + HTTP requests. + + Match query arguments to python method arguments: + w = http_method()(f) + w(self, args, content) => args["content"]=content; + f(self, **args) + + JSON deserialize content to arguments: + w = http_method(content_as_args=True,...)(f) + w(self, args, content) => args.update(json.loads(content)); + f(self, **args) + + Support conversions (e.g int): + w = http_method(Arg=Conv,...)(f) + w(self, args, content) => args["Arg"]=Conv(args["Arg"]); + f(self, **args) + + Enforce no use of query arguments: + w = http_method(no_query=True,...)(f) + w(self, args, content) raises BadRequest if args is not empty + + Argument mismatches, deserialisation failures produce BadRequest. + """ + content_as_args = control.pop('content_as_args', False) + no_query = control.pop('no_query', False) + conversions = control.items() + + def wrap(f): + argspec = inspect.getargspec(f) + assert argspec.args[0] == "self" + nargs = len(argspec.args) + ndefaults = len(argspec.defaults or ()) + required_args = set(argspec.args[1:nargs - ndefaults]) + all_args = set(argspec.args) + + @functools.wraps(f) + def wrapper(self, args, content): + if no_query and args: + raise BadRequest() + if content is not None: + if content_as_args: + try: + args.update(json.loads(content)) + except ValueError: + raise BadRequest() + else: + args["content"] = content + if not (required_args <= set(args) <= all_args): + raise BadRequest("Missing required arguments.") + for name, conv in conversions: + if name not in args: + continue + try: + args[name] = conv(args[name]) + except ValueError: + raise BadRequest() + return f(self, **args) + + return wrapper + + return wrap + + +class URLToResource(object): + """Mappings from URLs to resources.""" + + def __init__(self): + self._map = routes.mapper.Mapper(controller_scan=None) + + def register(self, resource_cls): + # register + self._map.connect(None, resource_cls.url_pattern, + resource_cls=resource_cls, + requirements={"dbname": DBNAME_CONSTRAINTS}) + self._map.create_regs() + return resource_cls + + def match(self, path): + params = self._map.match(path) + if params is None: + return None, None + resource_cls = params.pop('resource_cls') + return resource_cls, params + + +url_to_resource = URLToResource() + + +@url_to_resource.register +class GlobalResource(object): + """Global (root) resource.""" + + url_pattern = "/" + + def __init__(self, state, responder): + self.state = state + self.responder = responder + + @http_method() + def get(self): + info = self.state.global_info() + info['version'] = _u1db_version + self.responder.send_response_json(**info) + + +@url_to_resource.register +class DatabaseResource(object): + """Database resource.""" + + url_pattern = "/{dbname}" + + def __init__(self, dbname, state, responder): + self.dbname = dbname + self.state = state + self.responder = responder + + @http_method() + def get(self): + self.state.check_database(self.dbname) + self.responder.send_response_json(200) + + @http_method(content_as_args=True) + def put(self): + self.state.ensure_database(self.dbname) + self.responder.send_response_json(200, ok=True) + + @http_method() + def delete(self): + self.state.delete_database(self.dbname) + self.responder.send_response_json(200, ok=True) + + +@url_to_resource.register +class DocsResource(object): + """Documents resource.""" + + url_pattern = "/{dbname}/docs" + + def __init__(self, dbname, state, responder): + self.responder = responder + self.db = state.open_database(dbname) + + @http_method(doc_ids=parse_list, check_for_conflicts=parse_bool, + include_deleted=parse_bool) + def get(self, doc_ids=None, check_for_conflicts=True, + include_deleted=False): + if doc_ids is None: + raise errors.MissingDocIds + docs = self.db.get_docs(doc_ids, include_deleted=include_deleted) + self.responder.content_type = 'application/json' + self.responder.start_response(200) + self.responder.start_stream(), + for doc in docs: + entry = dict( + doc_id=doc.doc_id, doc_rev=doc.rev, content=doc.get_json(), + has_conflicts=doc.has_conflicts) + self.responder.stream_entry(entry) + self.responder.end_stream() + self.responder.finish_response() + + +@url_to_resource.register +class AllDocsResource(object): + """All Documents resource.""" + + url_pattern = "/{dbname}/all-docs" + + def __init__(self, dbname, state, responder): + self.responder = responder + self.db = state.open_database(dbname) + + @http_method(include_deleted=parse_bool) + def get(self, include_deleted=False): + gen, docs = self.db.get_all_docs(include_deleted=include_deleted) + self.responder.content_type = 'application/json' + # returning a x-u1db-generation header is optional + # HTTPDatabase will fallback to return -1 if it's missing + self.responder.start_response(200, + headers={'x-u1db-generation': str(gen)}) + self.responder.start_stream(), + for doc in docs: + entry = dict( + doc_id=doc.doc_id, doc_rev=doc.rev, content=doc.get_json(), + has_conflicts=doc.has_conflicts) + self.responder.stream_entry(entry) + self.responder.end_stream() + self.responder.finish_response() + + +@url_to_resource.register +class DocResource(object): + """Document resource.""" + + url_pattern = "/{dbname}/doc/{id:.*}" + + def __init__(self, dbname, id, state, responder): + self.id = id + self.responder = responder + self.db = state.open_database(dbname) + + @http_method(old_rev=str) + def put(self, content, old_rev=None): + doc = Document(self.id, old_rev, content) + doc_rev = self.db.put_doc(doc) + if old_rev is None: + status = 201 # created + else: + status = 200 + self.responder.send_response_json(status, rev=doc_rev) + + @http_method(old_rev=str) + def delete(self, old_rev=None): + doc = Document(self.id, old_rev, None) + self.db.delete_doc(doc) + self.responder.send_response_json(200, rev=doc.rev) + + @http_method(include_deleted=parse_bool) + def get(self, include_deleted=False): + doc = self.db.get_doc(self.id, include_deleted=include_deleted) + if doc is None: + wire_descr = errors.DocumentDoesNotExist.wire_description + self.responder.send_response_json( + http_errors.wire_description_to_status[wire_descr], + error=wire_descr, + headers={ + 'x-u1db-rev': '', + 'x-u1db-has-conflicts': 'false' + }) + return + headers = { + 'x-u1db-rev': doc.rev, + 'x-u1db-has-conflicts': json.dumps(doc.has_conflicts) + } + if doc.is_tombstone(): + self.responder.send_response_json( + http_errors.wire_description_to_status[ + errors.DOCUMENT_DELETED], + error=errors.DOCUMENT_DELETED, + headers=headers) + else: + self.responder.send_response_content( + doc.get_json(), headers=headers) + + +@url_to_resource.register +class SyncResource(object): + """Sync endpoint resource.""" + + # maximum allowed request body size + max_request_size = 15 * 1024 * 1024 # 15Mb + # maximum allowed entry/line size in request body + max_entry_size = 10 * 1024 * 1024 # 10Mb + + url_pattern = "/{dbname}/sync-from/{source_replica_uid}" + + # pluggable + sync_exchange_class = sync.SyncExchange + + def __init__(self, dbname, source_replica_uid, state, responder): + self.source_replica_uid = source_replica_uid + self.responder = responder + self.state = state + self.dbname = dbname + self.replica_uid = None + + def get_target(self): + return self.state.open_database(self.dbname).get_sync_target() + + @http_method() + def get(self): + result = self.get_target().get_sync_info(self.source_replica_uid) + self.responder.send_response_json( + target_replica_uid=result[0], target_replica_generation=result[1], + target_replica_transaction_id=result[2], + source_replica_uid=self.source_replica_uid, + source_replica_generation=result[3], + source_transaction_id=result[4]) + + @http_method(generation=int, + content_as_args=True, no_query=True) + def put(self, generation, transaction_id): + self.get_target().record_sync_info(self.source_replica_uid, + generation, + transaction_id) + self.responder.send_response_json(ok=True) + + # Implements the same logic as LocalSyncTarget.sync_exchange + + @http_method(last_known_generation=int, last_known_trans_id=none_or_str, + content_as_args=True) + def post_args(self, last_known_generation, last_known_trans_id=None, + ensure=False): + if ensure: + db, self.replica_uid = self.state.ensure_database(self.dbname) + else: + db = self.state.open_database(self.dbname) + db.validate_gen_and_trans_id( + last_known_generation, last_known_trans_id) + self.sync_exch = self.sync_exchange_class( + db, self.source_replica_uid, last_known_generation) + + @http_method(content_as_args=True) + def post_stream_entry(self, id, rev, content, gen, trans_id): + doc = Document(id, rev, content) + self.sync_exch.insert_doc_from_source(doc, gen, trans_id) + + def post_end(self): + + def send_doc(doc, gen, trans_id): + entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.get_json(), + gen=gen, trans_id=trans_id) + self.responder.stream_entry(entry) + + new_gen = self.sync_exch.find_changes_to_return() + self.responder.content_type = 'application/x-u1db-sync-stream' + self.responder.start_response(200) + self.responder.start_stream(), + header = {"new_generation": new_gen, + "new_transaction_id": self.sync_exch.new_trans_id} + if self.replica_uid is not None: + header['replica_uid'] = self.replica_uid + self.responder.stream_entry(header) + self.sync_exch.return_docs(send_doc) + self.responder.end_stream() + self.responder.finish_response() + + +class HTTPResponder(object): + """Encode responses from the server back to the client.""" + + # a multi document response will put args and documents + # each on one line of the response body + + def __init__(self, start_response): + self._started = False + self._stream_state = -1 + self._no_initial_obj = True + self.sent_response = False + self._start_response = start_response + self._write = None + self.content_type = 'application/json' + self.content = [] + + def start_response(self, status, obj_dic=None, headers={}): + """start sending response with optional first json object.""" + if self._started: + return + self._started = True + status_text = httplib.responses[status] + self._write = self._start_response( + '%d %s' % (status, status_text), + [('content-type', self.content_type), + ('cache-control', 'no-cache')] + + headers.items()) + # xxx version in headers + if obj_dic is not None: + self._no_initial_obj = False + self._write(json.dumps(obj_dic) + "\r\n") + + def finish_response(self): + """finish sending response.""" + self.sent_response = True + + def send_response_json(self, status=200, headers={}, **kwargs): + """send and finish response with json object body from keyword args.""" + content = json.dumps(kwargs) + "\r\n" + self.send_response_content(content, headers=headers, status=status) + + def send_response_content(self, content, status=200, headers={}): + """send and finish response with content""" + headers['content-length'] = str(len(content)) + self.start_response(status, headers=headers) + if self._stream_state == 1: + self.content = [',\r\n', content] + else: + self.content = [content] + self.finish_response() + + def start_stream(self): + "start stream (array) as part of the response." + assert self._started and self._no_initial_obj + self._stream_state = 0 + self._write("[") + + def stream_entry(self, entry): + "send stream entry as part of the response." + assert self._stream_state != -1 + if self._stream_state == 0: + self._stream_state = 1 + self._write('\r\n') + else: + self._write(',\r\n') + if type(entry) == dict: + entry = json.dumps(entry) + self._write(entry) + + def end_stream(self): + "end stream (array)." + assert self._stream_state != -1 + self._write("\r\n]\r\n") + + +class HTTPInvocationByMethodWithBody(object): + """Invoke methods on a resource.""" + + def __init__(self, resource, environ, parameters): + self.resource = resource + self.environ = environ + self.max_request_size = getattr( + resource, 'max_request_size', parameters.max_request_size) + self.max_entry_size = getattr( + resource, 'max_entry_size', parameters.max_entry_size) + + def _lookup(self, method): + try: + return getattr(self.resource, method) + except AttributeError: + raise BadRequest() + + def __call__(self): + args = urlparse.parse_qsl(self.environ['QUERY_STRING'], + strict_parsing=False) + try: + args = dict( + (k.decode('utf-8'), v.decode('utf-8')) for k, v in args) + except ValueError: + raise BadRequest() + method = self.environ['REQUEST_METHOD'].lower() + if method in ('get', 'delete'): + meth = self._lookup(method) + return meth(args, None) + else: + # we expect content-length > 0, reconsider if we move + # to support chunked enconding + try: + content_length = int(self.environ['CONTENT_LENGTH']) + except (ValueError, KeyError): + raise BadRequest + if content_length <= 0: + raise BadRequest + if content_length > self.max_request_size: + raise BadRequest + reader = _FencedReader(self.environ['wsgi.input'], content_length, + self.max_entry_size) + content_type = self.environ.get('CONTENT_TYPE', '') + content_type = content_type.split(';', 1)[0].strip() + if content_type == 'application/json': + meth = self._lookup(method) + body = reader.read_chunk(sys.maxint) + return meth(args, body) + elif content_type == 'application/x-u1db-sync-stream': + meth_args = self._lookup('%s_args' % method) + meth_entry = self._lookup('%s_stream_entry' % method) + meth_end = self._lookup('%s_end' % method) + body_getline = reader.getline + if body_getline().strip() != '[': + raise BadRequest() + line = body_getline() + line, comma = utils.check_and_strip_comma(line.strip()) + meth_args(args, line) + while True: + line = body_getline() + entry = line.strip() + if entry == ']': + break + if not entry or not comma: # empty or no prec comma + raise BadRequest + entry, comma = utils.check_and_strip_comma(entry) + meth_entry({}, entry) + if comma or body_getline(): # extra comma or data + raise BadRequest + return meth_end() + else: + raise BadRequest() + + +class HTTPApp(object): + + # maximum allowed request body size + max_request_size = 15 * 1024 * 1024 # 15Mb + # maximum allowed entry/line size in request body + max_entry_size = 10 * 1024 * 1024 # 10Mb + + def __init__(self, state): + self.state = state + + def _lookup_resource(self, environ, responder): + resource_cls, params = url_to_resource.match(environ['PATH_INFO']) + if resource_cls is None: + raise BadRequest # 404 instead? + resource = resource_cls( + state=self.state, responder=responder, **params) + return resource + + def __call__(self, environ, start_response): + responder = HTTPResponder(start_response) + self.request_begin(environ) + try: + resource = self._lookup_resource(environ, responder) + HTTPInvocationByMethodWithBody(resource, environ, self)() + except errors.U1DBError as e: + self.request_u1db_error(environ, e) + status = http_errors.wire_description_to_status.get( + e.wire_description, 500) + responder.send_response_json(status, error=e.wire_description) + except BadRequest: + self.request_bad_request(environ) + responder.send_response_json(400, error="bad request") + except KeyboardInterrupt: + raise + except: + self.request_failed(environ) + raise + else: + self.request_done(environ) + return responder.content + + # hooks for tracing requests + + def request_begin(self, environ): + """Hook called at the beginning of processing a request.""" + pass + + def request_done(self, environ): + """Hook called when done processing a request.""" + pass + + def request_u1db_error(self, environ, exc): + """Hook called when processing a request resulted in a U1DBError. + + U1DBError passed as exc. + """ + pass + + def request_bad_request(self, environ): + """Hook called when processing a bad request. + + No actual processing was done. + """ + pass + + def request_failed(self, environ): + """Hook called when processing a request failed unexpectedly. + + Invoked from an except block, so there's interpreter exception + information available. + """ + pass diff --git a/src/leap/soledad/common/l2db/remote/http_client.py b/src/leap/soledad/common/l2db/remote/http_client.py new file mode 100644 index 00000000..1124b038 --- /dev/null +++ b/src/leap/soledad/common/l2db/remote/http_client.py @@ -0,0 +1,178 @@ +# Copyright 2011-2012 Canonical Ltd. +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see <http://www.gnu.org/licenses/>. + +"""Base class to make requests to a remote HTTP server.""" + +import json +import socket +import ssl +import sys +import urllib +import six.moves.urllib.parse as urlparse +import six.moves.http_client as httplib +from time import sleep +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.remote import http_errors + +from leap.soledad.common.l2db.remote.ssl_match_hostname import match_hostname + +# Ubuntu/debian +# XXX other... +CA_CERTS = "/etc/ssl/certs/ca-certificates.crt" + + +def _encode_query_parameter(value): + """Encode query parameter.""" + if isinstance(value, bool): + if value: + value = 'true' + else: + value = 'false' + return unicode(value).encode('utf-8') + + +class _VerifiedHTTPSConnection(httplib.HTTPSConnection): + """HTTPSConnection verifying server side certificates.""" + # derived from httplib.py + + def connect(self): + "Connect to a host on a given (SSL) port." + + sock = socket.create_connection((self.host, self.port), + self.timeout, self.source_address) + if self._tunnel_host: + self.sock = sock + self._tunnel() + if sys.platform.startswith('linux'): + cert_opts = { + 'cert_reqs': ssl.CERT_REQUIRED, + 'ca_certs': CA_CERTS + } + else: + # XXX no cert verification implemented elsewhere for now + cert_opts = {} + self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, + ssl_version=ssl.PROTOCOL_SSLv3, + **cert_opts + ) + if cert_opts: + match_hostname(self.sock.getpeercert(), self.host) + + +class HTTPClientBase(object): + """Base class to make requests to a remote HTTP server.""" + + # Will use these delays to retry on 503 befor finally giving up. The final + # 0 is there to not wait after the final try fails. + _delays = (1, 1, 2, 4, 0) + + def __init__(self, url, creds=None): + self._url = urlparse.urlsplit(url) + self._conn = None + self._creds = {} + if creds is not None: + if len(creds) != 1: + raise errors.UnknownAuthMethod() + auth_meth, credentials = creds.items()[0] + try: + set_creds = getattr(self, 'set_%s_credentials' % auth_meth) + except AttributeError: + raise errors.UnknownAuthMethod(auth_meth) + set_creds(**credentials) + + def _ensure_connection(self): + if self._conn is not None: + return + if self._url.scheme == 'https': + connClass = _VerifiedHTTPSConnection + else: + connClass = httplib.HTTPConnection + self._conn = connClass(self._url.hostname, self._url.port) + + def close(self): + if self._conn: + self._conn.close() + self._conn = None + + # xxx retry mechanism? + + def _error(self, respdic): + descr = respdic.get("error") + exc_cls = errors.wire_description_to_exc.get(descr) + if exc_cls is not None: + message = respdic.get("message") + raise exc_cls(message) + + def _response(self): + resp = self._conn.getresponse() + body = resp.read() + headers = dict(resp.getheaders()) + if resp.status in (200, 201): + return body, headers + elif resp.status in http_errors.ERROR_STATUSES: + try: + respdic = json.loads(body) + except ValueError: + pass + else: + self._error(respdic) + # special case + if resp.status == 503: + raise errors.Unavailable(body, headers) + raise errors.HTTPError(resp.status, body, headers) + + def _sign_request(self, method, url_query, params): + raise NotImplementedError + + def _request(self, method, url_parts, params=None, body=None, + content_type=None): + self._ensure_connection() + unquoted_url = url_query = self._url.path + if url_parts: + if not url_query.endswith('/'): + url_query += '/' + unquoted_url = url_query + url_query += '/'.join(urllib.quote(part, safe='') + for part in url_parts) + # oauth performs its own quoting + unquoted_url += '/'.join(url_parts) + encoded_params = {} + if params: + for key, value in params.items(): + key = unicode(key).encode('utf-8') + encoded_params[key] = _encode_query_parameter(value) + url_query += ('?' + urllib.urlencode(encoded_params)) + if body is not None and not isinstance(body, basestring): + body = json.dumps(body) + content_type = 'application/json' + headers = {} + if content_type: + headers['content-type'] = content_type + headers.update( + self._sign_request(method, unquoted_url, encoded_params)) + for delay in self._delays: + try: + self._conn.request(method, url_query, body, headers) + return self._response() + except errors.Unavailable as e: + sleep(delay) + raise e + + def _request_json(self, method, url_parts, params=None, body=None, + content_type=None): + res, headers = self._request(method, url_parts, params, body, + content_type) + return json.loads(res), headers diff --git a/src/leap/soledad/common/l2db/remote/http_database.py b/src/leap/soledad/common/l2db/remote/http_database.py new file mode 100644 index 00000000..7e61e5a4 --- /dev/null +++ b/src/leap/soledad/common/l2db/remote/http_database.py @@ -0,0 +1,158 @@ +# Copyright 2011 Canonical Ltd. +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see <http://www.gnu.org/licenses/>. + +"""HTTPDatabase to access a remote db over the HTTP API.""" + +import json +import uuid + +from leap.soledad.common.l2db import ( + Database, + Document, + errors) +from leap.soledad.common.l2db.remote import ( + http_client, + http_errors, + http_target) + + +DOCUMENT_DELETED_STATUS = http_errors.wire_description_to_status[ + errors.DOCUMENT_DELETED] + + +class HTTPDatabase(http_client.HTTPClientBase, Database): + """Implement the Database API to a remote HTTP server.""" + + def __init__(self, url, document_factory=None, creds=None): + super(HTTPDatabase, self).__init__(url, creds=creds) + self._factory = document_factory or Document + + def set_document_factory(self, factory): + self._factory = factory + + @staticmethod + def open_database(url, create): + db = HTTPDatabase(url) + db.open(create) + return db + + @staticmethod + def delete_database(url): + db = HTTPDatabase(url) + db._delete() + db.close() + + def open(self, create): + if create: + self._ensure() + else: + self._check() + + def _check(self): + return self._request_json('GET', [])[0] + + def _ensure(self): + self._request_json('PUT', [], {}, {}) + + def _delete(self): + self._request_json('DELETE', [], {}, {}) + + def put_doc(self, doc): + if doc.doc_id is None: + raise errors.InvalidDocId() + params = {} + if doc.rev is not None: + params['old_rev'] = doc.rev + res, headers = self._request_json('PUT', ['doc', doc.doc_id], params, + doc.get_json(), 'application/json') + doc.rev = res['rev'] + return res['rev'] + + def get_doc(self, doc_id, include_deleted=False): + try: + res, headers = self._request( + 'GET', ['doc', doc_id], {"include_deleted": include_deleted}) + except errors.DocumentDoesNotExist: + return None + except errors.HTTPError as e: + if (e.status == DOCUMENT_DELETED_STATUS and + 'x-u1db-rev' in e.headers): + res = None + headers = e.headers + else: + raise + doc_rev = headers['x-u1db-rev'] + has_conflicts = json.loads(headers['x-u1db-has-conflicts']) + doc = self._factory(doc_id, doc_rev, res) + doc.has_conflicts = has_conflicts + return doc + + def _build_docs(self, res): + for doc_dict in json.loads(res): + doc = self._factory( + doc_dict['doc_id'], doc_dict['doc_rev'], doc_dict['content']) + doc.has_conflicts = doc_dict['has_conflicts'] + yield doc + + def get_docs(self, doc_ids, check_for_conflicts=True, + include_deleted=False): + if not doc_ids: + return [] + doc_ids = ','.join(doc_ids) + res, headers = self._request( + 'GET', ['docs'], { + "doc_ids": doc_ids, "include_deleted": include_deleted, + "check_for_conflicts": check_for_conflicts}) + return self._build_docs(res) + + def get_all_docs(self, include_deleted=False): + res, headers = self._request( + 'GET', ['all-docs'], {"include_deleted": include_deleted}) + gen = -1 + if 'x-u1db-generation' in headers: + gen = int(headers['x-u1db-generation']) + return gen, list(self._build_docs(res)) + + def _allocate_doc_id(self): + return 'D-%s' % (uuid.uuid4().hex,) + + def create_doc(self, content, doc_id=None): + if not isinstance(content, dict): + raise errors.InvalidContent + json_string = json.dumps(content) + return self.create_doc_from_json(json_string, doc_id) + + def create_doc_from_json(self, content, doc_id=None): + if doc_id is None: + doc_id = self._allocate_doc_id() + res, headers = self._request_json('PUT', ['doc', doc_id], {}, + content, 'application/json') + new_doc = self._factory(doc_id, res['rev'], content) + return new_doc + + def delete_doc(self, doc): + if doc.doc_id is None: + raise errors.InvalidDocId() + params = {'old_rev': doc.rev} + res, headers = self._request_json( + 'DELETE', ['doc', doc.doc_id], params) + doc.make_tombstone() + doc.rev = res['rev'] + + def get_sync_target(self): + st = http_target.HTTPSyncTarget(self._url.geturl()) + st._creds = self._creds + return st diff --git a/src/leap/soledad/common/l2db/remote/http_errors.py b/src/leap/soledad/common/l2db/remote/http_errors.py new file mode 100644 index 00000000..ee4cfefa --- /dev/null +++ b/src/leap/soledad/common/l2db/remote/http_errors.py @@ -0,0 +1,48 @@ +# Copyright 2011-2012 Canonical Ltd. +# Copyright 2016 LEAP Encryption Access Project +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see <http://www.gnu.org/licenses/>. + +""" +Information about the encoding of errors over HTTP. +""" + +from leap.soledad.common.l2db import errors + + +# error wire descriptions mapping to HTTP status codes +wire_description_to_status = dict([ + (errors.InvalidDocId.wire_description, 400), + (errors.MissingDocIds.wire_description, 400), + (errors.Unauthorized.wire_description, 401), + (errors.DocumentTooBig.wire_description, 403), + (errors.UserQuotaExceeded.wire_description, 403), + (errors.SubscriptionNeeded.wire_description, 403), + (errors.DatabaseDoesNotExist.wire_description, 404), + (errors.DocumentDoesNotExist.wire_description, 404), + (errors.DocumentAlreadyDeleted.wire_description, 404), + (errors.RevisionConflict.wire_description, 409), + (errors.InvalidGeneration.wire_description, 409), + (errors.InvalidReplicaUID.wire_description, 409), + (errors.InvalidTransactionId.wire_description, 409), + (errors.Unavailable.wire_description, 503), + # without matching exception + (errors.DOCUMENT_DELETED, 404) +]) + + +ERROR_STATUSES = set(wire_description_to_status.values()) +# 400 included explicitly for tests +ERROR_STATUSES.add(400) diff --git a/src/leap/soledad/common/l2db/remote/http_target.py b/src/leap/soledad/common/l2db/remote/http_target.py new file mode 100644 index 00000000..38804f01 --- /dev/null +++ b/src/leap/soledad/common/l2db/remote/http_target.py @@ -0,0 +1,125 @@ +# Copyright 2011-2012 Canonical Ltd. +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see <http://www.gnu.org/licenses/>. + +"""SyncTarget API implementation to a remote HTTP server.""" + +import json + +from leap.soledad.common.l2db import Document, SyncTarget +from leap.soledad.common.l2db.errors import BrokenSyncStream +from leap.soledad.common.l2db.remote import ( + http_client, utils) + + +class HTTPSyncTarget(http_client.HTTPClientBase, SyncTarget): + """Implement the SyncTarget api to a remote HTTP server.""" + + @staticmethod + def connect(url): + return HTTPSyncTarget(url) + + def get_sync_info(self, source_replica_uid): + self._ensure_connection() + res, _ = self._request_json('GET', ['sync-from', source_replica_uid]) + return (res['target_replica_uid'], res['target_replica_generation'], + res['target_replica_transaction_id'], + res['source_replica_generation'], res['source_transaction_id']) + + def record_sync_info(self, source_replica_uid, source_replica_generation, + source_transaction_id): + self._ensure_connection() + if self._trace_hook: # for tests + self._trace_hook('record_sync_info') + self._request_json('PUT', ['sync-from', source_replica_uid], {}, + {'generation': source_replica_generation, + 'transaction_id': source_transaction_id}) + + def _parse_sync_stream(self, data, return_doc_cb, ensure_callback=None): + parts = data.splitlines() # one at a time + if not parts or parts[0] != '[': + raise BrokenSyncStream + data = parts[1:-1] + comma = False + if data: + line, comma = utils.check_and_strip_comma(data[0]) + res = json.loads(line) + if ensure_callback and 'replica_uid' in res: + ensure_callback(res['replica_uid']) + for entry in data[1:]: + if not comma: # missing in between comma + raise BrokenSyncStream + line, comma = utils.check_and_strip_comma(entry) + entry = json.loads(line) + doc = Document(entry['id'], entry['rev'], entry['content']) + return_doc_cb(doc, entry['gen'], entry['trans_id']) + if parts[-1] != ']': + try: + partdic = json.loads(parts[-1]) + except ValueError: + pass + else: + if isinstance(partdic, dict): + self._error(partdic) + raise BrokenSyncStream + if not data or comma: # no entries or bad extra comma + raise BrokenSyncStream + return res + + def sync_exchange(self, docs_by_generations, source_replica_uid, + last_known_generation, last_known_trans_id, + return_doc_cb, ensure_callback=None): + self._ensure_connection() + if self._trace_hook: # for tests + self._trace_hook('sync_exchange') + url = '%s/sync-from/%s' % (self._url.path, source_replica_uid) + self._conn.putrequest('POST', url) + self._conn.putheader('content-type', 'application/x-u1db-sync-stream') + for header_name, header_value in self._sign_request('POST', url, {}): + self._conn.putheader(header_name, header_value) + entries = ['['] + size = 1 + + def prepare(**dic): + entry = comma + '\r\n' + json.dumps(dic) + entries.append(entry) + return len(entry) + + comma = '' + size += prepare( + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + ensure=ensure_callback is not None) + comma = ',' + for doc, gen, trans_id in docs_by_generations: + size += prepare(id=doc.doc_id, rev=doc.rev, content=doc.get_json(), + gen=gen, trans_id=trans_id) + entries.append('\r\n]') + size += len(entries[-1]) + self._conn.putheader('content-length', str(size)) + self._conn.endheaders() + for entry in entries: + self._conn.send(entry) + entries = None + data, _ = self._response() + res = self._parse_sync_stream(data, return_doc_cb, ensure_callback) + data = None + return res['new_generation'], res['new_transaction_id'] + + # for tests + _trace_hook = None + + def _set_trace_hook_shallow(self, cb): + self._trace_hook = cb diff --git a/src/leap/soledad/common/l2db/remote/server_state.py b/src/leap/soledad/common/l2db/remote/server_state.py new file mode 100644 index 00000000..d4c3c45f --- /dev/null +++ b/src/leap/soledad/common/l2db/remote/server_state.py @@ -0,0 +1,68 @@ +# Copyright 2011 Canonical Ltd. +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see <http://www.gnu.org/licenses/>. + +"""State for servers exposing a set of U1DB databases.""" + + +class ServerState(object): + """Passed to a Request when it is instantiated. + + This is used to track server-side state, such as working-directory, open + databases, etc. + """ + + def __init__(self): + self._workingdir = None + + def set_workingdir(self, path): + self._workingdir = path + + def global_info(self): + """Return global information about the server.""" + return {} + + def _relpath(self, relpath): + # Note: We don't want to allow absolute paths here, because we + # don't want to expose the filesystem. We should also check that + # relpath doesn't have '..' in it, etc. + return self._workingdir + '/' + relpath + + def open_database(self, path): + """Open a database at the given location.""" + from leap.soledad.client._db import sqlite + full_path = self._relpath(path) + return sqlite.SQLiteDatabase.open_database(full_path, create=False) + + def check_database(self, path): + """Check if the database at the given location exists. + + Simply returns if it does or raises DatabaseDoesNotExist. + """ + db = self.open_database(path) + db.close() + + def ensure_database(self, path): + """Ensure database at the given location.""" + from leap.soledad.client._db import sqlite + full_path = self._relpath(path) + db = sqlite.SQLiteDatabase.open_database(full_path, create=True) + return db, db._replica_uid + + def delete_database(self, path): + """Delete database at the given location.""" + from leap.soledad.client._db import sqlite + full_path = self._relpath(path) + sqlite.SQLiteDatabase.delete_database(full_path) diff --git a/src/leap/soledad/common/l2db/remote/ssl_match_hostname.py b/src/leap/soledad/common/l2db/remote/ssl_match_hostname.py new file mode 100644 index 00000000..ce82f1b2 --- /dev/null +++ b/src/leap/soledad/common/l2db/remote/ssl_match_hostname.py @@ -0,0 +1,65 @@ +"""The match_hostname() function from Python 3.2, essential when using SSL.""" +# XXX put it here until it's packaged + +import re + +__version__ = '3.2a3' + + +class CertificateError(ValueError): + pass + + +def _dnsname_to_pat(dn): + pats = [] + for frag in dn.split(r'.'): + if frag == '*': + # When '*' is a fragment by itself, it matches a non-empty dotless + # fragment. + pats.append('[^.]+') + else: + # Otherwise, '*' matches any dotless fragment. + frag = re.escape(frag) + pats.append(frag.replace(r'\*', '[^.]*')) + return re.compile(r'\A' + r'\.'.join(pats) + r'\Z', re.IGNORECASE) + + +def match_hostname(cert, hostname): + """Verify that *cert* (in decoded format as returned by + SSLSocket.getpeercert()) matches the *hostname*. RFC 2818 rules + are mostly followed, but IP addresses are not accepted for *hostname*. + + CertificateError is raised on failure. On success, the function + returns nothing. + """ + if not cert: + raise ValueError("empty or no certificate") + dnsnames = [] + san = cert.get('subjectAltName', ()) + for key, value in san: + if key == 'DNS': + if _dnsname_to_pat(value).match(hostname): + return + dnsnames.append(value) + if not san: + # The subject is only checked when subjectAltName is empty + for sub in cert.get('subject', ()): + for key, value in sub: + # XXX according to RFC 2818, the most specific Common Name + # must be used. + if key == 'commonName': + if _dnsname_to_pat(value).match(hostname): + return + dnsnames.append(value) + if len(dnsnames) > 1: + raise CertificateError( + "hostname %r doesn't match either of %s" + % (hostname, ', '.join(map(repr, dnsnames)))) + elif len(dnsnames) == 1: + raise CertificateError( + "hostname %r doesn't match %r" + % (hostname, dnsnames[0])) + else: + raise CertificateError( + "no appropriate commonName or " + "subjectAltName fields were found") diff --git a/src/leap/soledad/common/l2db/remote/utils.py b/src/leap/soledad/common/l2db/remote/utils.py new file mode 100644 index 00000000..14cedea9 --- /dev/null +++ b/src/leap/soledad/common/l2db/remote/utils.py @@ -0,0 +1,23 @@ +# Copyright 2012 Canonical Ltd. +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see <http://www.gnu.org/licenses/>. + +"""Utilities for details of the procotol.""" + + +def check_and_strip_comma(line): + if line and line[-1] == ',': + return line[:-1], True + return line, False diff --git a/src/leap/soledad/common/l2db/sync.py b/src/leap/soledad/common/l2db/sync.py new file mode 100644 index 00000000..32281f30 --- /dev/null +++ b/src/leap/soledad/common/l2db/sync.py @@ -0,0 +1,311 @@ +# Copyright 2011 Canonical Ltd. +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see <http://www.gnu.org/licenses/>. + +"""The synchronization utilities for U1DB.""" +from six.moves import zip as izip + +from leap.soledad.common import l2db +from leap.soledad.common.l2db import errors + + +class Synchronizer(object): + """Collect the state around synchronizing 2 U1DB replicas. + + Synchronization is bi-directional, in that new items in the source are sent + to the target, and new items in the target are returned to the source. + However, it still recognizes that one side is initiating the request. Also, + at the moment, conflicts are only created in the source. + """ + + def __init__(self, source, sync_target): + """Create a new Synchronization object. + + :param source: A Database + :param sync_target: A SyncTarget + """ + self.source = source + self.sync_target = sync_target + self.target_replica_uid = None + self.num_inserted = 0 + + def _insert_doc_from_target(self, doc, replica_gen, trans_id): + """Try to insert synced document from target. + + Implements TAKE OTHER semantics: any document from the target + that is in conflict will be taken as the new official value, + while the current conflicting value will be stored alongside + as a conflict. In the process indexes will be updated etc. + + :return: None + """ + # Increases self.num_inserted depending whether the document + # was effectively inserted. + state, _ = self.source._put_doc_if_newer( + doc, save_conflict=True, + replica_uid=self.target_replica_uid, replica_gen=replica_gen, + replica_trans_id=trans_id) + if state == 'inserted': + self.num_inserted += 1 + elif state == 'converged': + # magical convergence + pass + elif state == 'superseded': + # we have something newer, will be taken care of at the next sync + pass + else: + assert state == 'conflicted' + # The doc was saved as a conflict, so the database was updated + self.num_inserted += 1 + + def _record_sync_info_with_the_target(self, start_generation): + """Record our new after sync generation with the target if gapless. + + Any documents received from the target will cause the local + database to increment its generation. We do not want to send + them back to the target in a future sync. However, there could + also be concurrent updates from another process doing eg + 'put_doc' while the sync was running. And we do want to + synchronize those documents. We can tell if there was a + concurrent update by comparing our new generation number + versus the generation we started, and how many documents we + inserted from the target. If it matches exactly, then we can + record with the target that they are fully up to date with our + new generation. + """ + cur_gen, trans_id = self.source._get_generation_info() + last_gen = start_generation + self.num_inserted + if (cur_gen == last_gen and self.num_inserted > 0): + self.sync_target.record_sync_info( + self.source._replica_uid, cur_gen, trans_id) + + def sync(self, callback=None, autocreate=False): + """Synchronize documents between source and target.""" + sync_target = self.sync_target + # get target identifier, its current generation, + # and its last-seen database generation for this source + try: + (self.target_replica_uid, target_gen, target_trans_id, + target_my_gen, target_my_trans_id) = sync_target.get_sync_info( + self.source._replica_uid) + except errors.DatabaseDoesNotExist: + if not autocreate: + raise + # will try to ask sync_exchange() to create the db + self.target_replica_uid = None + target_gen, target_trans_id = 0, '' + target_my_gen, target_my_trans_id = 0, '' + + def ensure_callback(replica_uid): + self.target_replica_uid = replica_uid + + else: + ensure_callback = None + if self.target_replica_uid == self.source._replica_uid: + raise errors.InvalidReplicaUID + # validate the generation and transaction id the target knows about us + self.source.validate_gen_and_trans_id( + target_my_gen, target_my_trans_id) + # what's changed since that generation and this current gen + my_gen, _, changes = self.source.whats_changed(target_my_gen) + + # this source last-seen database generation for the target + if self.target_replica_uid is None: + target_last_known_gen, target_last_known_trans_id = 0, '' + else: + target_last_known_gen, target_last_known_trans_id = ( + self.source._get_replica_gen_and_trans_id( # nopep8 + self.target_replica_uid)) + if not changes and target_last_known_gen == target_gen: + if target_trans_id != target_last_known_trans_id: + raise errors.InvalidTransactionId + return my_gen + changed_doc_ids = [doc_id for doc_id, _, _ in changes] + # prepare to send all the changed docs + docs_to_send = self.source.get_docs( + changed_doc_ids, + check_for_conflicts=False, include_deleted=True) + # TODO: there must be a way to not iterate twice + docs_by_generation = zip( + docs_to_send, (gen for _, gen, _ in changes), + (trans for _, _, trans in changes)) + + # exchange documents and try to insert the returned ones with + # the target, return target synced-up-to gen + new_gen, new_trans_id = sync_target.sync_exchange( + docs_by_generation, self.source._replica_uid, + target_last_known_gen, target_last_known_trans_id, + self._insert_doc_from_target, ensure_callback=ensure_callback) + # record target synced-up-to generation including applying what we sent + self.source._set_replica_gen_and_trans_id( + self.target_replica_uid, new_gen, new_trans_id) + + # if gapless record current reached generation with target + self._record_sync_info_with_the_target(my_gen) + + return my_gen + + +class SyncExchange(object): + """Steps and state for carrying through a sync exchange on a target.""" + + def __init__(self, db, source_replica_uid, last_known_generation): + self._db = db + self.source_replica_uid = source_replica_uid + self.source_last_known_generation = last_known_generation + self.seen_ids = {} # incoming ids not superseded + self.changes_to_return = None + self.new_gen = None + self.new_trans_id = None + # for tests + self._incoming_trace = [] + self._trace_hook = None + self._db._last_exchange_log = { + 'receive': {'docs': self._incoming_trace}, + 'return': None + } + + def _set_trace_hook(self, cb): + self._trace_hook = cb + + def _trace(self, state): + if not self._trace_hook: + return + self._trace_hook(state) + + def insert_doc_from_source(self, doc, source_gen, trans_id): + """Try to insert synced document from source. + + Conflicting documents are not inserted but will be sent over + to the sync source. + + It keeps track of progress by storing the document source + generation as well. + + The 1st step of a sync exchange is to call this repeatedly to + try insert all incoming documents from the source. + + :param doc: A Document object. + :param source_gen: The source generation of doc. + :return: None + """ + state, at_gen = self._db._put_doc_if_newer( + doc, save_conflict=False, + replica_uid=self.source_replica_uid, replica_gen=source_gen, + replica_trans_id=trans_id) + if state == 'inserted': + self.seen_ids[doc.doc_id] = at_gen + elif state == 'converged': + # magical convergence + self.seen_ids[doc.doc_id] = at_gen + elif state == 'superseded': + # we have something newer that we will return + pass + else: + # conflict that we will returne + assert state == 'conflicted' + # for tests + self._incoming_trace.append((doc.doc_id, doc.rev)) + self._db._last_exchange_log['receive'].update({ + 'source_uid': self.source_replica_uid, + 'source_gen': source_gen + }) + + def find_changes_to_return(self): + """Find changes to return. + + Find changes since last_known_generation in db generation + order using whats_changed. It excludes documents ids that have + already been considered (superseded by the sender, etc). + + :return: new_generation - the generation of this database + which the caller can consider themselves to be synchronized after + processing the returned documents. + """ + self._db._last_exchange_log['receive'].update({ # for tests + 'last_known_gen': self.source_last_known_generation + }) + self._trace('before whats_changed') + gen, trans_id, changes = self._db.whats_changed( + self.source_last_known_generation) + self._trace('after whats_changed') + self.new_gen = gen + self.new_trans_id = trans_id + seen_ids = self.seen_ids + # changed docs that weren't superseded by or converged with + self.changes_to_return = [ + (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes if + # there was a subsequent update + doc_id not in seen_ids or seen_ids.get(doc_id) < gen] + return self.new_gen + + def return_docs(self, return_doc_cb): + """Return the changed documents and their last change generation + repeatedly invoking the callback return_doc_cb. + + The final step of a sync exchange. + + :param: return_doc_cb(doc, gen, trans_id): is a callback + used to return the documents with their last change generation + to the target replica. + :return: None + """ + changes_to_return = self.changes_to_return + # return docs, including conflicts + changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return] + self._trace('before get_docs') + docs = self._db.get_docs( + changed_doc_ids, check_for_conflicts=False, include_deleted=True) + + docs_by_gen = izip( + docs, (gen for _, gen, _ in changes_to_return), + (trans_id for _, _, trans_id in changes_to_return)) + _outgoing_trace = [] # for tests + for doc, gen, trans_id in docs_by_gen: + return_doc_cb(doc, gen, trans_id) + _outgoing_trace.append((doc.doc_id, doc.rev)) + # for tests + self._db._last_exchange_log['return'] = { + 'docs': _outgoing_trace, + 'last_gen': self.new_gen} + + +class LocalSyncTarget(l2db.SyncTarget): + """Common sync target implementation logic for all local sync targets.""" + + def __init__(self, db): + self._db = db + self._trace_hook = None + + def sync_exchange(self, docs_by_generations, source_replica_uid, + last_known_generation, last_known_trans_id, + return_doc_cb, ensure_callback=None): + self._db.validate_gen_and_trans_id( + last_known_generation, last_known_trans_id) + sync_exch = SyncExchange( + self._db, source_replica_uid, last_known_generation) + if self._trace_hook: + sync_exch._set_trace_hook(self._trace_hook) + # 1st step: try to insert incoming docs and record progress + for doc, doc_gen, trans_id in docs_by_generations: + sync_exch.insert_doc_from_source(doc, doc_gen, trans_id) + # 2nd step: find changed documents (including conflicts) to return + new_gen = sync_exch.find_changes_to_return() + # final step: return docs and record source replica sync point + sync_exch.return_docs(return_doc_cb) + return new_gen, sync_exch.new_trans_id + + def _set_trace_hook(self, cb): + self._trace_hook = cb diff --git a/src/leap/soledad/common/l2db/vectorclock.py b/src/leap/soledad/common/l2db/vectorclock.py new file mode 100644 index 00000000..42bceaa8 --- /dev/null +++ b/src/leap/soledad/common/l2db/vectorclock.py @@ -0,0 +1,89 @@ +# Copyright 2011 Canonical Ltd. +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see <http://www.gnu.org/licenses/>. + +"""VectorClockRev helper class.""" + + +class VectorClockRev(object): + """Track vector clocks for multiple replica ids. + + This allows simple comparison to determine if one VectorClockRev is + newer/older/in-conflict-with another VectorClockRev without having to + examine history. Every replica has a strictly increasing revision. When + creating a new revision, they include all revisions for all other replicas + which the new revision dominates, and increment their own revision to + something greater than the current value. + """ + + def __init__(self, value): + self._values = self._expand(value) + + def __repr__(self): + s = self.as_str() + return '%s(%s)' % (self.__class__.__name__, s) + + def as_str(self): + s = '|'.join(['%s:%d' % (m, r) for m, r + in sorted(self._values.items())]) + return s + + def _expand(self, value): + result = {} + if value is None: + return result + for replica_info in value.split('|'): + replica_uid, counter = replica_info.split(':') + counter = int(counter) + result[replica_uid] = counter + return result + + def is_newer(self, other): + """Is this VectorClockRev strictly newer than other. + """ + if not self._values: + return False + if not other._values: + return True + this_is_newer = False + other_expand = dict(other._values) + for key, value in self._values.iteritems(): + if key in other_expand: + other_value = other_expand.pop(key) + if other_value > value: + return False + elif other_value < value: + this_is_newer = True + else: + this_is_newer = True + if other_expand: + return False + return this_is_newer + + def increment(self, replica_uid): + """Increase the 'replica_uid' section of this vector clock. + + :return: A string representing the new vector clock value + """ + self._values[replica_uid] = self._values.get(replica_uid, 0) + 1 + + def maximize(self, other_vcr): + for replica_uid, counter in other_vcr._values.iteritems(): + if replica_uid not in self._values: + self._values[replica_uid] = counter + else: + this_counter = self._values[replica_uid] + if this_counter < counter: + self._values[replica_uid] = counter diff --git a/src/leap/soledad/common/log.py b/src/leap/soledad/common/log.py new file mode 100644 index 00000000..59a47726 --- /dev/null +++ b/src/leap/soledad/common/log.py @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- +# log.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +This module centralizes logging facilities and allows for different behaviours, +as using the python logging module instead of twisted logger, and to print logs +to stdout, mainly for development purposes. +""" + + +import os +import sys +import time + +from twisted.logger import Logger +from twisted.logger import textFileLogObserver +from twisted.logger import LogLevel +from twisted.logger import InvalidLogLevelError +from twisted.python.failure import Failure + + +# What follows is a patched class to correctly log namespace and level when +# using the default formatter and --syslog option in twistd. This seems to be a +# known bug but it has not been reported to upstream yet. + +class SyslogLogger(Logger): + + def emit(self, level, format=None, **kwargs): + if level not in LogLevel.iterconstants(): + self.failure( + "Got invalid log level {invalidLevel!r} in {logger}.emit().", + Failure(InvalidLogLevelError(level)), + invalidLevel=level, + logger=self, + ) + return + + event = kwargs + event.update( + log_logger=self, log_level=level, log_namespace=self.namespace, + log_source=self.source, log_format=format, log_time=time.time(), + ) + + # ---------------------------------8<--------------------------------- + # this is a workaround for the mess between twisted's legacy log system + # and twistd's --syslog option. + event["system"] = "%s#%s" % (self.namespace, level.name) + # ---------------------------------8<--------------------------------- + + if "log_trace" in event: + event["log_trace"].append((self, self.observer)) + + self.observer(event) + + +def getLogger(*args, **kwargs): + + if os.environ.get('SOLEDAD_USE_PYTHON_LOGGING'): + import logging + return logging.getLogger(__name__) + + if os.environ.get('SOLEDAD_LOG_TO_STDOUT'): + kwargs({'observer': textFileLogObserver(sys.stdout)}) + + return SyslogLogger(*args, **kwargs) + + +__all__ = ['getLogger'] diff --git a/src/leap/soledad/common/tests/__init__.py b/src/leap/soledad/common/tests/__init__.py new file mode 100644 index 00000000..acebb77b --- /dev/null +++ b/src/leap/soledad/common/tests/__init__.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Tests to make sure Soledad provides U1DB functionality and more. +""" + + +import os + + +def load_tests(): + """ + Build a test suite that includes all tests in leap.soledad.common.tests + but does not include tests in the u1db_tests/ subfolder. The reason for + not including those tests are: + + 1. they by themselves only test u1db functionality in the u1db module + (despite we use them as basis for testing soledad functionalities). + + 2. they would fail because we monkey patch u1db's remote http server + to add soledad functionality we need. + """ + import unittest + import glob + import imp + tests_prefix = os.path.join( + '.', 'src', 'leap', 'soledad', 'common', 'tests') + suite = unittest.TestSuite() + for testcase in glob.glob(os.path.join(tests_prefix, 'test_*.py')): + modname = os.path.basename(os.path.splitext(testcase)[0]) + f, pathname, description = imp.find_module(modname, [tests_prefix]) + module = imp.load_module(modname, f, pathname, description) + suite.addTest(unittest.TestLoader().loadTestsFromModule(module)) + return suite diff --git a/src/leap/soledad/common/tests/test_command.py b/src/leap/soledad/common/tests/test_command.py new file mode 100644 index 00000000..2136bb8f --- /dev/null +++ b/src/leap/soledad/common/tests/test_command.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +# test_command.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for command execution using a validator function for arguments. +""" +from twisted.trial import unittest +from leap.soledad.common.command import exec_validated_cmd + + +def validator(arg): + return True if arg is 'valid' else False + + +class ExecuteValidatedCommandTest(unittest.TestCase): + + def test_argument_validation(self): + status, out = exec_validated_cmd("command", "invalid arg", validator) + self.assertEquals(status, 1) + self.assertEquals(out, "invalid argument") + status, out = exec_validated_cmd("echo", "valid", validator) + self.assertEquals(status, 0) + self.assertEquals(out, "valid\n") + + def test_return_status_code_success(self): + status, out = exec_validated_cmd("echo", "arg") + self.assertEquals(status, 0) + self.assertEquals(out, "arg\n") + + def test_handle_command_with_spaces(self): + status, out = exec_validated_cmd("echo I am", "an argument") + self.assertEquals(status, 0, out) + self.assertEquals(out, "I am an argument\n") + + def test_handle_oserror_on_invalid_command(self): + status, out = exec_validated_cmd("inexistent command with", "args") + self.assertEquals(status, 1) + self.assertIn("No such file or directory", out) + + def test_return_status_code_number_on_failure(self): + status, out = exec_validated_cmd("ls", "user-bebacafe") + self.assertNotEquals(status, 0) + self.assertIn('No such file or directory\n', out) |