summaryrefslogtreecommitdiff
path: root/common/src
diff options
context:
space:
mode:
Diffstat (limited to 'common/src')
-rw-r--r--common/src/leap/soledad/common/backend.py681
-rw-r--r--common/src/leap/soledad/common/couch.py1514
-rw-r--r--common/src/leap/soledad/common/couch/__init__.py746
-rw-r--r--common/src/leap/soledad/common/couch/state.py114
-rw-r--r--common/src/leap/soledad/common/errors.py2
-rw-r--r--common/src/leap/soledad/common/tests/test_couch.py149
-rw-r--r--common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py5
-rw-r--r--common/src/leap/soledad/common/tests/test_server.py10
-rw-r--r--common/src/leap/soledad/common/tests/test_sync_mutex.py7
-rw-r--r--common/src/leap/soledad/common/tests/test_sync_target.py6
-rw-r--r--common/src/leap/soledad/common/tests/util.py9
11 files changed, 1593 insertions, 1650 deletions
diff --git a/common/src/leap/soledad/common/backend.py b/common/src/leap/soledad/common/backend.py
new file mode 100644
index 00000000..cb37b4ac
--- /dev/null
+++ b/common/src/leap/soledad/common/backend.py
@@ -0,0 +1,681 @@
+# -*- coding: utf-8 -*-
+# backend.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""A U1DB generic backend."""
+
+
+from u1db import vectorclock
+from u1db.errors import (
+ RevisionConflict,
+ InvalidDocId,
+ ConflictedDoc,
+ DocumentDoesNotExist,
+ DocumentAlreadyDeleted,
+)
+from u1db.backends import CommonBackend
+from u1db.backends import CommonSyncTarget
+from leap.soledad.common.document import CouchDocument
+
+
+class SoledadBackend(CommonBackend):
+
+ """
+ A U1DB backend implementation.
+ """
+
+ def __init__(self, database, replica_uid=None):
+ """
+ Create a new backend.
+
+ :param database: the database implementation
+ :type database: Database
+ :param replica_uid: an optional unique replica identifier
+ :type replica_uid: str
+ """
+ # save params
+ self._factory = CouchDocument
+ self._real_replica_uid = None
+ self._cache = None
+ self._dbname = database._dbname
+ self._database = database
+ if replica_uid is not None:
+ self._set_replica_uid(replica_uid)
+
+ @property
+ def cache(self):
+ if self._cache is not None:
+ return self._cache
+ else:
+ return {}
+
+ def init_caching(self, cache):
+ """
+ Start using cache by setting internal _cache attribute.
+
+ :param cache: the cache instance, anything that behaves like a dict
+ :type cache: dict
+ """
+ self._cache = cache
+
+ def get_sync_target(self):
+ """
+ Return a SyncTarget object, for another u1db to synchronize with.
+
+ :return: The sync target.
+ :rtype: SoledadSyncTarget
+ """
+ return SoledadSyncTarget(self)
+
+ def delete_database(self):
+ """
+ Delete a U1DB CouchDB database.
+ """
+ self._database.delete_database()
+
+ def close(self):
+ """
+ Release any resources associated with this database.
+
+ :return: True if db was succesfully closed.
+ :rtype: bool
+ """
+ self._database.close()
+ return True
+
+ def __del__(self):
+ """
+ Close the database upon garbage collection.
+ """
+ self.close()
+
+ def _set_replica_uid(self, replica_uid):
+ """
+ Force the replica uid to be set.
+
+ :param replica_uid: The new replica uid.
+ :type replica_uid: str
+ """
+ self._database.set_replica_uid(replica_uid)
+ self._real_replica_uid = replica_uid
+ self.cache['replica_uid'] = self._real_replica_uid
+
+ def _get_replica_uid(self):
+ """
+ Get the replica uid.
+
+ :return: The replica uid.
+ :rtype: str
+ """
+ if self._real_replica_uid is not None:
+ self.cache['replica_uid'] = self._real_replica_uid
+ return self._real_replica_uid
+ if 'replica_uid' in self.cache:
+ return self.cache['replica_uid']
+ self._real_replica_uid = self._database.get_replica_uid()
+ self._set_replica_uid(self._real_replica_uid)
+ return self._real_replica_uid
+
+ _replica_uid = property(_get_replica_uid, _set_replica_uid)
+
+ replica_uid = property(_get_replica_uid)
+
+ def _get_generation(self):
+ """
+ Return the current generation.
+
+ :return: The current generation.
+ :rtype: int
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
+ """
+ return self._get_generation_info()[0]
+
+ def _get_generation_info(self):
+ """
+ Return the current generation.
+
+ :return: A tuple containing the current generation and transaction id.
+ :rtype: (int, str)
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
+ """
+ if self.replica_uid + '_gen' in self.cache:
+ response = self.cache[self.replica_uid + '_gen']
+ return response
+ cur_gen, newest_trans_id = self._database._get_generation_info()
+ self.cache[self.replica_uid + '_gen'] = (cur_gen, newest_trans_id)
+ return (cur_gen, newest_trans_id)
+
+ def _get_trans_id_for_gen(self, generation):
+ """
+ Get the transaction id corresponding to a particular generation.
+
+ :param generation: The generation for which to get the transaction id.
+ :type generation: int
+
+ :return: The transaction id for C{generation}.
+ :rtype: str
+
+ """
+ return self._database._get_trans_id_for_gen(generation)
+
+ def _get_transaction_log(self):
+ """
+ This is only for the test suite, it is not part of the api.
+
+ :return: The complete transaction log.
+ :rtype: [(str, str)]
+
+ """
+ return self._database._get_transaction_log()
+
+ def _get_doc(self, doc_id, check_for_conflicts=False):
+ """
+ Extract the document from storage.
+
+ This can return None if the document doesn't exist.
+
+ :param doc_id: The unique document identifier
+ :type doc_id: str
+ :param check_for_conflicts: If set to False, then the conflict check
+ will be skipped.
+ :type check_for_conflicts: bool
+
+ :return: The document.
+ :rtype: CouchDocument
+ """
+ return self._database._get_doc(doc_id, check_for_conflicts)
+
+ def get_doc(self, doc_id, include_deleted=False):
+ """
+ Get the JSON string for the given document.
+
+ :param doc_id: The unique document identifier
+ :type doc_id: str
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise asking for a deleted
+ document will return None.
+ :type include_deleted: bool
+
+ :return: A document object.
+ :rtype: CouchDocument.
+ """
+ doc = self._get_doc(doc_id, check_for_conflicts=True)
+ if doc is None:
+ return None
+ if doc.is_tombstone() and not include_deleted:
+ return None
+ return doc
+
+ def get_all_docs(self, include_deleted=False):
+ """
+ Get the JSON content for all documents in the database.
+
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise deleted
+ documents will not be included in the results.
+ :type include_deleted: bool
+
+ :return: (generation, [CouchDocument])
+ The current generation of the database, followed by a list of all
+ the documents in the database.
+ :rtype: (int, [CouchDocument])
+ """
+ return self._database.get_all_docs(include_deleted)
+
+ def _put_doc(self, old_doc, doc):
+ """
+ Put the document in the Couch backend database.
+
+ Note that C{old_doc} must have been fetched with the parameter
+ C{check_for_conflicts} equal to True, so we can properly update the
+ new document using the conflict information from the old one.
+
+ :param old_doc: The old document version.
+ :type old_doc: CouchDocument
+ :param doc: The document to be put.
+ :type doc: CouchDocument
+ """
+ last_transaction =\
+ self._database.save_document(old_doc, doc,
+ self._allocate_transaction_id())
+ if self.replica_uid + '_gen' in self.cache:
+ gen, trans = self.cache[self.replica_uid + '_gen']
+ gen += 1
+ trans = last_transaction
+ self.cache[self.replica_uid + '_gen'] = (gen, trans)
+
+ def put_doc(self, doc):
+ """
+ Update a document.
+
+ If the document currently has conflicts, put will fail.
+ If the database specifies a maximum document size and the document
+ exceeds it, put will fail and raise a DocumentTooBig exception.
+
+ :param doc: A Document with new content.
+ :return: new_doc_rev - The new revision identifier for the document.
+ The Document object will also be updated.
+
+ :raise InvalidDocId: Raised if the document's id is invalid.
+ :raise DocumentTooBig: Raised if the document size is too big.
+ :raise ConflictedDoc: Raised if the document has conflicts.
+ """
+ if doc.doc_id is None:
+ raise InvalidDocId()
+ self._check_doc_id(doc.doc_id)
+ self._check_doc_size(doc)
+ old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
+ if old_doc and old_doc.has_conflicts:
+ raise ConflictedDoc()
+ if old_doc and doc.rev is None and old_doc.is_tombstone():
+ new_rev = self._allocate_doc_rev(old_doc.rev)
+ else:
+ if old_doc is not None:
+ if old_doc.rev != doc.rev:
+ raise RevisionConflict()
+ else:
+ if doc.rev is not None:
+ raise RevisionConflict()
+ new_rev = self._allocate_doc_rev(doc.rev)
+ doc.rev = new_rev
+ self._put_doc(old_doc, doc)
+ return new_rev
+
+ def whats_changed(self, old_generation=0):
+ """
+ Return a list of documents that have changed since old_generation.
+
+ :param old_generation: The generation of the database in the old
+ state.
+ :type old_generation: int
+
+ :return: (generation, trans_id, [(doc_id, generation, trans_id),...])
+ The current generation of the database, its associated
+ transaction id, and a list of of changed documents since
+ old_generation, represented by tuples with for each document
+ its doc_id and the generation and transaction id corresponding
+ to the last intervening change and sorted by generation (old
+ changes first)
+ :rtype: (int, str, [(str, int, str)])
+ """
+ return self._database.whats_changed(old_generation)
+
+ def delete_doc(self, doc):
+ """
+ Mark a document as deleted.
+
+ Will abort if the current revision doesn't match doc.rev.
+ This will also set doc.content to None.
+
+ :param doc: The document to mark as deleted.
+ :type doc: CouchDocument.
+
+ :raise DocumentDoesNotExist: Raised if the document does not
+ exist.
+ :raise RevisionConflict: Raised if the revisions do not match.
+ :raise DocumentAlreadyDeleted: Raised if the document is
+ already deleted.
+ :raise ConflictedDoc: Raised if the doc has conflicts.
+ """
+ old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
+ if old_doc is None:
+ raise DocumentDoesNotExist
+ if old_doc.rev != doc.rev:
+ raise RevisionConflict()
+ if old_doc.is_tombstone():
+ raise DocumentAlreadyDeleted
+ if old_doc.has_conflicts:
+ raise ConflictedDoc()
+ new_rev = self._allocate_doc_rev(doc.rev)
+ doc.rev = new_rev
+ doc.make_tombstone()
+ self._put_doc(old_doc, doc)
+ return new_rev
+
+ def get_doc_conflicts(self, doc_id, couch_rev=None):
+ """
+ Get the conflicted versions of a document.
+
+ If the C{couch_rev} parameter is not None, conflicts for a specific
+ document's couch revision are returned.
+
+ :param couch_rev: The couch document revision.
+ :type couch_rev: str
+
+ :return: A list of conflicted versions of the document.
+ :rtype: list
+ """
+ return self._database.get_doc_conflicts(doc_id, couch_rev)
+
+ def _get_replica_gen_and_trans_id(self, other_replica_uid):
+ """
+ Return the last known generation and transaction id for the other db
+ replica.
+
+ When you do a synchronization with another replica, the Database keeps
+ track of what generation the other database replica was at, and what
+ the associated transaction id was. This is used to determine what data
+ needs to be sent, and if two databases are claiming to be the same
+ replica.
+
+ :param other_replica_uid: The identifier for the other replica.
+ :type other_replica_uid: str
+
+ :return: A tuple containing the generation and transaction id we
+ encountered during synchronization. If we've never
+ synchronized with the replica, this is (0, '').
+ :rtype: (int, str)
+ """
+ return self._database._get_replica_gen_and_trans_id(other_replica_uid)
+
+ def _set_replica_gen_and_trans_id(self, other_replica_uid,
+ other_generation, other_transaction_id,
+ number_of_docs=None, doc_idx=None,
+ sync_id=None):
+ """
+ Set the last-known generation and transaction id for the other
+ database replica.
+
+ We have just performed some synchronization, and we want to track what
+ generation the other replica was at. See also
+ _get_replica_gen_and_trans_id.
+
+ :param other_replica_uid: The U1DB identifier for the other replica.
+ :type other_replica_uid: str
+ :param other_generation: The generation number for the other replica.
+ :type other_generation: int
+ :param other_transaction_id: The transaction id associated with the
+ generation.
+ :type other_transaction_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+ :param doc_idx: The index of the current document being sent.
+ :type doc_idx: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
+ """
+ if other_replica_uid is not None and other_generation is not None:
+ self._do_set_replica_gen_and_trans_id(
+ other_replica_uid, other_generation, other_transaction_id,
+ number_of_docs=number_of_docs, doc_idx=doc_idx,
+ sync_id=sync_id)
+
+ def _do_set_replica_gen_and_trans_id(
+ self, other_replica_uid, other_generation, other_transaction_id,
+ number_of_docs=None, doc_idx=None, sync_id=None):
+ """
+ Set the last-known generation and transaction id for the other
+ database replica.
+
+ We have just performed some synchronization, and we want to track what
+ generation the other replica was at. See also
+ _get_replica_gen_and_trans_id.
+
+ :param other_replica_uid: The U1DB identifier for the other replica.
+ :type other_replica_uid: str
+ :param other_generation: The generation number for the other replica.
+ :type other_generation: int
+ :param other_transaction_id: The transaction id associated with the
+ generation.
+ :type other_transaction_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+ :param doc_idx: The index of the current document being sent.
+ :type doc_idx: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
+ """
+ self.cache[other_replica_uid] = (other_generation,
+ other_transaction_id)
+ self._database._do_set_replica_gen_and_trans_id(other_replica_uid,
+ other_generation,
+ other_transaction_id)
+
+ def _force_doc_sync_conflict(self, doc):
+ """
+ Add a conflict and force a document put.
+
+ :param doc: The document to be put.
+ :type doc: CouchDocument
+ """
+ my_doc = self._get_doc(doc.doc_id)
+ self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev))
+ doc.add_conflict(self._factory(doc.doc_id, my_doc.rev,
+ my_doc.get_json()))
+ doc.has_conflicts = True
+ self._put_doc(my_doc, doc)
+
+ def resolve_doc(self, doc, conflicted_doc_revs):
+ """
+ Mark a document as no longer conflicted.
+
+ We take the list of revisions that the client knows about that it is
+ superseding. This may be a different list from the actual current
+ conflicts, in which case only those are removed as conflicted. This
+ may fail if the conflict list is significantly different from the
+ supplied information. (sync could have happened in the background from
+ the time you GET_DOC_CONFLICTS until the point where you RESOLVE)
+
+ :param doc: A Document with the new content to be inserted.
+ :type doc: CouchDocument
+ :param conflicted_doc_revs: A list of revisions that the new content
+ supersedes.
+ :type conflicted_doc_revs: [str]
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
+ """
+ cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
+ new_rev = self._ensure_maximal_rev(cur_doc.rev,
+ conflicted_doc_revs)
+ superseded_revs = set(conflicted_doc_revs)
+ doc.rev = new_rev
+ # this backend stores conflicts as properties of the documents, so we
+ # have to copy these conflicts over to the document being updated.
+ if cur_doc.rev in superseded_revs:
+ # the newer doc version will supersede the one in the database, so
+ # we copy conflicts before updating the backend.
+ doc.set_conflicts(cur_doc.get_conflicts()) # copy conflicts over.
+ doc.delete_conflicts(superseded_revs)
+ self._put_doc(cur_doc, doc)
+ else:
+ # the newer doc version does not supersede the one in the
+ # database, so we will add a conflict to the database and copy
+ # those over to the document the user has in her hands.
+ cur_doc.add_conflict(doc)
+ cur_doc.delete_conflicts(superseded_revs)
+ self._put_doc(cur_doc, cur_doc) # just update conflicts
+ # backend has been updated with current conflicts, now copy them
+ # to the current document.
+ doc.set_conflicts(cur_doc.get_conflicts())
+
+ def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen,
+ replica_trans_id='', number_of_docs=None,
+ doc_idx=None, sync_id=None):
+ """
+ Insert/update document into the database with a given revision.
+
+ This api is used during synchronization operations.
+
+ If a document would conflict and save_conflict is set to True, the
+ content will be selected as the 'current' content for doc.doc_id,
+ even though doc.rev doesn't supersede the currently stored revision.
+ The currently stored document will be added to the list of conflict
+ alternatives for the given doc_id.
+
+ This forces the new content to be 'current' so that we get convergence
+ after synchronizing, even if people don't resolve conflicts. Users can
+ then notice that their content is out of date, update it, and
+ synchronize again. (The alternative is that users could synchronize and
+ think the data has propagated, but their local copy looks fine, and the
+ remote copy is never updated again.)
+
+ :param doc: A document object
+ :type doc: CouchDocument
+ :param save_conflict: If this document is a conflict, do you want to
+ save it as a conflict, or just ignore it.
+ :type save_conflict: bool
+ :param replica_uid: A unique replica identifier.
+ :type replica_uid: str
+ :param replica_gen: The generation of the replica corresponding to the
+ this document. The replica arguments are optional,
+ but are used during synchronization.
+ :type replica_gen: int
+ :param replica_trans_id: The transaction_id associated with the
+ generation.
+ :type replica_trans_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+ :param doc_idx: The index of the current document being sent.
+ :type doc_idx: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
+
+ :return: (state, at_gen) - If we don't have doc_id already, or if
+ doc_rev supersedes the existing document revision, then the
+ content will be inserted, and state is 'inserted'. If
+ doc_rev is less than or equal to the existing revision, then
+ the put is ignored and state is respecitvely 'superseded' or
+ 'converged'. If doc_rev is not strictly superseded or
+ supersedes, then state is 'conflicted'. The document will not
+ be inserted if save_conflict is False. For 'inserted' or
+ 'converged', at_gen is the insertion/current generation.
+ :rtype: (str, int)
+ """
+ if not isinstance(doc, CouchDocument):
+ doc = self._factory(doc.doc_id, doc.rev, doc.get_json())
+ my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
+ if my_doc:
+ doc.set_conflicts(my_doc.get_conflicts())
+ return CommonBackend._put_doc_if_newer(self, doc, save_conflict,
+ replica_uid, replica_gen,
+ replica_trans_id)
+
+ def _put_and_update_indexes(self, cur_doc, doc):
+ self._put_doc(cur_doc, doc)
+
+ def get_docs(self, doc_ids, check_for_conflicts=True,
+ include_deleted=False):
+ """
+ Get the JSON content for many documents.
+
+ :param doc_ids: A list of document identifiers or None for all.
+ :type doc_ids: list
+ :param check_for_conflicts: If set to False, then the conflict check
+ will be skipped, and 'None' will be
+ returned instead of True/False.
+ :type check_for_conflicts: bool
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise deleted
+ documents will not be included in the results.
+ :return: iterable giving the Document object for each document id
+ in matching doc_ids order.
+ :rtype: iterable
+ """
+ return self._database.get_docs(doc_ids, check_for_conflicts,
+ include_deleted)
+
+ def _prune_conflicts(self, doc, doc_vcr):
+ """
+ Prune conflicts that are older then the current document's revision, or
+ whose content match to the current document's content.
+ Originally in u1db.CommonBackend
+
+ :param doc: The document to have conflicts pruned.
+ :type doc: CouchDocument
+ :param doc_vcr: A vector clock representing the current document's
+ revision.
+ :type doc_vcr: u1db.vectorclock.VectorClock
+ """
+ if doc.has_conflicts:
+ autoresolved = False
+ c_revs_to_prune = []
+ for c_doc in doc._conflicts:
+ c_vcr = vectorclock.VectorClockRev(c_doc.rev)
+ if doc_vcr.is_newer(c_vcr):
+ c_revs_to_prune.append(c_doc.rev)
+ elif doc.same_content_as(c_doc):
+ c_revs_to_prune.append(c_doc.rev)
+ doc_vcr.maximize(c_vcr)
+ autoresolved = True
+ if autoresolved:
+ doc_vcr.increment(self._replica_uid)
+ doc.rev = doc_vcr.as_str()
+ doc.delete_conflicts(c_revs_to_prune)
+
+
+class SoledadSyncTarget(CommonSyncTarget):
+
+ """
+ Functionality for using a SoledadBackend as a synchronization target.
+ """
+
+ def get_sync_info(self, source_replica_uid):
+ source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id(
+ source_replica_uid)
+ my_gen, my_trans_id = self._db._get_generation_info()
+ return (
+ self._db._replica_uid, my_gen, my_trans_id, source_gen,
+ source_trans_id)
+
+ def record_sync_info(self, source_replica_uid, source_replica_generation,
+ source_replica_transaction_id):
+ if self._trace_hook:
+ self._trace_hook('record_sync_info')
+ self._db._set_replica_gen_and_trans_id(
+ source_replica_uid, source_replica_generation,
+ source_replica_transaction_id)
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
deleted file mode 100644
index d4f67696..00000000
--- a/common/src/leap/soledad/common/couch.py
+++ /dev/null
@@ -1,1514 +0,0 @@
-# -*- coding: utf-8 -*-
-# couch.py
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
-"""A U1DB backend that uses CouchDB as its persistence layer."""
-
-
-import json
-import re
-import uuid
-import logging
-import binascii
-import time
-
-
-from StringIO import StringIO
-from urlparse import urljoin
-from contextlib import contextmanager
-from multiprocessing.pool import ThreadPool
-
-
-from couchdb.client import Server, Database
-from couchdb.multipart import MultipartWriter
-from couchdb.http import (
- ResourceConflict,
- ResourceNotFound,
- ServerError,
- Session,
- urljoin as couch_urljoin,
- Resource,
-)
-from u1db import vectorclock
-from u1db.errors import (
- DatabaseDoesNotExist,
- InvalidGeneration,
- RevisionConflict,
- InvalidDocId,
- ConflictedDoc,
- DocumentDoesNotExist,
- DocumentAlreadyDeleted,
- Unauthorized,
-)
-from u1db.backends import CommonBackend, CommonSyncTarget
-from u1db.remote import http_app
-from u1db.remote.server_state import ServerState
-
-
-from leap.soledad.common import ddocs
-from leap.soledad.common.errors import raise_server_error
-from leap.soledad.common.errors import raise_missing_design_doc_error
-from leap.soledad.common.errors import InvalidURLError
-from leap.soledad.common.command import exec_validated_cmd
-from leap.soledad.common.document import CouchDocument
-
-
-logger = logging.getLogger(__name__)
-
-
-COUCH_TIMEOUT = 120 # timeout for transfers between Soledad server and Couch
-
-
-def list_users_dbs(couch_url):
- """
- Retrieves a list with all databases that starts with 'user-' on CouchDB.
- Those databases belongs to users. So, the list will contain all the
- database names in the form of 'user-{uuid4}'.
-
- :param couch_url: The couch url with needed credentials
- :type couch_url: str
-
- :return: The list of all database names from users.
- :rtype: [str]
- """
- with couch_server(couch_url) as server:
- users = [dbname for dbname in server if dbname.startswith('user-')]
- return users
-
-# monkey-patch the u1db http app to use CouchDocument
-http_app.Document = CouchDocument
-
-
-@contextmanager
-def couch_server(url):
- """
- Provide a connection to a couch server and cleanup after use.
-
- For database creation and deletion we use an ephemeral connection to the
- couch server. That connection has to be properly closed, so we provide it
- as a context manager.
-
- :param url: The URL of the Couch server.
- :type url: str
- """
- session = Session(timeout=COUCH_TIMEOUT)
- server = Server(url=url, full_commit=False, session=session)
- yield server
-
-
-THREAD_POOL = ThreadPool(20)
-
-
-class SoledadBackend(CommonBackend):
-
- """
- A U1DB implementation that uses CouchDB as its persistence layer.
- """
-
- @classmethod
- def open_database(cls, url, create, replica_uid=None, ensure_ddocs=False,
- database_security=None):
- """
- Open a U1DB database using CouchDB as backend.
-
- :param url: the url of the database replica
- :type url: str
- :param create: should the replica be created if it does not exist?
- :type create: bool
- :param replica_uid: an optional unique replica identifier
- :type replica_uid: str
- :param ensure_ddocs: Ensure that the design docs exist on server.
- :type ensure_ddocs: bool
-
- :return: the database instance
- :rtype: SoledadBackend
- """
- db = CouchDatabase.open_database(url, create, ensure_ddocs)
- return cls(
- db, replica_uid=replica_uid)
-
- def __init__(self, database, replica_uid=None):
- """
- Create a new Couch data container.
-
- :param url: the url of the couch database
- :type url: str
- :param dbname: the database name
- :type dbname: str
- :param replica_uid: an optional unique replica identifier
- :type replica_uid: str
- :param ensure_ddocs: Ensure that the design docs exist on server.
- :type ensure_ddocs: bool
- """
- # save params
- self._factory = CouchDocument
- self._real_replica_uid = None
- # configure couch
- self._cache = None
- self._dbname = database._dbname
- self._database = database
- if replica_uid is not None:
- self._set_replica_uid(replica_uid)
-
- @property
- def cache(self):
- if self._cache is not None:
- return self._cache
- else:
- return {}
-
- def init_caching(self, cache):
- """
- Start using cache by setting internal _cache attribute.
-
- :param cache: the cache instance, anything that behaves like a dict
- :type cache: dict
- """
- self._cache = cache
-
- def get_sync_target(self):
- """
- Return a SyncTarget object, for another u1db to synchronize with.
-
- :return: The sync target.
- :rtype: CouchSyncTarget
- """
- return CouchSyncTarget(self)
-
- def delete_database(self):
- """
- Delete a U1DB CouchDB database.
- """
- self._database.delete_database()
-
- def close(self):
- """
- Release any resources associated with this database.
-
- :return: True if db was succesfully closed.
- :rtype: bool
- """
- self._database.close()
- return True
-
- def __del__(self):
- """
- Close the database upon garbage collection.
- """
- self.close()
-
- def _set_replica_uid(self, replica_uid):
- """
- Force the replica uid to be set.
-
- :param replica_uid: The new replica uid.
- :type replica_uid: str
- """
- self._database.set_replica_uid(replica_uid)
- self._real_replica_uid = replica_uid
- self.cache['replica_uid'] = self._real_replica_uid
-
- def _get_replica_uid(self):
- """
- Get the replica uid.
-
- :return: The replica uid.
- :rtype: str
- """
- if self._real_replica_uid is not None:
- self.cache['replica_uid'] = self._real_replica_uid
- return self._real_replica_uid
- if 'replica_uid' in self.cache:
- return self.cache['replica_uid']
- self._real_replica_uid = self._database.get_replica_uid()
- self._set_replica_uid(self._real_replica_uid)
- return self._real_replica_uid
-
- _replica_uid = property(_get_replica_uid, _set_replica_uid)
-
- replica_uid = property(_get_replica_uid)
-
- def _get_generation(self):
- """
- Return the current generation.
-
- :return: The current generation.
- :rtype: int
-
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
- """
- return self._get_generation_info()[0]
-
- def _get_generation_info(self):
- """
- Return the current generation.
-
- :return: A tuple containing the current generation and transaction id.
- :rtype: (int, str)
-
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
- """
- if self.replica_uid + '_gen' in self.cache:
- response = self.cache[self.replica_uid + '_gen']
- return response
- cur_gen, newest_trans_id = self._database._get_generation_info()
- self.cache[self.replica_uid + '_gen'] = (cur_gen, newest_trans_id)
- return (cur_gen, newest_trans_id)
-
- def _get_trans_id_for_gen(self, generation):
- """
- Get the transaction id corresponding to a particular generation.
-
- :param generation: The generation for which to get the transaction id.
- :type generation: int
-
- :return: The transaction id for C{generation}.
- :rtype: str
-
- """
- return self._database._get_trans_id_for_gen(generation)
-
- def _get_transaction_log(self):
- """
- This is only for the test suite, it is not part of the api.
-
- :return: The complete transaction log.
- :rtype: [(str, str)]
-
- """
- return self._database._get_transaction_log()
-
- def _get_doc(self, doc_id, check_for_conflicts=False):
- """
- Extract the document from storage.
-
- This can return None if the document doesn't exist.
-
- :param doc_id: The unique document identifier
- :type doc_id: str
- :param check_for_conflicts: If set to False, then the conflict check
- will be skipped.
- :type check_for_conflicts: bool
-
- :return: The document.
- :rtype: CouchDocument
- """
- return self._database._get_doc(doc_id, check_for_conflicts)
-
- def get_doc(self, doc_id, include_deleted=False):
- """
- Get the JSON string for the given document.
-
- :param doc_id: The unique document identifier
- :type doc_id: str
- :param include_deleted: If set to True, deleted documents will be
- returned with empty content. Otherwise asking for a deleted
- document will return None.
- :type include_deleted: bool
-
- :return: A document object.
- :rtype: CouchDocument.
- """
- doc = self._get_doc(doc_id, check_for_conflicts=True)
- if doc is None:
- return None
- if doc.is_tombstone() and not include_deleted:
- return None
- return doc
-
- def get_all_docs(self, include_deleted=False):
- """
- Get the JSON content for all documents in the database.
-
- :param include_deleted: If set to True, deleted documents will be
- returned with empty content. Otherwise deleted
- documents will not be included in the results.
- :type include_deleted: bool
-
- :return: (generation, [CouchDocument])
- The current generation of the database, followed by a list of all
- the documents in the database.
- :rtype: (int, [CouchDocument])
- """
- return self._database.get_all_docs(include_deleted)
-
- def _put_doc(self, old_doc, doc):
- """
- Put the document in the Couch backend database.
-
- Note that C{old_doc} must have been fetched with the parameter
- C{check_for_conflicts} equal to True, so we can properly update the
- new document using the conflict information from the old one.
-
- :param old_doc: The old document version.
- :type old_doc: CouchDocument
- :param doc: The document to be put.
- :type doc: CouchDocument
- """
- last_transaction =\
- self._database.save_document(old_doc, doc,
- self._allocate_transaction_id())
- if self.replica_uid + '_gen' in self.cache:
- gen, trans = self.cache[self.replica_uid + '_gen']
- gen += 1
- trans = last_transaction
- self.cache[self.replica_uid + '_gen'] = (gen, trans)
-
- def put_doc(self, doc):
- """
- Update a document.
-
- If the document currently has conflicts, put will fail.
- If the database specifies a maximum document size and the document
- exceeds it, put will fail and raise a DocumentTooBig exception.
-
- :param doc: A Document with new content.
- :return: new_doc_rev - The new revision identifier for the document.
- The Document object will also be updated.
-
- :raise InvalidDocId: Raised if the document's id is invalid.
- :raise DocumentTooBig: Raised if the document size is too big.
- :raise ConflictedDoc: Raised if the document has conflicts.
- """
- if doc.doc_id is None:
- raise InvalidDocId()
- self._check_doc_id(doc.doc_id)
- self._check_doc_size(doc)
- old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
- if old_doc and old_doc.has_conflicts:
- raise ConflictedDoc()
- if old_doc and doc.rev is None and old_doc.is_tombstone():
- new_rev = self._allocate_doc_rev(old_doc.rev)
- else:
- if old_doc is not None:
- if old_doc.rev != doc.rev:
- raise RevisionConflict()
- else:
- if doc.rev is not None:
- raise RevisionConflict()
- new_rev = self._allocate_doc_rev(doc.rev)
- doc.rev = new_rev
- self._put_doc(old_doc, doc)
- return new_rev
-
- def whats_changed(self, old_generation=0):
- """
- Return a list of documents that have changed since old_generation.
-
- :param old_generation: The generation of the database in the old
- state.
- :type old_generation: int
-
- :return: (generation, trans_id, [(doc_id, generation, trans_id),...])
- The current generation of the database, its associated
- transaction id, and a list of of changed documents since
- old_generation, represented by tuples with for each document
- its doc_id and the generation and transaction id corresponding
- to the last intervening change and sorted by generation (old
- changes first)
- :rtype: (int, str, [(str, int, str)])
- """
- return self._database.whats_changed(old_generation)
-
- def delete_doc(self, doc):
- """
- Mark a document as deleted.
-
- Will abort if the current revision doesn't match doc.rev.
- This will also set doc.content to None.
-
- :param doc: The document to mark as deleted.
- :type doc: CouchDocument.
-
- :raise DocumentDoesNotExist: Raised if the document does not
- exist.
- :raise RevisionConflict: Raised if the revisions do not match.
- :raise DocumentAlreadyDeleted: Raised if the document is
- already deleted.
- :raise ConflictedDoc: Raised if the doc has conflicts.
- """
- old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
- if old_doc is None:
- raise DocumentDoesNotExist
- if old_doc.rev != doc.rev:
- raise RevisionConflict()
- if old_doc.is_tombstone():
- raise DocumentAlreadyDeleted
- if old_doc.has_conflicts:
- raise ConflictedDoc()
- new_rev = self._allocate_doc_rev(doc.rev)
- doc.rev = new_rev
- doc.make_tombstone()
- self._put_doc(old_doc, doc)
- return new_rev
-
- def get_doc_conflicts(self, doc_id, couch_rev=None):
- """
- Get the conflicted versions of a document.
-
- If the C{couch_rev} parameter is not None, conflicts for a specific
- document's couch revision are returned.
-
- :param couch_rev: The couch document revision.
- :type couch_rev: str
-
- :return: A list of conflicted versions of the document.
- :rtype: list
- """
- return self._database.get_doc_conflicts(doc_id, couch_rev)
-
- def _get_replica_gen_and_trans_id(self, other_replica_uid):
- """
- Return the last known generation and transaction id for the other db
- replica.
-
- When you do a synchronization with another replica, the Database keeps
- track of what generation the other database replica was at, and what
- the associated transaction id was. This is used to determine what data
- needs to be sent, and if two databases are claiming to be the same
- replica.
-
- :param other_replica_uid: The identifier for the other replica.
- :type other_replica_uid: str
-
- :return: A tuple containing the generation and transaction id we
- encountered during synchronization. If we've never
- synchronized with the replica, this is (0, '').
- :rtype: (int, str)
- """
- return self._database._get_replica_gen_and_trans_id(other_replica_uid)
-
- def _set_replica_gen_and_trans_id(self, other_replica_uid,
- other_generation, other_transaction_id,
- number_of_docs=None, doc_idx=None,
- sync_id=None):
- """
- Set the last-known generation and transaction id for the other
- database replica.
-
- We have just performed some synchronization, and we want to track what
- generation the other replica was at. See also
- _get_replica_gen_and_trans_id.
-
- :param other_replica_uid: The U1DB identifier for the other replica.
- :type other_replica_uid: str
- :param other_generation: The generation number for the other replica.
- :type other_generation: int
- :param other_transaction_id: The transaction id associated with the
- generation.
- :type other_transaction_id: str
- :param number_of_docs: The total amount of documents sent on this sync
- session.
- :type number_of_docs: int
- :param doc_idx: The index of the current document being sent.
- :type doc_idx: int
- :param sync_id: The id of the current sync session.
- :type sync_id: str
- """
- if other_replica_uid is not None and other_generation is not None:
- self._do_set_replica_gen_and_trans_id(
- other_replica_uid, other_generation, other_transaction_id,
- number_of_docs=number_of_docs, doc_idx=doc_idx,
- sync_id=sync_id)
-
- def _do_set_replica_gen_and_trans_id(
- self, other_replica_uid, other_generation, other_transaction_id,
- number_of_docs=None, doc_idx=None, sync_id=None):
- """
- Set the last-known generation and transaction id for the other
- database replica.
-
- We have just performed some synchronization, and we want to track what
- generation the other replica was at. See also
- _get_replica_gen_and_trans_id.
-
- :param other_replica_uid: The U1DB identifier for the other replica.
- :type other_replica_uid: str
- :param other_generation: The generation number for the other replica.
- :type other_generation: int
- :param other_transaction_id: The transaction id associated with the
- generation.
- :type other_transaction_id: str
- :param number_of_docs: The total amount of documents sent on this sync
- session.
- :type number_of_docs: int
- :param doc_idx: The index of the current document being sent.
- :type doc_idx: int
- :param sync_id: The id of the current sync session.
- :type sync_id: str
- """
- self.cache[other_replica_uid] = (other_generation,
- other_transaction_id)
- self._database._do_set_replica_gen_and_trans_id(other_replica_uid,
- other_generation,
- other_transaction_id)
-
- def _force_doc_sync_conflict(self, doc):
- """
- Add a conflict and force a document put.
-
- :param doc: The document to be put.
- :type doc: CouchDocument
- """
- my_doc = self._get_doc(doc.doc_id)
- self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev))
- doc.add_conflict(self._factory(doc.doc_id, my_doc.rev,
- my_doc.get_json()))
- doc.has_conflicts = True
- self._put_doc(my_doc, doc)
-
- def resolve_doc(self, doc, conflicted_doc_revs):
- """
- Mark a document as no longer conflicted.
-
- We take the list of revisions that the client knows about that it is
- superseding. This may be a different list from the actual current
- conflicts, in which case only those are removed as conflicted. This
- may fail if the conflict list is significantly different from the
- supplied information. (sync could have happened in the background from
- the time you GET_DOC_CONFLICTS until the point where you RESOLVE)
-
- :param doc: A Document with the new content to be inserted.
- :type doc: CouchDocument
- :param conflicted_doc_revs: A list of revisions that the new content
- supersedes.
- :type conflicted_doc_revs: [str]
-
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
- """
- cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
- new_rev = self._ensure_maximal_rev(cur_doc.rev,
- conflicted_doc_revs)
- superseded_revs = set(conflicted_doc_revs)
- doc.rev = new_rev
- # this backend stores conflicts as properties of the documents, so we
- # have to copy these conflicts over to the document being updated.
- if cur_doc.rev in superseded_revs:
- # the newer doc version will supersede the one in the database, so
- # we copy conflicts before updating the backend.
- doc.set_conflicts(cur_doc.get_conflicts()) # copy conflicts over.
- doc.delete_conflicts(superseded_revs)
- self._put_doc(cur_doc, doc)
- else:
- # the newer doc version does not supersede the one in the
- # database, so we will add a conflict to the database and copy
- # those over to the document the user has in her hands.
- cur_doc.add_conflict(doc)
- cur_doc.delete_conflicts(superseded_revs)
- self._put_doc(cur_doc, cur_doc) # just update conflicts
- # backend has been updated with current conflicts, now copy them
- # to the current document.
- doc.set_conflicts(cur_doc.get_conflicts())
-
- def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen,
- replica_trans_id='', number_of_docs=None,
- doc_idx=None, sync_id=None):
- """
- Insert/update document into the database with a given revision.
-
- This api is used during synchronization operations.
-
- If a document would conflict and save_conflict is set to True, the
- content will be selected as the 'current' content for doc.doc_id,
- even though doc.rev doesn't supersede the currently stored revision.
- The currently stored document will be added to the list of conflict
- alternatives for the given doc_id.
-
- This forces the new content to be 'current' so that we get convergence
- after synchronizing, even if people don't resolve conflicts. Users can
- then notice that their content is out of date, update it, and
- synchronize again. (The alternative is that users could synchronize and
- think the data has propagated, but their local copy looks fine, and the
- remote copy is never updated again.)
-
- :param doc: A document object
- :type doc: CouchDocument
- :param save_conflict: If this document is a conflict, do you want to
- save it as a conflict, or just ignore it.
- :type save_conflict: bool
- :param replica_uid: A unique replica identifier.
- :type replica_uid: str
- :param replica_gen: The generation of the replica corresponding to the
- this document. The replica arguments are optional,
- but are used during synchronization.
- :type replica_gen: int
- :param replica_trans_id: The transaction_id associated with the
- generation.
- :type replica_trans_id: str
- :param number_of_docs: The total amount of documents sent on this sync
- session.
- :type number_of_docs: int
- :param doc_idx: The index of the current document being sent.
- :type doc_idx: int
- :param sync_id: The id of the current sync session.
- :type sync_id: str
-
- :return: (state, at_gen) - If we don't have doc_id already, or if
- doc_rev supersedes the existing document revision, then the
- content will be inserted, and state is 'inserted'. If
- doc_rev is less than or equal to the existing revision, then
- the put is ignored and state is respecitvely 'superseded' or
- 'converged'. If doc_rev is not strictly superseded or
- supersedes, then state is 'conflicted'. The document will not
- be inserted if save_conflict is False. For 'inserted' or
- 'converged', at_gen is the insertion/current generation.
- :rtype: (str, int)
- """
- if not isinstance(doc, CouchDocument):
- doc = self._factory(doc.doc_id, doc.rev, doc.get_json())
- my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
- if my_doc:
- doc.set_conflicts(my_doc.get_conflicts())
- return CommonBackend._put_doc_if_newer(self, doc, save_conflict,
- replica_uid, replica_gen,
- replica_trans_id)
-
- def _put_and_update_indexes(self, cur_doc, doc):
- self._put_doc(cur_doc, doc)
-
- def get_docs(self, doc_ids, check_for_conflicts=True,
- include_deleted=False):
- """
- Get the JSON content for many documents.
-
- :param doc_ids: A list of document identifiers or None for all.
- :type doc_ids: list
- :param check_for_conflicts: If set to False, then the conflict check
- will be skipped, and 'None' will be
- returned instead of True/False.
- :type check_for_conflicts: bool
- :param include_deleted: If set to True, deleted documents will be
- returned with empty content. Otherwise deleted
- documents will not be included in the results.
- :return: iterable giving the Document object for each document id
- in matching doc_ids order.
- :rtype: iterable
- """
- return self._database.get_docs(doc_ids, check_for_conflicts,
- include_deleted)
-
- def _prune_conflicts(self, doc, doc_vcr):
- """
- Prune conflicts that are older then the current document's revision, or
- whose content match to the current document's content.
- Originally in u1db.CommonBackend
-
- :param doc: The document to have conflicts pruned.
- :type doc: CouchDocument
- :param doc_vcr: A vector clock representing the current document's
- revision.
- :type doc_vcr: u1db.vectorclock.VectorClock
- """
- if doc.has_conflicts:
- autoresolved = False
- c_revs_to_prune = []
- for c_doc in doc._conflicts:
- c_vcr = vectorclock.VectorClockRev(c_doc.rev)
- if doc_vcr.is_newer(c_vcr):
- c_revs_to_prune.append(c_doc.rev)
- elif doc.same_content_as(c_doc):
- c_revs_to_prune.append(c_doc.rev)
- doc_vcr.maximize(c_vcr)
- autoresolved = True
- if autoresolved:
- doc_vcr.increment(self._replica_uid)
- doc.rev = doc_vcr.as_str()
- doc.delete_conflicts(c_revs_to_prune)
-
-
-class CouchDatabase(object):
- """
- Holds CouchDB related code.
- This class gives methods to encapsulate database operations and hide
- CouchDB details from backend code.
- """
-
- @classmethod
- def open_database(cls, url, create, ensure_ddocs=False):
- """
- Open a U1DB database using CouchDB as backend.
-
- :param url: the url of the database replica
- :type url: str
- :param create: should the replica be created if it does not exist?
- :type create: bool
- :param replica_uid: an optional unique replica identifier
- :type replica_uid: str
- :param ensure_ddocs: Ensure that the design docs exist on server.
- :type ensure_ddocs: bool
-
- :return: the database instance
- :rtype: SoledadBackend
- """
- # get database from url
- m = re.match('(^https?://[^/]+)/(.+)$', url)
- if not m:
- raise InvalidURLError
- url = m.group(1)
- dbname = m.group(2)
- with couch_server(url) as server:
- try:
- server[dbname]
- except ResourceNotFound:
- if not create:
- raise DatabaseDoesNotExist()
- server.create(dbname)
- return cls(
- url, dbname, ensure_ddocs=ensure_ddocs)
-
- def __init__(self, url, dbname, ensure_ddocs=True,
- database_security=None):
- self._session = Session(timeout=COUCH_TIMEOUT)
- self._url = url
- self._dbname = dbname
- self._database = Database(
- urljoin(url, dbname),
- self._session)
- self._database.info()
- if ensure_ddocs:
- self.ensure_ddocs_on_db()
- self.ensure_security_ddoc(database_security)
-
- def ensure_ddocs_on_db(self):
- """
- Ensure that the design documents used by the backend exist on the
- couch database.
- """
- for ddoc_name in ['docs', 'syncs', 'transactions']:
- try:
- self._database.resource('_design',
- ddoc_name, '_info').get_json()
- except ResourceNotFound:
- ddoc = json.loads(
- binascii.a2b_base64(
- getattr(ddocs, ddoc_name)))
- self._database.save(ddoc)
-
- def ensure_security_ddoc(self, security_config=None):
- """
- Make sure that only soledad user is able to access this database as
- an unprivileged member, meaning that administration access will
- be forbidden even inside an user database.
- The goal is to make sure that only the lowest access level is given
- to the unprivileged CouchDB user set on the server process.
- This is achieved by creating a _security design document, see:
- http://docs.couchdb.org/en/latest/api/database/security.html
-
- :param database_security: security configuration parsed from conf file
- :type cache: dict
- """
- security_config = security_config or {}
- security = self._database.resource.get_json('_security')[2]
- security['members'] = {'names': [], 'roles': []}
- security['members']['names'] = security_config.get('members',
- ['soledad'])
- security['members']['roles'] = security_config.get('members_roles', [])
- security['admins'] = {'names': [], 'roles': []}
- security['admins']['names'] = security_config.get('admins', [])
- security['admins']['roles'] = security_config.get('admins_roles', [])
- self._database.resource.put_json('_security', body=security)
-
- def delete_database(self):
- """
- Delete a U1DB CouchDB database.
- """
- with couch_server(self._url) as server:
- del(server[self._dbname])
-
- def set_replica_uid(self, replica_uid):
- """
- Force the replica uid to be set.
-
- :param replica_uid: The new replica uid.
- :type replica_uid: str
- """
- try:
- # set on existent config document
- doc = self._database['u1db_config']
- doc['replica_uid'] = replica_uid
- except ResourceNotFound:
- # or create the config document
- doc = {
- '_id': 'u1db_config',
- 'replica_uid': replica_uid,
- }
- self._database.save(doc)
-
- def get_replica_uid(self):
- """
- Get the replica uid.
-
- :return: The replica uid.
- :rtype: str
- """
- try:
- # grab replica_uid from server
- doc = self._database['u1db_config']
- replica_uid = doc['replica_uid']
- return replica_uid
- except ResourceNotFound:
- # create a unique replica_uid
- replica_uid = uuid.uuid4().hex
- self.set_replica_uid(replica_uid)
- return replica_uid
-
- def close(self):
- self._database = None
-
- def get_all_docs(self, include_deleted=False):
- """
- Get the JSON content for all documents in the database.
-
- :param include_deleted: If set to True, deleted documents will be
- returned with empty content. Otherwise deleted
- documents will not be included in the results.
- :type include_deleted: bool
-
- :return: (generation, [CouchDocument])
- The current generation of the database, followed by a list of all
- the documents in the database.
- :rtype: (int, [CouchDocument])
- """
-
- generation, _ = self._get_generation_info()
- results = list(self.get_docs(self._database,
- include_deleted=include_deleted))
- return (generation, results)
-
- def get_docs(self, doc_ids, check_for_conflicts=True,
- include_deleted=False):
- """
- Get the JSON content for many documents.
-
- :param doc_ids: A list of document identifiers or None for all.
- :type doc_ids: list
- :param check_for_conflicts: If set to False, then the conflict check
- will be skipped, and 'None' will be
- returned instead of True/False.
- :type check_for_conflicts: bool
- :param include_deleted: If set to True, deleted documents will be
- returned with empty content. Otherwise deleted
- documents will not be included in the results.
- :return: iterable giving the Document object for each document id
- in matching doc_ids order.
- :rtype: iterable
- """
- # Workaround for:
- #
- # http://bugs.python.org/issue7980
- # https://leap.se/code/issues/5449
- #
- # python-couchdb uses time.strptime, which is not thread safe. In
- # order to avoid the problem described on the issues above, we preload
- # strptime here by evaluating the conversion of an arbitrary date.
- # This will not be needed when/if we switch from python-couchdb to
- # paisley.
- time.strptime('Mar 8 1917', '%b %d %Y')
- get_one = lambda doc_id: self._get_doc(doc_id, check_for_conflicts)
- docs = [THREAD_POOL.apply_async(get_one, [doc_id])
- for doc_id in doc_ids]
- for doc in docs:
- doc = doc.get()
- if not doc or not include_deleted and doc.is_tombstone():
- continue
- yield doc
-
- def _get_doc(self, doc_id, check_for_conflicts=False):
- """
- Extract the document from storage.
-
- This can return None if the document doesn't exist.
-
- :param doc_id: The unique document identifier
- :type doc_id: str
- :param check_for_conflicts: If set to False, then the conflict check
- will be skipped.
- :type check_for_conflicts: bool
-
- :return: The document.
- :rtype: CouchDocument
- """
- # get document with all attachments (u1db content and eventual
- # conflicts)
- try:
- result = \
- self._database.resource(doc_id).get_json(
- attachments=True)[2]
- except ResourceNotFound:
- return None
- return self.__parse_doc_from_couch(result, doc_id, check_for_conflicts)
-
- def __parse_doc_from_couch(self, result, doc_id,
- check_for_conflicts=False):
- # restrict to u1db documents
- if 'u1db_rev' not in result:
- return None
- doc = CouchDocument(doc_id, result['u1db_rev'])
- # set contents or make tombstone
- if '_attachments' not in result \
- or 'u1db_content' not in result['_attachments']:
- doc.make_tombstone()
- else:
- doc.content = json.loads(
- binascii.a2b_base64(
- result['_attachments']['u1db_content']['data']))
- # determine if there are conflicts
- if check_for_conflicts \
- and '_attachments' in result \
- and 'u1db_conflicts' in result['_attachments']:
- doc.set_conflicts(
- self._build_conflicts(
- doc.doc_id,
- json.loads(binascii.a2b_base64(
- result['_attachments']['u1db_conflicts']['data']))))
- # store couch revision
- doc.couch_rev = result['_rev']
- # store transactions
- doc.transactions = result['u1db_transactions']
- return doc
-
- def _build_conflicts(self, doc_id, attached_conflicts):
- """
- Build the conflicted documents list from the conflicts attachment
- fetched from a couch document.
-
- :param attached_conflicts: The document's conflicts as fetched from a
- couch document attachment.
- :type attached_conflicts: dict
- """
- conflicts = []
- for doc_rev, content in attached_conflicts:
- doc = CouchDocument(doc_id, doc_rev)
- if content is None:
- doc.make_tombstone()
- else:
- doc.content = content
- conflicts.append(doc)
- return conflicts
-
- def _get_trans_id_for_gen(self, generation):
- """
- Get the transaction id corresponding to a particular generation.
-
- :param generation: The generation for which to get the transaction id.
- :type generation: int
-
- :return: The transaction id for C{generation}.
- :rtype: str
-
- :raise InvalidGeneration: Raised when the generation does not exist.
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
- """
- if generation == 0:
- return ''
- # query a couch list function
- ddoc_path = [
- '_design', 'transactions', '_list', 'trans_id_for_gen', 'log'
- ]
- res = self._database.resource(*ddoc_path)
- try:
- response = res.get_json(gen=generation)
- if response[2] == {}:
- raise InvalidGeneration
- return response[2]['transaction_id']
- except ResourceNotFound as e:
- raise_missing_design_doc_error(e, ddoc_path)
- except ServerError as e:
- raise_server_error(e, ddoc_path)
-
- def _get_replica_gen_and_trans_id(self, other_replica_uid):
- """
- Return the last known generation and transaction id for the other db
- replica.
-
- When you do a synchronization with another replica, the Database keeps
- track of what generation the other database replica was at, and what
- the associated transaction id was. This is used to determine what data
- needs to be sent, and if two databases are claiming to be the same
- replica.
-
- :param other_replica_uid: The identifier for the other replica.
- :type other_replica_uid: str
-
- :return: A tuple containing the generation and transaction id we
- encountered during synchronization. If we've never
- synchronized with the replica, this is (0, '').
- :rtype: (int, str)
- """
- doc_id = 'u1db_sync_%s' % other_replica_uid
- try:
- doc = self._database[doc_id]
- except ResourceNotFound:
- doc = {
- '_id': doc_id,
- 'generation': 0,
- 'transaction_id': '',
- }
- self._database.save(doc)
- result = doc['generation'], doc['transaction_id']
- return result
-
- def get_doc_conflicts(self, doc_id, couch_rev=None):
- """
- Get the conflicted versions of a document.
-
- If the C{couch_rev} parameter is not None, conflicts for a specific
- document's couch revision are returned.
-
- :param couch_rev: The couch document revision.
- :type couch_rev: str
-
- :return: A list of conflicted versions of the document.
- :rtype: list
- """
- # request conflicts attachment from server
- params = {}
- conflicts = []
- if couch_rev is not None:
- params['rev'] = couch_rev # restric document's couch revision
- else:
- # TODO: move into resource logic!
- first_entry = self._get_doc(doc_id, check_for_conflicts=True)
- conflicts.append(first_entry)
- resource = self._database.resource(doc_id, 'u1db_conflicts')
- try:
- response = resource.get_json(**params)
- return conflicts + self._build_conflicts(
- doc_id, json.loads(response[2].read()))
- except ResourceNotFound:
- return []
-
- def _do_set_replica_gen_and_trans_id(
- self, other_replica_uid, other_generation, other_transaction_id,
- number_of_docs=None, doc_idx=None, sync_id=None):
- """
- Set the last-known generation and transaction id for the other
- database replica.
-
- We have just performed some synchronization, and we want to track what
- generation the other replica was at. See also
- _get_replica_gen_and_trans_id.
-
- :param other_replica_uid: The U1DB identifier for the other replica.
- :type other_replica_uid: str
- :param other_generation: The generation number for the other replica.
- :type other_generation: int
- :param other_transaction_id: The transaction id associated with the
- generation.
- :type other_transaction_id: str
- :param number_of_docs: The total amount of documents sent on this sync
- session.
- :type number_of_docs: int
- :param doc_idx: The index of the current document being sent.
- :type doc_idx: int
- :param sync_id: The id of the current sync session.
- :type sync_id: str
- """
- doc_id = 'u1db_sync_%s' % other_replica_uid
- try:
- doc = self._database[doc_id]
- except ResourceNotFound:
- doc = {'_id': doc_id}
- doc['generation'] = other_generation
- doc['transaction_id'] = other_transaction_id
- self._database.save(doc)
-
- def _get_transaction_log(self):
- """
- This is only for the test suite, it is not part of the api.
-
- :return: The complete transaction log.
- :rtype: [(str, str)]
-
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
- """
- # query a couch view
- ddoc_path = ['_design', 'transactions', '_view', 'log']
- res = self._database.resource(*ddoc_path)
- try:
- response = res.get_json()
- return map(
- lambda row: (row['id'], row['value']),
- response[2]['rows'])
- except ResourceNotFound as e:
- raise_missing_design_doc_error(e, ddoc_path)
-
- def whats_changed(self, old_generation=0):
- """
- Return a list of documents that have changed since old_generation.
-
- :param old_generation: The generation of the database in the old
- state.
- :type old_generation: int
-
- :return: (generation, trans_id, [(doc_id, generation, trans_id),...])
- The current generation of the database, its associated
- transaction id, and a list of of changed documents since
- old_generation, represented by tuples with for each document
- its doc_id and the generation and transaction id corresponding
- to the last intervening change and sorted by generation (old
- changes first)
- :rtype: (int, str, [(str, int, str)])
-
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
- """
- # query a couch list function
- ddoc_path = [
- '_design', 'transactions', '_list', 'whats_changed', 'log'
- ]
- res = self._database.resource(*ddoc_path)
- try:
- response = res.get_json(old_gen=old_generation)
- results = map(
- lambda row:
- (row['generation'], row['doc_id'], row['transaction_id']),
- response[2]['transactions'])
- results.reverse()
- cur_gen = old_generation
- seen = set()
- changes = []
- newest_trans_id = ''
- for generation, doc_id, trans_id in results:
- if doc_id not in seen:
- changes.append((doc_id, generation, trans_id))
- seen.add(doc_id)
- if changes:
- cur_gen = changes[0][1] # max generation
- newest_trans_id = changes[0][2]
- changes.reverse()
- else:
- cur_gen, newest_trans_id = self._get_generation_info()
-
- return cur_gen, newest_trans_id, changes
- except ResourceNotFound as e:
- raise_missing_design_doc_error(e, ddoc_path)
- except ServerError as e:
- raise_server_error(e, ddoc_path)
-
- def _get_generation_info(self):
- """
- Return the current generation.
-
- :return: A tuple containing the current generation and transaction id.
- :rtype: (int, str)
-
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
- """
- # query a couch list function
- ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log']
- res = self._database.resource(*ddoc_path)
- try:
- response = res.get_json()
- return (response[2]['generation'], response[2]['transaction_id'])
- except ResourceNotFound as e:
- raise_missing_design_doc_error(e, ddoc_path)
- except ServerError as e:
- raise_server_error(e, ddoc_path)
-
- def save_document(self, old_doc, doc, transaction_id):
- """
- Put the document in the Couch backend database.
-
- Note that C{old_doc} must have been fetched with the parameter
- C{check_for_conflicts} equal to True, so we can properly update the
- new document using the conflict information from the old one.
-
- :param old_doc: The old document version.
- :type old_doc: CouchDocument
- :param doc: The document to be put.
- :type doc: CouchDocument
-
- :raise RevisionConflict: Raised when trying to update a document but
- couch revisions mismatch.
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
- """
- attachments = {} # we save content and conflicts as attachments
- parts = [] # and we put it using couch's multipart PUT
- # save content as attachment
- if doc.is_tombstone() is False:
- content = doc.get_json()
- attachments['u1db_content'] = {
- 'follows': True,
- 'content_type': 'application/octet-stream',
- 'length': len(content),
- }
- parts.append(content)
- # save conflicts as attachment
- if doc.has_conflicts is True:
- conflicts = json.dumps(
- map(lambda cdoc: (cdoc.rev, cdoc.content),
- doc.get_conflicts()))
- attachments['u1db_conflicts'] = {
- 'follows': True,
- 'content_type': 'application/octet-stream',
- 'length': len(conflicts),
- }
- parts.append(conflicts)
- # store old transactions, if any
- transactions = old_doc.transactions[:] if old_doc is not None else []
- # create a new transaction id and timestamp it so the transaction log
- # is consistent when querying the database.
- transactions.append(
- # here we store milliseconds to keep consistent with javascript
- # Date.prototype.getTime() which was used before inside a couchdb
- # update handler.
- (int(time.time() * 1000),
- transaction_id))
- # build the couch document
- couch_doc = {
- '_id': doc.doc_id,
- 'u1db_rev': doc.rev,
- 'u1db_transactions': transactions,
- '_attachments': attachments,
- }
- # if we are updating a doc we have to add the couch doc revision
- if old_doc is not None:
- couch_doc['_rev'] = old_doc.couch_rev
- # prepare the multipart PUT
- buf = StringIO()
- headers = {}
- envelope = MultipartWriter(buf, headers=headers, subtype='related')
- envelope.add('application/json', json.dumps(couch_doc))
- for part in parts:
- envelope.add('application/octet-stream', part)
- envelope.close()
- # try to save and fail if there's a revision conflict
- try:
- resource = self._new_resource()
- resource.put_json(
- doc.doc_id, body=str(buf.getvalue()), headers=headers)
- except ResourceConflict:
- raise RevisionConflict()
- return transactions[-1][1]
-
- def _new_resource(self, *path):
- """
- Return a new resource for accessing a couch database.
-
- :return: A resource for accessing a couch database.
- :rtype: couchdb.http.Resource
- """
- # Workaround for: https://leap.se/code/issues/5448
- url = couch_urljoin(self._database.resource.url, *path)
- resource = Resource(url, Session(timeout=COUCH_TIMEOUT))
- resource.credentials = self._database.resource.credentials
- resource.headers = self._database.resource.headers.copy()
- return resource
-
-
-class CouchSyncTarget(CommonSyncTarget):
-
- """
- Functionality for using a SoledadBackend as a synchronization target.
- """
-
- def get_sync_info(self, source_replica_uid):
- source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id(
- source_replica_uid)
- my_gen, my_trans_id = self._db._get_generation_info()
- return (
- self._db._replica_uid, my_gen, my_trans_id, source_gen,
- source_trans_id)
-
- def record_sync_info(self, source_replica_uid, source_replica_generation,
- source_replica_transaction_id):
- if self._trace_hook:
- self._trace_hook('record_sync_info')
- self._db._set_replica_gen_and_trans_id(
- source_replica_uid, source_replica_generation,
- source_replica_transaction_id)
-
-
-def is_db_name_valid(name):
- """
- Validate a user database using a regular expression.
-
- :param name: database name.
- :type name: str
-
- :return: boolean for name vailidity
- :rtype: bool
- """
- db_name_regex = "^user-[a-f0-9]+$"
- return re.match(db_name_regex, name) is not None
-
-
-class CouchServerState(ServerState):
-
- """
- Inteface of the WSGI server with the CouchDB backend.
- """
-
- def __init__(self, couch_url, create_cmd=None):
- """
- Initialize the couch server state.
-
- :param couch_url: The URL for the couch database.
- :type couch_url: str
- """
- self.couch_url = couch_url
- self.create_cmd = create_cmd
-
- def open_database(self, dbname):
- """
- Open a couch database.
-
- :param dbname: The name of the database to open.
- :type dbname: str
-
- :return: The SoledadBackend object.
- :rtype: SoledadBackend
- """
- url = urljoin(self.couch_url, dbname)
- db = SoledadBackend.open_database(url, create=False,
- ensure_ddocs=False)
- return db
-
- def ensure_database(self, dbname):
- """
- Ensure couch database exists.
-
- :param dbname: The name of the database to ensure.
- :type dbname: str
-
- :raise Unauthorized: If disabled or other error was raised.
-
- :return: The SoledadBackend object and its replica_uid.
- :rtype: (SoledadBackend, str)
- """
- if not self.create_cmd:
- raise Unauthorized()
- else:
- code, out = exec_validated_cmd(self.create_cmd, dbname,
- validator=is_db_name_valid)
- if code is not 0:
- logger.error("""
- Error while creating database (%s) with (%s) command.
- Output: %s
- Exit code: %d
- """ % (dbname, self.create_cmd, out, code))
- raise Unauthorized()
- db = self.open_database(dbname)
- return db, db.replica_uid
-
- def delete_database(self, dbname):
- """
- Delete couch database.
-
- :param dbname: The name of the database to delete.
- :type dbname: str
-
- :raise Unauthorized: Always, because Soledad server is not allowed to
- delete databases.
- """
- raise Unauthorized()
diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py
new file mode 100644
index 00000000..727f033f
--- /dev/null
+++ b/common/src/leap/soledad/common/couch/__init__.py
@@ -0,0 +1,746 @@
+# -*- coding: utf-8 -*-
+# couch.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""A U1DB backend that uses CouchDB as its persistence layer."""
+
+
+import json
+import re
+import uuid
+import binascii
+import time
+
+
+from StringIO import StringIO
+from urlparse import urljoin
+from contextlib import contextmanager
+from multiprocessing.pool import ThreadPool
+
+
+from couchdb.client import Server, Database
+from couchdb.multipart import MultipartWriter
+from couchdb.http import (
+ ResourceConflict,
+ ResourceNotFound,
+ ServerError,
+ Session,
+ urljoin as couch_urljoin,
+ Resource,
+)
+from u1db.errors import (
+ DatabaseDoesNotExist,
+ InvalidGeneration,
+ RevisionConflict,
+)
+from u1db.remote import http_app
+
+
+from leap.soledad.common import ddocs
+from leap.soledad.common.errors import raise_server_error
+from leap.soledad.common.errors import raise_missing_design_doc_error
+from leap.soledad.common.errors import InvalidURLError
+from leap.soledad.common.document import CouchDocument
+from leap.soledad.common.backend import SoledadBackend
+
+
+COUCH_TIMEOUT = 120 # timeout for transfers between Soledad server and Couch
+
+
+def list_users_dbs(couch_url):
+ """
+ Retrieves a list with all databases that starts with 'user-' on CouchDB.
+ Those databases belongs to users. So, the list will contain all the
+ database names in the form of 'user-{uuid4}'.
+
+ :param couch_url: The couch url with needed credentials
+ :type couch_url: str
+
+ :return: The list of all database names from users.
+ :rtype: [str]
+ """
+ with couch_server(couch_url) as server:
+ users = [dbname for dbname in server if dbname.startswith('user-')]
+ return users
+
+
+# monkey-patch the u1db http app to use CouchDocument
+http_app.Document = CouchDocument
+
+
+@contextmanager
+def couch_server(url):
+ """
+ Provide a connection to a couch server and cleanup after use.
+
+ For database creation and deletion we use an ephemeral connection to the
+ couch server. That connection has to be properly closed, so we provide it
+ as a context manager.
+
+ :param url: The URL of the Couch server.
+ :type url: str
+ """
+ session = Session(timeout=COUCH_TIMEOUT)
+ server = Server(url=url, full_commit=False, session=session)
+ yield server
+
+
+THREAD_POOL = ThreadPool(20)
+
+
+class CouchDatabase(object):
+ """
+ Holds CouchDB related code.
+ This class gives methods to encapsulate database operations and hide
+ CouchDB details from backend code.
+ """
+
+ @classmethod
+ def open_database(cls, url, create, ensure_ddocs=False, replica_uid=None,
+ database_security=None):
+ """
+ Open a U1DB database using CouchDB as backend.
+
+ :param url: the url of the database replica
+ :type url: str
+ :param create: should the replica be created if it does not exist?
+ :type create: bool
+ :param replica_uid: an optional unique replica identifier
+ :type replica_uid: str
+ :param ensure_ddocs: Ensure that the design docs exist on server.
+ :type ensure_ddocs: bool
+
+ :return: the database instance
+ :rtype: SoledadBackend
+ """
+ # get database from url
+ m = re.match('(^https?://[^/]+)/(.+)$', url)
+ if not m:
+ raise InvalidURLError
+ url = m.group(1)
+ dbname = m.group(2)
+ with couch_server(url) as server:
+ try:
+ server[dbname]
+ except ResourceNotFound:
+ if not create:
+ raise DatabaseDoesNotExist()
+ server.create(dbname)
+ db = cls(url,
+ dbname, ensure_ddocs=ensure_ddocs,
+ database_security=database_security)
+ return SoledadBackend(
+ db, replica_uid=replica_uid)
+
+ def __init__(self, url, dbname, ensure_ddocs=True,
+ database_security=None):
+ self._session = Session(timeout=COUCH_TIMEOUT)
+ self._url = url
+ self._dbname = dbname
+ self._database = Database(
+ urljoin(url, dbname),
+ self._session)
+ self._database.info()
+ if ensure_ddocs:
+ self.ensure_ddocs_on_db()
+ self.ensure_security_ddoc(database_security)
+
+ def ensure_ddocs_on_db(self):
+ """
+ Ensure that the design documents used by the backend exist on the
+ couch database.
+ """
+ for ddoc_name in ['docs', 'syncs', 'transactions']:
+ try:
+ self._database.resource('_design',
+ ddoc_name, '_info').get_json()
+ except ResourceNotFound:
+ ddoc = json.loads(
+ binascii.a2b_base64(
+ getattr(ddocs, ddoc_name)))
+ self._database.save(ddoc)
+
+ def ensure_security_ddoc(self, security_config=None):
+ """
+ Make sure that only soledad user is able to access this database as
+ an unprivileged member, meaning that administration access will
+ be forbidden even inside an user database.
+ The goal is to make sure that only the lowest access level is given
+ to the unprivileged CouchDB user set on the server process.
+ This is achieved by creating a _security design document, see:
+ http://docs.couchdb.org/en/latest/api/database/security.html
+
+ :param database_security: security configuration parsed from conf file
+ :type cache: dict
+ """
+ security_config = security_config or {}
+ security = self._database.resource.get_json('_security')[2]
+ security['members'] = {'names': [], 'roles': []}
+ security['members']['names'] = security_config.get('members',
+ ['soledad'])
+ security['members']['roles'] = security_config.get('members_roles', [])
+ security['admins'] = {'names': [], 'roles': []}
+ security['admins']['names'] = security_config.get('admins', [])
+ security['admins']['roles'] = security_config.get('admins_roles', [])
+ self._database.resource.put_json('_security', body=security)
+
+ def delete_database(self):
+ """
+ Delete a U1DB CouchDB database.
+ """
+ with couch_server(self._url) as server:
+ del(server[self._dbname])
+
+ def set_replica_uid(self, replica_uid):
+ """
+ Force the replica uid to be set.
+
+ :param replica_uid: The new replica uid.
+ :type replica_uid: str
+ """
+ try:
+ # set on existent config document
+ doc = self._database['u1db_config']
+ doc['replica_uid'] = replica_uid
+ except ResourceNotFound:
+ # or create the config document
+ doc = {
+ '_id': 'u1db_config',
+ 'replica_uid': replica_uid,
+ }
+ self._database.save(doc)
+
+ def get_replica_uid(self):
+ """
+ Get the replica uid.
+
+ :return: The replica uid.
+ :rtype: str
+ """
+ try:
+ # grab replica_uid from server
+ doc = self._database['u1db_config']
+ replica_uid = doc['replica_uid']
+ return replica_uid
+ except ResourceNotFound:
+ # create a unique replica_uid
+ replica_uid = uuid.uuid4().hex
+ self.set_replica_uid(replica_uid)
+ return replica_uid
+
+ def close(self):
+ self._database = None
+
+ def get_all_docs(self, include_deleted=False):
+ """
+ Get the JSON content for all documents in the database.
+
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise deleted
+ documents will not be included in the results.
+ :type include_deleted: bool
+
+ :return: (generation, [CouchDocument])
+ The current generation of the database, followed by a list of all
+ the documents in the database.
+ :rtype: (int, [CouchDocument])
+ """
+
+ generation, _ = self._get_generation_info()
+ results = list(self.get_docs(self._database,
+ include_deleted=include_deleted))
+ return (generation, results)
+
+ def get_docs(self, doc_ids, check_for_conflicts=True,
+ include_deleted=False):
+ """
+ Get the JSON content for many documents.
+
+ :param doc_ids: A list of document identifiers or None for all.
+ :type doc_ids: list
+ :param check_for_conflicts: If set to False, then the conflict check
+ will be skipped, and 'None' will be
+ returned instead of True/False.
+ :type check_for_conflicts: bool
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise deleted
+ documents will not be included in the results.
+ :return: iterable giving the Document object for each document id
+ in matching doc_ids order.
+ :rtype: iterable
+ """
+ # Workaround for:
+ #
+ # http://bugs.python.org/issue7980
+ # https://leap.se/code/issues/5449
+ #
+ # python-couchdb uses time.strptime, which is not thread safe. In
+ # order to avoid the problem described on the issues above, we preload
+ # strptime here by evaluating the conversion of an arbitrary date.
+ # This will not be needed when/if we switch from python-couchdb to
+ # paisley.
+ time.strptime('Mar 8 1917', '%b %d %Y')
+ get_one = lambda doc_id: self._get_doc(doc_id, check_for_conflicts)
+ docs = [THREAD_POOL.apply_async(get_one, [doc_id])
+ for doc_id in doc_ids]
+ for doc in docs:
+ doc = doc.get()
+ if not doc or not include_deleted and doc.is_tombstone():
+ continue
+ yield doc
+
+ def _get_doc(self, doc_id, check_for_conflicts=False):
+ """
+ Extract the document from storage.
+
+ This can return None if the document doesn't exist.
+
+ :param doc_id: The unique document identifier
+ :type doc_id: str
+ :param check_for_conflicts: If set to False, then the conflict check
+ will be skipped.
+ :type check_for_conflicts: bool
+
+ :return: The document.
+ :rtype: CouchDocument
+ """
+ # get document with all attachments (u1db content and eventual
+ # conflicts)
+ try:
+ result = \
+ self._database.resource(doc_id).get_json(
+ attachments=True)[2]
+ except ResourceNotFound:
+ return None
+ return self.__parse_doc_from_couch(result, doc_id, check_for_conflicts)
+
+ def __parse_doc_from_couch(self, result, doc_id,
+ check_for_conflicts=False):
+ # restrict to u1db documents
+ if 'u1db_rev' not in result:
+ return None
+ doc = CouchDocument(doc_id, result['u1db_rev'])
+ # set contents or make tombstone
+ if '_attachments' not in result \
+ or 'u1db_content' not in result['_attachments']:
+ doc.make_tombstone()
+ else:
+ doc.content = json.loads(
+ binascii.a2b_base64(
+ result['_attachments']['u1db_content']['data']))
+ # determine if there are conflicts
+ if check_for_conflicts \
+ and '_attachments' in result \
+ and 'u1db_conflicts' in result['_attachments']:
+ doc.set_conflicts(
+ self._build_conflicts(
+ doc.doc_id,
+ json.loads(binascii.a2b_base64(
+ result['_attachments']['u1db_conflicts']['data']))))
+ # store couch revision
+ doc.couch_rev = result['_rev']
+ # store transactions
+ doc.transactions = result['u1db_transactions']
+ return doc
+
+ def _build_conflicts(self, doc_id, attached_conflicts):
+ """
+ Build the conflicted documents list from the conflicts attachment
+ fetched from a couch document.
+
+ :param attached_conflicts: The document's conflicts as fetched from a
+ couch document attachment.
+ :type attached_conflicts: dict
+ """
+ conflicts = []
+ for doc_rev, content in attached_conflicts:
+ doc = CouchDocument(doc_id, doc_rev)
+ if content is None:
+ doc.make_tombstone()
+ else:
+ doc.content = content
+ conflicts.append(doc)
+ return conflicts
+
+ def _get_trans_id_for_gen(self, generation):
+ """
+ Get the transaction id corresponding to a particular generation.
+
+ :param generation: The generation for which to get the transaction id.
+ :type generation: int
+
+ :return: The transaction id for C{generation}.
+ :rtype: str
+
+ :raise InvalidGeneration: Raised when the generation does not exist.
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
+ """
+ if generation == 0:
+ return ''
+ # query a couch list function
+ ddoc_path = [
+ '_design', 'transactions', '_list', 'trans_id_for_gen', 'log'
+ ]
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json(gen=generation)
+ if response[2] == {}:
+ raise InvalidGeneration
+ return response[2]['transaction_id']
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
+ except ServerError as e:
+ raise_server_error(e, ddoc_path)
+
+ def _get_replica_gen_and_trans_id(self, other_replica_uid):
+ """
+ Return the last known generation and transaction id for the other db
+ replica.
+
+ When you do a synchronization with another replica, the Database keeps
+ track of what generation the other database replica was at, and what
+ the associated transaction id was. This is used to determine what data
+ needs to be sent, and if two databases are claiming to be the same
+ replica.
+
+ :param other_replica_uid: The identifier for the other replica.
+ :type other_replica_uid: str
+
+ :return: A tuple containing the generation and transaction id we
+ encountered during synchronization. If we've never
+ synchronized with the replica, this is (0, '').
+ :rtype: (int, str)
+ """
+ doc_id = 'u1db_sync_%s' % other_replica_uid
+ try:
+ doc = self._database[doc_id]
+ except ResourceNotFound:
+ doc = {
+ '_id': doc_id,
+ 'generation': 0,
+ 'transaction_id': '',
+ }
+ self._database.save(doc)
+ result = doc['generation'], doc['transaction_id']
+ return result
+
+ def get_doc_conflicts(self, doc_id, couch_rev=None):
+ """
+ Get the conflicted versions of a document.
+
+ If the C{couch_rev} parameter is not None, conflicts for a specific
+ document's couch revision are returned.
+
+ :param couch_rev: The couch document revision.
+ :type couch_rev: str
+
+ :return: A list of conflicted versions of the document.
+ :rtype: list
+ """
+ # request conflicts attachment from server
+ params = {}
+ conflicts = []
+ if couch_rev is not None:
+ params['rev'] = couch_rev # restric document's couch revision
+ else:
+ # TODO: move into resource logic!
+ first_entry = self._get_doc(doc_id, check_for_conflicts=True)
+ conflicts.append(first_entry)
+ resource = self._database.resource(doc_id, 'u1db_conflicts')
+ try:
+ response = resource.get_json(**params)
+ return conflicts + self._build_conflicts(
+ doc_id, json.loads(response[2].read()))
+ except ResourceNotFound:
+ return []
+
+ def _do_set_replica_gen_and_trans_id(
+ self, other_replica_uid, other_generation, other_transaction_id,
+ number_of_docs=None, doc_idx=None, sync_id=None):
+ """
+ Set the last-known generation and transaction id for the other
+ database replica.
+
+ We have just performed some synchronization, and we want to track what
+ generation the other replica was at. See also
+ _get_replica_gen_and_trans_id.
+
+ :param other_replica_uid: The U1DB identifier for the other replica.
+ :type other_replica_uid: str
+ :param other_generation: The generation number for the other replica.
+ :type other_generation: int
+ :param other_transaction_id: The transaction id associated with the
+ generation.
+ :type other_transaction_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+ :param doc_idx: The index of the current document being sent.
+ :type doc_idx: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
+ """
+ doc_id = 'u1db_sync_%s' % other_replica_uid
+ try:
+ doc = self._database[doc_id]
+ except ResourceNotFound:
+ doc = {'_id': doc_id}
+ doc['generation'] = other_generation
+ doc['transaction_id'] = other_transaction_id
+ self._database.save(doc)
+
+ def _get_transaction_log(self):
+ """
+ This is only for the test suite, it is not part of the api.
+
+ :return: The complete transaction log.
+ :rtype: [(str, str)]
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
+ """
+ # query a couch view
+ ddoc_path = ['_design', 'transactions', '_view', 'log']
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json()
+ return map(
+ lambda row: (row['id'], row['value']),
+ response[2]['rows'])
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
+
+ def whats_changed(self, old_generation=0):
+ """
+ Return a list of documents that have changed since old_generation.
+
+ :param old_generation: The generation of the database in the old
+ state.
+ :type old_generation: int
+
+ :return: (generation, trans_id, [(doc_id, generation, trans_id),...])
+ The current generation of the database, its associated
+ transaction id, and a list of of changed documents since
+ old_generation, represented by tuples with for each document
+ its doc_id and the generation and transaction id corresponding
+ to the last intervening change and sorted by generation (old
+ changes first)
+ :rtype: (int, str, [(str, int, str)])
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
+ """
+ # query a couch list function
+ ddoc_path = [
+ '_design', 'transactions', '_list', 'whats_changed', 'log'
+ ]
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json(old_gen=old_generation)
+ results = map(
+ lambda row:
+ (row['generation'], row['doc_id'], row['transaction_id']),
+ response[2]['transactions'])
+ results.reverse()
+ cur_gen = old_generation
+ seen = set()
+ changes = []
+ newest_trans_id = ''
+ for generation, doc_id, trans_id in results:
+ if doc_id not in seen:
+ changes.append((doc_id, generation, trans_id))
+ seen.add(doc_id)
+ if changes:
+ cur_gen = changes[0][1] # max generation
+ newest_trans_id = changes[0][2]
+ changes.reverse()
+ else:
+ cur_gen, newest_trans_id = self._get_generation_info()
+
+ return cur_gen, newest_trans_id, changes
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
+ except ServerError as e:
+ raise_server_error(e, ddoc_path)
+
+ def _get_generation_info(self):
+ """
+ Return the current generation.
+
+ :return: A tuple containing the current generation and transaction id.
+ :rtype: (int, str)
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
+ """
+ # query a couch list function
+ ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log']
+ res = self._database.resource(*ddoc_path)
+ try:
+ response = res.get_json()
+ return (response[2]['generation'], response[2]['transaction_id'])
+ except ResourceNotFound as e:
+ raise_missing_design_doc_error(e, ddoc_path)
+ except ServerError as e:
+ raise_server_error(e, ddoc_path)
+
+ def save_document(self, old_doc, doc, transaction_id):
+ """
+ Put the document in the Couch backend database.
+
+ Note that C{old_doc} must have been fetched with the parameter
+ C{check_for_conflicts} equal to True, so we can properly update the
+ new document using the conflict information from the old one.
+
+ :param old_doc: The old document version.
+ :type old_doc: CouchDocument
+ :param doc: The document to be put.
+ :type doc: CouchDocument
+
+ :raise RevisionConflict: Raised when trying to update a document but
+ couch revisions mismatch.
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
+ """
+ attachments = {} # we save content and conflicts as attachments
+ parts = [] # and we put it using couch's multipart PUT
+ # save content as attachment
+ if doc.is_tombstone() is False:
+ content = doc.get_json()
+ attachments['u1db_content'] = {
+ 'follows': True,
+ 'content_type': 'application/octet-stream',
+ 'length': len(content),
+ }
+ parts.append(content)
+ # save conflicts as attachment
+ if doc.has_conflicts is True:
+ conflicts = json.dumps(
+ map(lambda cdoc: (cdoc.rev, cdoc.content),
+ doc.get_conflicts()))
+ attachments['u1db_conflicts'] = {
+ 'follows': True,
+ 'content_type': 'application/octet-stream',
+ 'length': len(conflicts),
+ }
+ parts.append(conflicts)
+ # store old transactions, if any
+ transactions = old_doc.transactions[:] if old_doc is not None else []
+ # create a new transaction id and timestamp it so the transaction log
+ # is consistent when querying the database.
+ transactions.append(
+ # here we store milliseconds to keep consistent with javascript
+ # Date.prototype.getTime() which was used before inside a couchdb
+ # update handler.
+ (int(time.time() * 1000),
+ transaction_id))
+ # build the couch document
+ couch_doc = {
+ '_id': doc.doc_id,
+ 'u1db_rev': doc.rev,
+ 'u1db_transactions': transactions,
+ '_attachments': attachments,
+ }
+ # if we are updating a doc we have to add the couch doc revision
+ if old_doc is not None:
+ couch_doc['_rev'] = old_doc.couch_rev
+ # prepare the multipart PUT
+ buf = StringIO()
+ headers = {}
+ envelope = MultipartWriter(buf, headers=headers, subtype='related')
+ envelope.add('application/json', json.dumps(couch_doc))
+ for part in parts:
+ envelope.add('application/octet-stream', part)
+ envelope.close()
+ # try to save and fail if there's a revision conflict
+ try:
+ resource = self._new_resource()
+ resource.put_json(
+ doc.doc_id, body=str(buf.getvalue()), headers=headers)
+ except ResourceConflict:
+ raise RevisionConflict()
+ return transactions[-1][1]
+
+ def _new_resource(self, *path):
+ """
+ Return a new resource for accessing a couch database.
+
+ :return: A resource for accessing a couch database.
+ :rtype: couchdb.http.Resource
+ """
+ # Workaround for: https://leap.se/code/issues/5448
+ url = couch_urljoin(self._database.resource.url, *path)
+ resource = Resource(url, Session(timeout=COUCH_TIMEOUT))
+ resource.credentials = self._database.resource.credentials
+ resource.headers = self._database.resource.headers.copy()
+ return resource
diff --git a/common/src/leap/soledad/common/couch/state.py b/common/src/leap/soledad/common/couch/state.py
new file mode 100644
index 00000000..39d49fa0
--- /dev/null
+++ b/common/src/leap/soledad/common/couch/state.py
@@ -0,0 +1,114 @@
+# -*- coding: utf-8 -*-
+# state.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Server state using CouchDatabase as backend.
+"""
+import re
+import logging
+from urlparse import urljoin
+
+from u1db.remote.server_state import ServerState
+from leap.soledad.common.command import exec_validated_cmd
+from leap.soledad.common.couch import CouchDatabase
+from u1db.errors import Unauthorized
+
+
+logger = logging.getLogger(__name__)
+
+
+def is_db_name_valid(name):
+ """
+ Validate a user database using a regular expression.
+
+ :param name: database name.
+ :type name: str
+
+ :return: boolean for name vailidity
+ :rtype: bool
+ """
+ db_name_regex = "^user-[a-f0-9]+$"
+ return re.match(db_name_regex, name) is not None
+
+
+class CouchServerState(ServerState):
+
+ """
+ Inteface of the WSGI server with the CouchDB backend.
+ """
+
+ def __init__(self, couch_url, create_cmd=None):
+ """
+ Initialize the couch server state.
+
+ :param couch_url: The URL for the couch database.
+ :type couch_url: str
+ """
+ self.couch_url = couch_url
+ self.create_cmd = create_cmd
+
+ def open_database(self, dbname):
+ """
+ Open a couch database.
+
+ :param dbname: The name of the database to open.
+ :type dbname: str
+
+ :return: The SoledadBackend object.
+ :rtype: SoledadBackend
+ """
+ url = urljoin(self.couch_url, dbname)
+ db = CouchDatabase.open_database(url, create=False, ensure_ddocs=False)
+ return db
+
+ def ensure_database(self, dbname):
+ """
+ Ensure couch database exists.
+
+ :param dbname: The name of the database to ensure.
+ :type dbname: str
+
+ :raise Unauthorized: If disabled or other error was raised.
+
+ :return: The SoledadBackend object and its replica_uid.
+ :rtype: (SoledadBackend, str)
+ """
+ if not self.create_cmd:
+ raise Unauthorized()
+ else:
+ code, out = exec_validated_cmd(self.create_cmd, dbname,
+ validator=is_db_name_valid)
+ if code is not 0:
+ logger.error("""
+ Error while creating database (%s) with (%s) command.
+ Output: %s
+ Exit code: %d
+ """ % (dbname, self.create_cmd, out, code))
+ raise Unauthorized()
+ db = self.open_database(dbname)
+ return db, db.replica_uid
+
+ def delete_database(self, dbname):
+ """
+ Delete couch database.
+
+ :param dbname: The name of the database to delete.
+ :type dbname: str
+
+ :raise Unauthorized: Always, because Soledad server is not allowed to
+ delete databases.
+ """
+ raise Unauthorized()
diff --git a/common/src/leap/soledad/common/errors.py b/common/src/leap/soledad/common/errors.py
index ea9d6ce4..84d8d813 100644
--- a/common/src/leap/soledad/common/errors.py
+++ b/common/src/leap/soledad/common/errors.py
@@ -266,4 +266,4 @@ def raise_server_error(exc, ddoc_path):
if 'point is undefined' in exc.message[1][1]:
raise MissingDesignDocListFunctionError
# other errors are unknown for now
- raise DesignDocUnknownError(path)
+ raise DesignDocUnknownError("%s: %s" % (path, str(exc.message)))
diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py
index b3536a6a..1634d30c 100644
--- a/common/src/leap/soledad/common/tests/test_couch.py
+++ b/common/src/leap/soledad/common/tests/test_couch.py
@@ -36,6 +36,7 @@ from u1db import SyncTarget
from u1db import vectorclock
from leap.soledad.common import couch
+from leap.soledad.common import backend
from leap.soledad.common import errors
from leap.soledad.common.tests import u1db_tests as tests
@@ -56,7 +57,7 @@ from u1db.backends.inmemory import InMemoryIndex
class TestCouchBackendImpl(CouchDBTestCase):
def test__allocate_doc_id(self):
- db = couch.SoledadBackend.open_database(
+ db = couch.CouchDatabase.open_database(
urljoin(
'http://localhost:' + str(self.couch_port),
('test-%s' % uuid4().hex)
@@ -78,7 +79,7 @@ class TestCouchBackendImpl(CouchDBTestCase):
def make_couch_database_for_test(test, replica_uid):
port = str(test.couch_port)
dbname = ('test-%s' % uuid4().hex)
- db = couch.SoledadBackend.open_database(
+ db = couch.CouchDatabase.open_database(
urljoin('http://localhost:' + port, dbname),
create=True,
replica_uid=replica_uid or 'test',
@@ -91,7 +92,7 @@ def copy_couch_database_for_test(test, db):
port = str(test.couch_port)
couch_url = 'http://localhost:' + port
new_dbname = db._dbname + '_copy'
- new_db = couch.SoledadBackend.open_database(
+ new_db = couch.CouchDatabase.open_database(
urljoin(couch_url, new_dbname),
create=True,
replica_uid=db._replica_uid or 'test')
@@ -529,89 +530,6 @@ class SoledadBackendSyncTargetTests(
self.st.record_sync_info('replica', 0, 'T-sid')
self.assertEqual(expected, called)
-
-# The following tests need that the database have an index, so we fake one.
-
-class IndexedSoledadBackend(couch.SoledadBackend):
-
- def __init__(self, db, replica_uid=None):
- old_class.__init__(self, db, replica_uid=replica_uid)
- self._indexes = {}
-
- def _put_doc(self, old_doc, doc):
- for index in self._indexes.itervalues():
- if old_doc is not None and not old_doc.is_tombstone():
- index.remove_json(old_doc.doc_id, old_doc.get_json())
- if not doc.is_tombstone():
- index.add_json(doc.doc_id, doc.get_json())
- old_class._put_doc(self, old_doc, doc)
-
- def create_index(self, index_name, *index_expressions):
- if index_name in self._indexes:
- if self._indexes[index_name]._definition == list(
- index_expressions):
- return
- raise u1db_errors.IndexNameTakenError
- index = InMemoryIndex(index_name, list(index_expressions))
- _, all_docs = self.get_all_docs()
- for doc in all_docs:
- index.add_json(doc.doc_id, doc.get_json())
- self._indexes[index_name] = index
-
- def delete_index(self, index_name):
- del self._indexes[index_name]
-
- def list_indexes(self):
- definitions = []
- for idx in self._indexes.itervalues():
- definitions.append((idx._name, idx._definition))
- return definitions
-
- def get_from_index(self, index_name, *key_values):
- try:
- index = self._indexes[index_name]
- except KeyError:
- raise u1db_errors.IndexDoesNotExist
- doc_ids = index.lookup(key_values)
- result = []
- for doc_id in doc_ids:
- result.append(self._get_doc(doc_id, check_for_conflicts=True))
- return result
-
- def get_range_from_index(self, index_name, start_value=None,
- end_value=None):
- """Return all documents with key values in the specified range."""
- try:
- index = self._indexes[index_name]
- except KeyError:
- raise u1db_errors.IndexDoesNotExist
- if isinstance(start_value, basestring):
- start_value = (start_value,)
- if isinstance(end_value, basestring):
- end_value = (end_value,)
- doc_ids = index.lookup_range(start_value, end_value)
- result = []
- for doc_id in doc_ids:
- result.append(self._get_doc(doc_id, check_for_conflicts=True))
- return result
-
- def get_index_keys(self, index_name):
- try:
- index = self._indexes[index_name]
- except KeyError:
- raise u1db_errors.IndexDoesNotExist
- keys = index.keys()
- # XXX inefficiency warning
- return list(set([tuple(key.split('\x01')) for key in keys]))
-
-
-# monkey patch SoledadBackend (once) to include virtual indexes
-if getattr(couch.SoledadBackend, '_old_class', None) is None:
- old_class = couch.SoledadBackend
- IndexedSoledadBackend._old_class = old_class
- couch.SoledadBackend = IndexedSoledadBackend
-
-
sync_scenarios = []
for name, scenario in COUCH_SCENARIOS:
scenario = dict(scenario)
@@ -921,7 +839,6 @@ class SoledadBackendSyncTests(
self.db1 = self.create_database('test1', 'source')
self.db2 = self.create_database('test2', 'target')
doc = self.db2.create_doc_from_json(simple_doc)
- self.db1.create_index('test-idx', 'key')
self.assertEqual(0, self.sync(self.db1, self.db2))
self.assertGetDoc(self.db1, doc.doc_id, doc.rev, simple_doc, False)
self.assertEqual(1, self.db1._get_replica_gen_and_trans_id('test2')[0])
@@ -931,7 +848,7 @@ class SoledadBackendSyncTests(
{'receive': {'docs': [], 'last_known_gen': 0},
'return': {'docs': [(doc.doc_id, doc.rev)],
'last_gen': 1}})
- self.assertEqual([doc], self.db1.get_from_index('test-idx', 'value'))
+ self.assertGetDoc(self.db2, doc.doc_id, doc.rev, simple_doc, False)
def test_sync_pulling_doesnt_update_other_if_changed(self):
self.db1 = self.create_database('test1', 'source')
@@ -1020,7 +937,7 @@ class SoledadBackendSyncTests(
doc1 = self.db1.create_doc_from_json(simple_doc)
doc_id = doc1.doc_id
doc1_rev = doc1.rev
- self.db1.create_index('test-idx', 'key')
+ # self.db1.create_index('test-idx', 'key')
new_doc = '{"key": "altval"}'
doc2 = self.db2.create_doc_from_json(new_doc, doc_id=doc_id)
doc2_rev = doc2.rev
@@ -1036,18 +953,18 @@ class SoledadBackendSyncTests(
self.assertTransactionLog([doc_id, doc_id], self.db1)
self.assertGetDoc(self.db1, doc_id, doc2_rev, new_doc, True)
self.assertGetDoc(self.db2, doc_id, doc2_rev, new_doc, False)
- from_idx = self.db1.get_from_index('test-idx', 'altval')[0]
- self.assertEqual(doc2.doc_id, from_idx.doc_id)
- self.assertEqual(doc2.rev, from_idx.rev)
- self.assertTrue(from_idx.has_conflicts)
- self.assertEqual([], self.db1.get_from_index('test-idx', 'value'))
+ # soledad doesnt support index due to encryption
+ # from_idx = self.db1.get_from_index('test-idx', 'altval')[0]
+ # self.assertEqual(doc2.doc_id, from_idx.doc_id)
+ # self.assertEqual(doc2.rev, from_idx.rev)
+ # self.assertTrue(from_idx.has_conflicts)
+ # self.assertEqual([], self.db1.get_from_index('test-idx', 'value'))
def test_sync_sees_remote_delete_conflicted(self):
self.db1 = self.create_database('test1', 'source')
self.db2 = self.create_database('test2', 'target')
doc1 = self.db1.create_doc_from_json(simple_doc)
doc_id = doc1.doc_id
- self.db1.create_index('test-idx', 'key')
self.sync(self.db1, self.db2)
doc2 = self.make_document(doc1.doc_id, doc1.rev, doc1.get_json())
new_doc = '{"key": "altval"}'
@@ -1067,7 +984,6 @@ class SoledadBackendSyncTests(
self.assertGetDocIncludeDeleted(self.db1, doc_id, doc2.rev, None, True)
self.assertGetDocIncludeDeleted(
self.db2, doc_id, doc2.rev, None, False)
- self.assertEqual([], self.db1.get_from_index('test-idx', 'value'))
def test_sync_local_race_conflicted(self):
self.db1 = self.create_database('test1', 'source')
@@ -1075,7 +991,7 @@ class SoledadBackendSyncTests(
doc = self.db1.create_doc_from_json(simple_doc)
doc_id = doc.doc_id
doc1_rev = doc.rev
- self.db1.create_index('test-idx', 'key')
+ # self.db1.create_index('test-idx', 'key')
self.sync(self.db1, self.db2)
content1 = '{"key": "localval"}'
content2 = '{"key": "altval"}'
@@ -1094,21 +1010,22 @@ class SoledadBackendSyncTests(
self.sync(self.db1, self.db2, trace_hook=after_whatschanged)
self.assertEqual([True], triggered)
self.assertGetDoc(self.db1, doc_id, doc2_rev2, content2, True)
- from_idx = self.db1.get_from_index('test-idx', 'altval')[0]
- self.assertEqual(doc.doc_id, from_idx.doc_id)
- self.assertEqual(doc.rev, from_idx.rev)
- self.assertTrue(from_idx.has_conflicts)
- self.assertEqual([], self.db1.get_from_index('test-idx', 'value'))
- self.assertEqual([], self.db1.get_from_index('test-idx', 'localval'))
+ # soledad doesnt support indexing due to encryption
+ # from_idx = self.db1.get_from_index('test-idx', 'altval')[0]
+ # self.assertEqual(doc.doc_id, from_idx.doc_id)
+ # self.assertEqual(doc.rev, from_idx.rev)
+ # self.assertTrue(from_idx.has_conflicts)
+ # self.assertEqual([], self.db1.get_from_index('test-idx', 'value'))
+ # self.assertEqual([], self.db1.get_from_index('test-idx', 'localval'))
def test_sync_propagates_deletes(self):
self.db1 = self.create_database('test1', 'source')
self.db2 = self.create_database('test2', 'both')
doc1 = self.db1.create_doc_from_json(simple_doc)
doc_id = doc1.doc_id
- self.db1.create_index('test-idx', 'key')
+ # self.db1.create_index('test-idx', 'key')
self.sync(self.db1, self.db2)
- self.db2.create_index('test-idx', 'key')
+ # self.db2.create_index('test-idx', 'key')
self.db3 = self.create_database('test3', 'target')
self.sync(self.db1, self.db3)
self.db1.delete_doc(doc1)
@@ -1124,8 +1041,8 @@ class SoledadBackendSyncTests(
self.db1, doc_id, deleted_rev, None, False)
self.assertGetDocIncludeDeleted(
self.db2, doc_id, deleted_rev, None, False)
- self.assertEqual([], self.db1.get_from_index('test-idx', 'value'))
- self.assertEqual([], self.db2.get_from_index('test-idx', 'value'))
+ # self.assertEqual([], self.db1.get_from_index('test-idx', 'value'))
+ # self.assertEqual([], self.db2.get_from_index('test-idx', 'value'))
self.sync(self.db2, self.db3)
self.assertLastExchangeLog(
self.db3,
@@ -1324,9 +1241,11 @@ class SoledadBackendExceptionsTests(CouchDBTestCase):
def create_db(self, ensure=True, dbname=None):
if not dbname:
dbname = ('test-%s' % uuid4().hex)
- self.db = couch.CouchDatabase.open_database(
- urljoin('http://127.0.0.1:%d' % self.couch_port, dbname),
- create=True,
+ if dbname not in self.couch_server:
+ self.couch_server.create(dbname)
+ self.db = couch.CouchDatabase(
+ ('http://127.0.0.1:%d' % self.couch_port),
+ dbname,
ensure_ddocs=ensure)
def tearDown(self):
@@ -1520,14 +1439,14 @@ class SoledadBackendExceptionsTests(CouchDBTestCase):
class DatabaseNameValidationTest(unittest.TestCase):
def test_database_name_validation(self):
- self.assertFalse(couch.is_db_name_valid("user-deadbeef | cat /secret"))
- self.assertTrue(couch.is_db_name_valid("user-cafe1337"))
+ self.assertFalse(couch.state.is_db_name_valid("user-deadbeef | cat /secret"))
+ self.assertTrue(couch.state.is_db_name_valid("user-cafe1337"))
class CommandBasedDBCreationTest(unittest.TestCase):
def test_ensure_db_using_custom_command(self):
- state = couch.CouchServerState("url", create_cmd="echo")
+ state = couch.state.CouchServerState("url", create_cmd="echo")
mock_db = Mock()
mock_db.replica_uid = 'replica_uid'
state.open_database = Mock(return_value=mock_db)
@@ -1536,11 +1455,11 @@ class CommandBasedDBCreationTest(unittest.TestCase):
self.assertEquals(mock_db.replica_uid, replica_uid)
def test_raises_unauthorized_on_failure(self):
- state = couch.CouchServerState("url", create_cmd="inexistent")
+ state = couch.state.CouchServerState("url", create_cmd="inexistent")
self.assertRaises(u1db_errors.Unauthorized,
state.ensure_database, "user-1337")
def test_raises_unauthorized_by_default(self):
- state = couch.CouchServerState("url")
+ state = couch.state.CouchServerState("url")
self.assertRaises(u1db_errors.Unauthorized,
state.ensure_database, "user-1337")
diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
index 2c2daf05..8cd3ae08 100644
--- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
+++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
@@ -26,7 +26,8 @@ from twisted.internet import defer
from uuid import uuid4
from leap.soledad.client import Soledad
-from leap.soledad.common.couch import SoledadBackend, CouchServerState
+from leap.soledad.common.couch.state import CouchServerState
+from leap.soledad.common.couch import CouchDatabase
from leap.soledad.common.tests.util import (
make_token_soledad_app,
@@ -86,7 +87,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
TestCaseWithServer.setUp(self)
CouchDBTestCase.setUp(self)
self.user = ('user-%s' % uuid4().hex)
- self.db = SoledadBackend.open_database(
+ self.db = CouchDatabase.open_database(
urljoin(self.couch_url, 'user-' + self.user),
create=True,
replica_uid='replica',
diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py
index 5810ce31..d75275d6 100644
--- a/common/src/leap/soledad/common/tests/test_server.py
+++ b/common/src/leap/soledad/common/tests/test_server.py
@@ -29,10 +29,8 @@ from urlparse import urljoin
from twisted.internet import defer
from twisted.trial import unittest
-from leap.soledad.common.couch import (
- CouchServerState,
- SoledadBackend,
-)
+from leap.soledad.common.couch.state import CouchServerState
+from leap.soledad.common.couch import CouchDatabase
from leap.soledad.common.tests.u1db_tests import TestCaseWithServer
from leap.soledad.common.tests.test_couch import CouchDBTestCase
from leap.soledad.common.tests.util import (
@@ -358,7 +356,7 @@ class EncryptedSyncTestCase(
passphrase=passphrase)
# ensure remote db exists before syncing
- db = SoledadBackend.open_database(
+ db = CouchDatabase.open_database(
urljoin(self.couch_url, 'user-' + user),
create=True,
ensure_ddocs=True)
@@ -488,7 +486,7 @@ class LockResourceTestCase(
self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
TestCaseWithServer.setUp(self)
# create the databases
- db = SoledadBackend.open_database(
+ db = CouchDatabase.open_database(
urljoin(self.couch_url, ('shared-%s' % (uuid4().hex))),
create=True,
ensure_ddocs=True)
diff --git a/common/src/leap/soledad/common/tests/test_sync_mutex.py b/common/src/leap/soledad/common/tests/test_sync_mutex.py
index 35cecb8f..973a8587 100644
--- a/common/src/leap/soledad/common/tests/test_sync_mutex.py
+++ b/common/src/leap/soledad/common/tests/test_sync_mutex.py
@@ -33,7 +33,8 @@ from twisted.internet import defer
from leap.soledad.client.sync import SoledadSynchronizer
-from leap.soledad.common import couch
+from leap.soledad.common.couch.state import CouchServerState
+from leap.soledad.common.couch import CouchDatabase
from leap.soledad.common.tests.u1db_tests import TestCaseWithServer
from leap.soledad.common.tests.test_couch import CouchDBTestCase
@@ -84,7 +85,7 @@ class TestSyncMutex(
sync_target = soledad_sync_target
def make_app(self):
- self.request_state = couch.CouchServerState(self.couch_url)
+ self.request_state = CouchServerState(self.couch_url)
return self.make_app_with_state(self.request_state)
def setUp(self):
@@ -102,7 +103,7 @@ class TestSyncMutex(
self.startServer()
# ensure remote db exists before syncing
- db = couch.SoledadBackend.open_database(
+ db = CouchDatabase.open_database(
urljoin(self.couch_url, 'user-' + self.user),
create=True,
ensure_ddocs=True)
diff --git a/common/src/leap/soledad/common/tests/test_sync_target.py b/common/src/leap/soledad/common/tests/test_sync_target.py
index 0cf31c3f..f25e84dd 100644
--- a/common/src/leap/soledad/common/tests/test_sync_target.py
+++ b/common/src/leap/soledad/common/tests/test_sync_target.py
@@ -29,7 +29,6 @@ from uuid import uuid4
from testscenarios import TestWithScenarios
from twisted.internet import defer
-from urlparse import urljoin
from leap.soledad.client import http_target as target
from leap.soledad.client import crypto
@@ -37,7 +36,6 @@ from leap.soledad.client.sqlcipher import SQLCipherU1DBSync
from leap.soledad.client.sqlcipher import SQLCipherOptions
from leap.soledad.client.sqlcipher import SQLCipherDatabase
-from leap.soledad.common import couch
from leap.soledad.common.document import SoledadDocument
from leap.soledad.common.tests import u1db_tests as tests
@@ -265,9 +263,9 @@ class TestSoledadSyncTarget(
replica_trans_id=replica_trans_id,
number_of_docs=number_of_docs,
doc_idx=doc_idx, sync_id=sync_id)
- from leap.soledad.common.tests.test_couch import IndexedSoledadBackend
+ from leap.soledad.common.backend import SoledadBackend
self.patch(
- IndexedSoledadBackend, '_put_doc_if_newer', bomb_put_doc_if_newer)
+ SoledadBackend, '_put_doc_if_newer', bomb_put_doc_if_newer)
remote_target = self.getSyncTarget(
source_replica_uid='replica')
other_changes = []
diff --git a/common/src/leap/soledad/common/tests/util.py b/common/src/leap/soledad/common/tests/util.py
index bfd06856..f7f9ebd0 100644
--- a/common/src/leap/soledad/common/tests/util.py
+++ b/common/src/leap/soledad/common/tests/util.py
@@ -27,7 +27,6 @@ import shutil
import random
import string
import u1db
-import traceback
import couchdb
from uuid import uuid4
@@ -37,17 +36,17 @@ from StringIO import StringIO
from pysqlcipher import dbapi2
from u1db import sync
-from u1db.errors import DatabaseDoesNotExist
from u1db.remote import http_database
from twisted.trial import unittest
-from leap.common.files import mkdir_p
from leap.common.testing.basetest import BaseLeapTest
from leap.soledad.common import soledad_assert
from leap.soledad.common.document import SoledadDocument
-from leap.soledad.common.couch import SoledadBackend, CouchServerState
+from leap.soledad.common.couch import CouchDatabase
+from leap.soledad.common.couch.state import CouchServerState
+
from leap.soledad.common.crypto import ENC_SCHEME_KEY
from leap.soledad.client import Soledad
@@ -379,7 +378,7 @@ class CouchServerStateForTests(CouchServerState):
Create db and append to a list, allowing test to close it later
"""
dbname = dbname or ('test-%s' % uuid4().hex)
- db = SoledadBackend.open_database(
+ db = CouchDatabase.open_database(
urljoin(self.couch_url, dbname),
True,
replica_uid=replica_uid or 'test',