summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRuben Pollan <meskio@sindominio.net>2015-11-12 18:57:33 +0100
committerRuben Pollan <meskio@sindominio.net>2015-11-12 18:57:33 +0100
commit3aea1652d82755affdcf4c734f9f0ba004482046 (patch)
tree75c8344b6f7f9e168d7d8e72d0a1a87b2f09b4ad
parent36f476c53f6f3532652d7428e306dcef1c37f879 (diff)
parent0cf6f3efd3cc7dd3361277a8f05577e823e361e5 (diff)
Merge branch 'refactor/isolated_couchdb_code' into develop
- Releases: 0.8.0
-rw-r--r--client/src/leap/soledad/client/sync.py2
-rw-r--r--common/src/leap/soledad/common/backend.py623
-rw-r--r--common/src/leap/soledad/common/couch.py1518
-rw-r--r--common/src/leap/soledad/common/couch/__init__.py722
-rw-r--r--common/src/leap/soledad/common/couch/errors.py144
-rw-r--r--common/src/leap/soledad/common/couch/state.py160
-rw-r--r--common/src/leap/soledad/common/document.py70
-rw-r--r--common/src/leap/soledad/common/errors.py65
-rw-r--r--common/src/leap/soledad/common/tests/test_couch.py212
-rw-r--r--common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py3
-rw-r--r--common/src/leap/soledad/common/tests/test_server.py37
-rw-r--r--common/src/leap/soledad/common/tests/test_sync_mutex.py7
-rw-r--r--common/src/leap/soledad/common/tests/test_sync_target.py6
-rw-r--r--common/src/leap/soledad/common/tests/util.py7
-rw-r--r--scripts/profiling/mail/mx.py5
-rwxr-xr-xserver/pkg/create-user-db2
-rw-r--r--server/src/leap/soledad/server/__init__.py2
-rw-r--r--server/src/leap/soledad/server/auth.py55
-rw-r--r--server/src/leap/soledad/server/sync.py2
19 files changed, 1825 insertions, 1817 deletions
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index 2276db2a..626ad2e5 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -22,7 +22,7 @@ import logging
from twisted.internet import defer
from u1db import errors
-from leap.soledad.common.errors import MissingDesignDocError
+from leap.soledad.common.couch.errors import MissingDesignDocError
from u1db.sync import Synchronizer
diff --git a/common/src/leap/soledad/common/backend.py b/common/src/leap/soledad/common/backend.py
new file mode 100644
index 00000000..deed5ac2
--- /dev/null
+++ b/common/src/leap/soledad/common/backend.py
@@ -0,0 +1,623 @@
+# -*- coding: utf-8 -*-
+# backend.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""A U1DB generic backend."""
+
+
+from u1db import vectorclock
+from u1db.errors import (
+ RevisionConflict,
+ InvalidDocId,
+ ConflictedDoc,
+ DocumentDoesNotExist,
+ DocumentAlreadyDeleted,
+)
+from u1db.backends import CommonBackend
+from u1db.backends import CommonSyncTarget
+from leap.soledad.common.document import ServerDocument
+
+
+class SoledadBackend(CommonBackend):
+
+ """
+ A U1DB backend implementation.
+ """
+
+ def __init__(self, database, replica_uid=None):
+ """
+ Create a new backend.
+
+ :param database: the database implementation
+ :type database: Database
+ :param replica_uid: an optional unique replica identifier
+ :type replica_uid: str
+ """
+ # save params
+ self._factory = ServerDocument
+ self._real_replica_uid = None
+ self._cache = None
+ self._dbname = database._dbname
+ self._database = database
+ if replica_uid is not None:
+ self._set_replica_uid(replica_uid)
+
+ @property
+ def cache(self):
+ if self._cache is not None:
+ return self._cache
+ else:
+ return {}
+
+ def init_caching(self, cache):
+ """
+ Start using cache by setting internal _cache attribute.
+
+ :param cache: the cache instance, anything that behaves like a dict
+ :type cache: dict
+ """
+ self._cache = cache
+
+ def get_sync_target(self):
+ """
+ Return a SyncTarget object, for another u1db to synchronize with.
+
+ :return: The sync target.
+ :rtype: SoledadSyncTarget
+ """
+ return SoledadSyncTarget(self)
+
+ def delete_database(self):
+ """
+ Delete a U1DB database.
+ """
+ self._database.delete_database()
+
+ def close(self):
+ """
+ Release any resources associated with this database.
+
+ :return: True if db was succesfully closed.
+ :rtype: bool
+ """
+ self._database.close()
+ return True
+
+ def __del__(self):
+ """
+ Close the database upon garbage collection.
+ """
+ self.close()
+
+ def _set_replica_uid(self, replica_uid):
+ """
+ Force the replica uid to be set.
+
+ :param replica_uid: The new replica uid.
+ :type replica_uid: str
+ """
+ self._database.set_replica_uid(replica_uid)
+ self._real_replica_uid = replica_uid
+ self.cache['replica_uid'] = self._real_replica_uid
+
+ def _get_replica_uid(self):
+ """
+ Get the replica uid.
+
+ :return: The replica uid.
+ :rtype: str
+ """
+ if self._real_replica_uid is not None:
+ self.cache['replica_uid'] = self._real_replica_uid
+ return self._real_replica_uid
+ if 'replica_uid' in self.cache:
+ return self.cache['replica_uid']
+ self._real_replica_uid = self._database.get_replica_uid()
+ self._set_replica_uid(self._real_replica_uid)
+ return self._real_replica_uid
+
+ _replica_uid = property(_get_replica_uid, _set_replica_uid)
+
+ replica_uid = property(_get_replica_uid)
+
+ def _get_generation(self):
+ """
+ Return the current generation.
+
+ :return: The current generation.
+ :rtype: int
+
+ :raise SoledadError: Raised by database on operation failure
+ """
+ return self._get_generation_info()[0]
+
+ def _get_generation_info(self):
+ """
+ Return the current generation.
+
+ :return: A tuple containing the current generation and transaction id.
+ :rtype: (int, str)
+
+ :raise SoledadError: Raised by database on operation failure
+ """
+ if self.replica_uid + '_gen' in self.cache:
+ response = self.cache[self.replica_uid + '_gen']
+ return response
+ cur_gen, newest_trans_id = self._database.get_generation_info()
+ self.cache[self.replica_uid + '_gen'] = (cur_gen, newest_trans_id)
+ return (cur_gen, newest_trans_id)
+
+ def _get_trans_id_for_gen(self, generation):
+ """
+ Get the transaction id corresponding to a particular generation.
+
+ :param generation: The generation for which to get the transaction id.
+ :type generation: int
+
+ :return: The transaction id for C{generation}.
+ :rtype: str
+
+ :raise InvalidGeneration: Raised when the generation does not exist.
+
+ """
+ return self._database.get_trans_id_for_gen(generation)
+
+ def _get_transaction_log(self):
+ """
+ This is only for the test suite, it is not part of the api.
+
+ :return: The complete transaction log.
+ :rtype: [(str, str)]
+
+ """
+ return self._database.get_transaction_log()
+
+ def _get_doc(self, doc_id, check_for_conflicts=False):
+ """
+ Extract the document from storage.
+
+ This can return None if the document doesn't exist.
+
+ :param doc_id: The unique document identifier
+ :type doc_id: str
+ :param check_for_conflicts: If set to False, then the conflict check
+ will be skipped.
+ :type check_for_conflicts: bool
+
+ :return: The document.
+ :rtype: ServerDocument
+ """
+ return self._database.get_doc(doc_id, check_for_conflicts)
+
+ def get_doc(self, doc_id, include_deleted=False):
+ """
+ Get the JSON string for the given document.
+
+ :param doc_id: The unique document identifier
+ :type doc_id: str
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise asking for a deleted
+ document will return None.
+ :type include_deleted: bool
+
+ :return: A document object.
+ :rtype: ServerDocument.
+ """
+ doc = self._get_doc(doc_id, check_for_conflicts=True)
+ if doc is None:
+ return None
+ if doc.is_tombstone() and not include_deleted:
+ return None
+ return doc
+
+ def get_all_docs(self, include_deleted=False):
+ """
+ Get the JSON content for all documents in the database.
+
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise deleted
+ documents will not be included in the results.
+ :type include_deleted: bool
+
+ :return: (generation, [ServerDocument])
+ The current generation of the database, followed by a list of all
+ the documents in the database.
+ :rtype: (int, [ServerDocument])
+ """
+ return self._database.get_all_docs(include_deleted)
+
+ def _put_doc(self, old_doc, doc):
+ """
+ Put the document in the backend database.
+
+ Note that C{old_doc} must have been fetched with the parameter
+ C{check_for_conflicts} equal to True, so we can properly update the
+ new document using the conflict information from the old one.
+
+ :param old_doc: The old document version.
+ :type old_doc: ServerDocument
+ :param doc: The document to be put.
+ :type doc: ServerDocument
+ """
+ last_transaction =\
+ self._database.save_document(old_doc, doc,
+ self._allocate_transaction_id())
+ if self.replica_uid + '_gen' in self.cache:
+ gen, trans = self.cache[self.replica_uid + '_gen']
+ gen += 1
+ trans = last_transaction
+ self.cache[self.replica_uid + '_gen'] = (gen, trans)
+
+ def put_doc(self, doc):
+ """
+ Update a document.
+
+ If the document currently has conflicts, put will fail.
+ If the database specifies a maximum document size and the document
+ exceeds it, put will fail and raise a DocumentTooBig exception.
+
+ :param doc: A Document with new content.
+ :return: new_doc_rev - The new revision identifier for the document.
+ The Document object will also be updated.
+
+ :raise InvalidDocId: Raised if the document's id is invalid.
+ :raise DocumentTooBig: Raised if the document size is too big.
+ :raise ConflictedDoc: Raised if the document has conflicts.
+ """
+ if doc.doc_id is None:
+ raise InvalidDocId()
+ self._check_doc_id(doc.doc_id)
+ self._check_doc_size(doc)
+ old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
+ if old_doc and old_doc.has_conflicts:
+ raise ConflictedDoc()
+ if old_doc and doc.rev is None and old_doc.is_tombstone():
+ new_rev = self._allocate_doc_rev(old_doc.rev)
+ else:
+ if old_doc is not None:
+ if old_doc.rev != doc.rev:
+ raise RevisionConflict()
+ else:
+ if doc.rev is not None:
+ raise RevisionConflict()
+ new_rev = self._allocate_doc_rev(doc.rev)
+ doc.rev = new_rev
+ self._put_doc(old_doc, doc)
+ return new_rev
+
+ def whats_changed(self, old_generation=0):
+ """
+ Return a list of documents that have changed since old_generation.
+
+ :param old_generation: The generation of the database in the old
+ state.
+ :type old_generation: int
+
+ :return: (generation, trans_id, [(doc_id, generation, trans_id),...])
+ The current generation of the database, its associated
+ transaction id, and a list of of changed documents since
+ old_generation, represented by tuples with for each document
+ its doc_id and the generation and transaction id corresponding
+ to the last intervening change and sorted by generation (old
+ changes first)
+ :rtype: (int, str, [(str, int, str)])
+ """
+ return self._database.whats_changed(old_generation)
+
+ def delete_doc(self, doc):
+ """
+ Mark a document as deleted.
+
+ Will abort if the current revision doesn't match doc.rev.
+ This will also set doc.content to None.
+
+ :param doc: The document to mark as deleted.
+ :type doc: ServerDocument.
+
+ :raise DocumentDoesNotExist: Raised if the document does not
+ exist.
+ :raise RevisionConflict: Raised if the revisions do not match.
+ :raise DocumentAlreadyDeleted: Raised if the document is
+ already deleted.
+ :raise ConflictedDoc: Raised if the doc has conflicts.
+ """
+ old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
+ if old_doc is None:
+ raise DocumentDoesNotExist
+ if old_doc.rev != doc.rev:
+ raise RevisionConflict()
+ if old_doc.is_tombstone():
+ raise DocumentAlreadyDeleted
+ if old_doc.has_conflicts:
+ raise ConflictedDoc()
+ new_rev = self._allocate_doc_rev(doc.rev)
+ doc.rev = new_rev
+ doc.make_tombstone()
+ self._put_doc(old_doc, doc)
+ return new_rev
+
+ def get_doc_conflicts(self, doc_id):
+ """
+ Get the conflicted versions of a document.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+
+ :return: A list of conflicted versions of the document.
+ :rtype: list
+ """
+ return self._database.get_doc_conflicts(doc_id)
+
+ def _get_replica_gen_and_trans_id(self, other_replica_uid):
+ """
+ Return the last known generation and transaction id for the other db
+ replica.
+
+ When you do a synchronization with another replica, the Database keeps
+ track of what generation the other database replica was at, and what
+ the associated transaction id was. This is used to determine what data
+ needs to be sent, and if two databases are claiming to be the same
+ replica.
+
+ :param other_replica_uid: The identifier for the other replica.
+ :type other_replica_uid: str
+
+ :return: A tuple containing the generation and transaction id we
+ encountered during synchronization. If we've never
+ synchronized with the replica, this is (0, '').
+ :rtype: (int, str)
+ """
+ if other_replica_uid in self.cache:
+ return self.cache[other_replica_uid]
+ return self._database.get_replica_gen_and_trans_id(other_replica_uid)
+
+ def _set_replica_gen_and_trans_id(self, other_replica_uid,
+ other_generation, other_transaction_id):
+ """
+ Set the last-known generation and transaction id for the other
+ database replica.
+
+ We have just performed some synchronization, and we want to track what
+ generation the other replica was at. See also
+ _get_replica_gen_and_trans_id.
+
+ :param other_replica_uid: The U1DB identifier for the other replica.
+ :type other_replica_uid: str
+ :param other_generation: The generation number for the other replica.
+ :type other_generation: int
+ :param other_transaction_id: The transaction id associated with the
+ generation.
+ :type other_transaction_id: str
+ """
+ if other_replica_uid is not None and other_generation is not None:
+ self.cache[other_replica_uid] = (other_generation,
+ other_transaction_id)
+ self._database.set_replica_gen_and_trans_id(other_replica_uid,
+ other_generation,
+ other_transaction_id)
+
+ def _do_set_replica_gen_and_trans_id(
+ self, other_replica_uid, other_generation, other_transaction_id):
+ """
+ _put_doc_if_newer from super class is calling it. So we declare this.
+
+ :param other_replica_uid: The U1DB identifier for the other replica.
+ :type other_replica_uid: str
+ :param other_generation: The generation number for the other replica.
+ :type other_generation: int
+ :param other_transaction_id: The transaction id associated with the
+ generation.
+ :type other_transaction_id: str
+ """
+ self._set_replica_gen_and_trans_id(other_replica_uid,
+ other_generation,
+ other_transaction_id)
+
+ def _force_doc_sync_conflict(self, doc):
+ """
+ Add a conflict and force a document put.
+
+ :param doc: The document to be put.
+ :type doc: ServerDocument
+ """
+ my_doc = self._get_doc(doc.doc_id)
+ self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev))
+ doc.add_conflict(self._factory(doc.doc_id, my_doc.rev,
+ my_doc.get_json()))
+ doc.has_conflicts = True
+ self._put_doc(my_doc, doc)
+
+ def resolve_doc(self, doc, conflicted_doc_revs):
+ """
+ Mark a document as no longer conflicted.
+
+ We take the list of revisions that the client knows about that it is
+ superseding. This may be a different list from the actual current
+ conflicts, in which case only those are removed as conflicted. This
+ may fail if the conflict list is significantly different from the
+ supplied information. (sync could have happened in the background from
+ the time you GET_DOC_CONFLICTS until the point where you RESOLVE)
+
+ :param doc: A Document with the new content to be inserted.
+ :type doc: ServerDocument
+ :param conflicted_doc_revs: A list of revisions that the new content
+ supersedes.
+ :type conflicted_doc_revs: [str]
+
+ :raise SoledadError: Raised by database on operation failure
+ """
+ cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
+ new_rev = self._ensure_maximal_rev(cur_doc.rev,
+ conflicted_doc_revs)
+ superseded_revs = set(conflicted_doc_revs)
+ doc.rev = new_rev
+ # this backend stores conflicts as properties of the documents, so we
+ # have to copy these conflicts over to the document being updated.
+ if cur_doc.rev in superseded_revs:
+ # the newer doc version will supersede the one in the database, so
+ # we copy conflicts before updating the backend.
+ doc.set_conflicts(cur_doc.get_conflicts()) # copy conflicts over.
+ doc.delete_conflicts(superseded_revs)
+ self._put_doc(cur_doc, doc)
+ else:
+ # the newer doc version does not supersede the one in the
+ # database, so we will add a conflict to the database and copy
+ # those over to the document the user has in her hands.
+ cur_doc.add_conflict(doc)
+ cur_doc.delete_conflicts(superseded_revs)
+ self._put_doc(cur_doc, cur_doc) # just update conflicts
+ # backend has been updated with current conflicts, now copy them
+ # to the current document.
+ doc.set_conflicts(cur_doc.get_conflicts())
+
+ def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen,
+ replica_trans_id='', number_of_docs=None,
+ doc_idx=None, sync_id=None):
+ """
+ Insert/update document into the database with a given revision.
+
+ This api is used during synchronization operations.
+
+ If a document would conflict and save_conflict is set to True, the
+ content will be selected as the 'current' content for doc.doc_id,
+ even though doc.rev doesn't supersede the currently stored revision.
+ The currently stored document will be added to the list of conflict
+ alternatives for the given doc_id.
+
+ This forces the new content to be 'current' so that we get convergence
+ after synchronizing, even if people don't resolve conflicts. Users can
+ then notice that their content is out of date, update it, and
+ synchronize again. (The alternative is that users could synchronize and
+ think the data has propagated, but their local copy looks fine, and the
+ remote copy is never updated again.)
+
+ :param doc: A document object
+ :type doc: ServerDocument
+ :param save_conflict: If this document is a conflict, do you want to
+ save it as a conflict, or just ignore it.
+ :type save_conflict: bool
+ :param replica_uid: A unique replica identifier.
+ :type replica_uid: str
+ :param replica_gen: The generation of the replica corresponding to the
+ this document. The replica arguments are optional,
+ but are used during synchronization.
+ :type replica_gen: int
+ :param replica_trans_id: The transaction_id associated with the
+ generation.
+ :type replica_trans_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+ :param doc_idx: The index of the current document being sent.
+ :type doc_idx: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
+
+ :return: (state, at_gen) - If we don't have doc_id already, or if
+ doc_rev supersedes the existing document revision, then the
+ content will be inserted, and state is 'inserted'. If
+ doc_rev is less than or equal to the existing revision, then
+ the put is ignored and state is respecitvely 'superseded' or
+ 'converged'. If doc_rev is not strictly superseded or
+ supersedes, then state is 'conflicted'. The document will not
+ be inserted if save_conflict is False. For 'inserted' or
+ 'converged', at_gen is the insertion/current generation.
+ :rtype: (str, int)
+ """
+ if not isinstance(doc, ServerDocument):
+ doc = self._factory(doc.doc_id, doc.rev, doc.get_json())
+ my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
+ if my_doc:
+ doc.set_conflicts(my_doc.get_conflicts())
+ return CommonBackend._put_doc_if_newer(self, doc, save_conflict,
+ replica_uid, replica_gen,
+ replica_trans_id)
+
+ def _put_and_update_indexes(self, cur_doc, doc):
+ self._put_doc(cur_doc, doc)
+
+ def get_docs(self, doc_ids, check_for_conflicts=True,
+ include_deleted=False):
+ """
+ Get the JSON content for many documents.
+
+ :param doc_ids: A list of document identifiers or None for all.
+ :type doc_ids: list
+ :param check_for_conflicts: If set to False, then the conflict check
+ will be skipped, and 'None' will be
+ returned instead of True/False.
+ :type check_for_conflicts: bool
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise deleted
+ documents will not be included in the results.
+ :return: iterable giving the Document object for each document id
+ in matching doc_ids order.
+ :rtype: iterable
+ """
+ return self._database.get_docs(doc_ids, check_for_conflicts,
+ include_deleted)
+
+ def _prune_conflicts(self, doc, doc_vcr):
+ """
+ Prune conflicts that are older then the current document's revision, or
+ whose content match to the current document's content.
+ Originally in u1db.CommonBackend
+
+ :param doc: The document to have conflicts pruned.
+ :type doc: ServerDocument
+ :param doc_vcr: A vector clock representing the current document's
+ revision.
+ :type doc_vcr: u1db.vectorclock.VectorClock
+ """
+ if doc.has_conflicts:
+ autoresolved = False
+ c_revs_to_prune = []
+ for c_doc in doc._conflicts:
+ c_vcr = vectorclock.VectorClockRev(c_doc.rev)
+ if doc_vcr.is_newer(c_vcr):
+ c_revs_to_prune.append(c_doc.rev)
+ elif doc.same_content_as(c_doc):
+ c_revs_to_prune.append(c_doc.rev)
+ doc_vcr.maximize(c_vcr)
+ autoresolved = True
+ if autoresolved:
+ doc_vcr.increment(self._replica_uid)
+ doc.rev = doc_vcr.as_str()
+ doc.delete_conflicts(c_revs_to_prune)
+
+
+class SoledadSyncTarget(CommonSyncTarget):
+
+ """
+ Functionality for using a SoledadBackend as a synchronization target.
+ """
+
+ def get_sync_info(self, source_replica_uid):
+ source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id(
+ source_replica_uid)
+ my_gen, my_trans_id = self._db._get_generation_info()
+ return (
+ self._db._replica_uid, my_gen, my_trans_id, source_gen,
+ source_trans_id)
+
+ def record_sync_info(self, source_replica_uid, source_replica_generation,
+ source_replica_transaction_id):
+ if self._trace_hook:
+ self._trace_hook('record_sync_info')
+ self._db._set_replica_gen_and_trans_id(
+ source_replica_uid, source_replica_generation,
+ source_replica_transaction_id)
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
deleted file mode 100644
index ccedef15..00000000
--- a/common/src/leap/soledad/common/couch.py
+++ /dev/null
@@ -1,1518 +0,0 @@
-# -*- coding: utf-8 -*-
-# couch.py
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
-"""A U1DB backend that uses CouchDB as its persistence layer."""
-
-
-import json
-import re
-import uuid
-import logging
-import binascii
-import time
-import sys
-
-
-from StringIO import StringIO
-from urlparse import urljoin
-from contextlib import contextmanager
-from multiprocessing.pool import ThreadPool
-
-
-from couchdb.client import Server, Database
-from couchdb.http import (
- ResourceConflict,
- ResourceNotFound,
- ServerError,
- Session,
- urljoin as couch_urljoin,
- Resource,
-)
-from u1db import vectorclock
-from u1db.errors import (
- DatabaseDoesNotExist,
- InvalidGeneration,
- RevisionConflict,
- InvalidDocId,
- ConflictedDoc,
- DocumentDoesNotExist,
- DocumentAlreadyDeleted,
- Unauthorized,
-)
-from u1db.backends import CommonBackend, CommonSyncTarget
-from u1db.remote import http_app
-from u1db.remote.server_state import ServerState
-
-
-from leap.soledad.common import ddocs, errors
-from leap.soledad.common.command import exec_validated_cmd
-from leap.soledad.common.document import SoledadDocument
-
-
-logger = logging.getLogger(__name__)
-
-
-COUCH_TIMEOUT = 120 # timeout for transfers between Soledad server and Couch
-
-
-def list_users_dbs(couch_url):
- """
- Retrieves a list with all databases that starts with 'user-' on CouchDB.
- Those databases belongs to users. So, the list will contain all the
- database names in the form of 'user-{uuid4}'.
-
- :param couch_url: The couch url with needed credentials
- :type couch_url: str
-
- :return: The list of all database names from users.
- :rtype: [str]
- """
- with couch_server(couch_url) as server:
- users = [dbname for dbname in server if dbname.startswith('user-')]
- return users
-
-
-class InvalidURLError(Exception):
-
- """
- Exception raised when Soledad encounters a malformed URL.
- """
-
-
-class CouchDocument(SoledadDocument):
-
- """
- This is the document used for maintaining the Couch backend.
-
- A CouchDocument can fetch and manipulate conflicts and also holds a
- reference to the couch document revision. This data is used to ensure an
- atomic and consistent update of the database.
- """
-
- def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False):
- """
- Container for handling a document that is stored in couch backend.
-
- :param doc_id: The unique document identifier.
- :type doc_id: str
- :param rev: The revision identifier of the document.
- :type rev: str
- :param json: The JSON string for this document.
- :type json: str
- :param has_conflicts: Boolean indicating if this document has conflicts
- :type has_conflicts: bool
- """
- SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts)
- self.couch_rev = None
- self.transactions = None
- self._conflicts = None
-
- def get_conflicts(self):
- """
- Get the conflicted versions of the document.
-
- :return: The conflicted versions of the document.
- :rtype: [CouchDocument]
- """
- return self._conflicts or []
-
- def set_conflicts(self, conflicts):
- """
- Set the conflicted versions of the document.
-
- :param conflicts: The conflicted versions of the document.
- :type conflicts: list
- """
- self._conflicts = conflicts
- self.has_conflicts = len(self._conflicts) > 0
-
- def add_conflict(self, doc):
- """
- Add a conflict to this document.
-
- :param doc: The conflicted version to be added.
- :type doc: CouchDocument
- """
- if self._conflicts is None:
- raise Exception("Fetch conflicts first!")
- self._conflicts.append(doc)
- self.has_conflicts = len(self._conflicts) > 0
-
- def delete_conflicts(self, conflict_revs):
- """
- Delete conflicted versions of this document.
-
- :param conflict_revs: The conflicted revisions to be deleted.
- :type conflict_revs: [str]
- """
- if self._conflicts is None:
- raise Exception("Fetch conflicts first!")
- self._conflicts = filter(
- lambda doc: doc.rev not in conflict_revs,
- self._conflicts)
- self.has_conflicts = len(self._conflicts) > 0
-
- def update(self, new_doc):
- # update info
- self.rev = new_doc.rev
- if new_doc.is_tombstone():
- self.is_tombstone()
- else:
- self.content = new_doc.content
- self.has_conflicts = new_doc.has_conflicts
-
- def prune_conflicts(self, doc_vcr, autoresolved_increment):
- """
- Prune conflicts that are older then the current document's revision, or
- whose content match to the current document's content.
- Originally in u1db.CommonBackend
-
- :param doc: The document to have conflicts pruned.
- :type doc: CouchDocument
- :param doc_vcr: A vector clock representing the current document's
- revision.
- :type doc_vcr: u1db.vectorclock.VectorClock
- """
- if self.has_conflicts:
- autoresolved = False
- c_revs_to_prune = []
- for c_doc in self._conflicts:
- c_vcr = vectorclock.VectorClockRev(c_doc.rev)
- if doc_vcr.is_newer(c_vcr):
- c_revs_to_prune.append(c_doc.rev)
- elif self.same_content_as(c_doc):
- c_revs_to_prune.append(c_doc.rev)
- doc_vcr.maximize(c_vcr)
- autoresolved = True
- if autoresolved:
- doc_vcr.increment(autoresolved_increment)
- self.rev = doc_vcr.as_str()
- self.delete_conflicts(c_revs_to_prune)
-
-
-# monkey-patch the u1db http app to use CouchDocument
-http_app.Document = CouchDocument
-
-
-def raise_missing_design_doc_error(exc, ddoc_path):
- """
- Raise an appropriate exception when catching a ResourceNotFound when
- accessing a design document.
-
- :param exc: The exception cought.
- :type exc: ResourceNotFound
- :param ddoc_path: A list representing the requested path.
- :type ddoc_path: list
-
- :raise MissingDesignDocError: Raised when tried to access a missing design
- document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access a
- missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a design
- document for an yet unknown reason.
- """
- path = "".join(ddoc_path)
- if exc.message[1] == 'missing':
- raise errors.MissingDesignDocError(path)
- elif exc.message[1] == 'missing function' or \
- exc.message[1].startswith('missing lists function'):
- raise errors.MissingDesignDocListFunctionError(path)
- elif exc.message[1] == 'missing_named_view':
- raise errors.MissingDesignDocNamedViewError(path)
- elif exc.message[1] == 'deleted':
- raise errors.MissingDesignDocDeletedError(path)
- # other errors are unknown for now
- raise errors.DesignDocUnknownError("%s: %s" % (path, str(exc.message)))
-
-
-def raise_server_error(exc, ddoc_path):
- """
- Raise an appropriate exception when catching a ServerError when
- accessing a design document.
-
- :param exc: The exception cought.
- :type exc: ResourceNotFound
- :param ddoc_path: A list representing the requested path.
- :type ddoc_path: list
-
- :raise MissingDesignDocListFunctionError: Raised when trying to access a
- missing list function on a
- design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a design
- document for an yet unknown reason.
- """
- path = "".join(ddoc_path)
- msg = exc.message[1][0]
- if msg == 'unnamed_error':
- raise errors.MissingDesignDocListFunctionError(path)
- elif msg == 'TypeError':
- if 'point is undefined' in exc.message[1][1]:
- raise errors.MissingDesignDocListFunctionError
- # other errors are unknown for now
- raise errors.DesignDocUnknownError(path)
-
-
-class MultipartWriter(object):
-
- """
- A multipart writer adapted from python-couchdb's one so we can PUT
- documents using couch's multipart PUT.
-
- This stripped down version does not allow for nested structures, and
- contains only the essential things we need to PUT SoledadDocuments to the
- couch backend.
- """
-
- CRLF = '\r\n'
-
- def __init__(self, fileobj, headers=None, boundary=None):
- """
- Initialize the multipart writer.
- """
- self.fileobj = fileobj
- if boundary is None:
- boundary = self._make_boundary()
- self._boundary = boundary
- self._build_headers('related', headers)
-
- def add(self, mimetype, content, headers={}):
- """
- Add a part to the multipart stream.
- """
- self.fileobj.write('--')
- self.fileobj.write(self._boundary)
- self.fileobj.write(self.CRLF)
- headers['Content-Type'] = mimetype
- self._write_headers(headers)
- if content:
- # XXX: throw an exception if a boundary appears in the content??
- self.fileobj.write(content)
- self.fileobj.write(self.CRLF)
-
- def close(self):
- """
- Close the multipart stream.
- """
- self.fileobj.write('--')
- self.fileobj.write(self._boundary)
- # be careful not to have anything after '--', otherwise old couch
- # versions (including bigcouch) will fail.
- self.fileobj.write('--')
-
- def _make_boundary(self):
- """
- Create a boundary to discern multi parts.
- """
- try:
- from uuid import uuid4
- return '==' + uuid4().hex + '=='
- except ImportError:
- from random import randrange
- token = randrange(sys.maxint)
- format = '%%0%dd' % len(repr(sys.maxint - 1))
- return '===============' + (format % token) + '=='
-
- def _write_headers(self, headers):
- """
- Write a part header in the buffer stream.
- """
- if headers:
- for name in sorted(headers.keys()):
- value = headers[name]
- self.fileobj.write(name)
- self.fileobj.write(': ')
- self.fileobj.write(value)
- self.fileobj.write(self.CRLF)
- self.fileobj.write(self.CRLF)
-
- def _build_headers(self, subtype, headers):
- """
- Build the main headers of the multipart stream.
-
- This is here so we can send headers separete from content using
- python-couchdb API.
- """
- self.headers = {}
- self.headers['Content-Type'] = 'multipart/%s; boundary="%s"' % \
- (subtype, self._boundary)
- if headers:
- for name in sorted(headers.keys()):
- value = headers[name]
- self.headers[name] = value
-
-
-@contextmanager
-def couch_server(url):
- """
- Provide a connection to a couch server and cleanup after use.
-
- For database creation and deletion we use an ephemeral connection to the
- couch server. That connection has to be properly closed, so we provide it
- as a context manager.
-
- :param url: The URL of the Couch server.
- :type url: str
- """
- session = Session(timeout=COUCH_TIMEOUT)
- server = Server(url=url, full_commit=False, session=session)
- yield server
-
-
-THREAD_POOL = ThreadPool(20)
-
-
-class CouchDatabase(CommonBackend):
-
- """
- A U1DB implementation that uses CouchDB as its persistence layer.
- """
-
- @classmethod
- def open_database(cls, url, create, replica_uid=None, ensure_ddocs=False,
- database_security=None):
- """
- Open a U1DB database using CouchDB as backend.
-
- :param url: the url of the database replica
- :type url: str
- :param create: should the replica be created if it does not exist?
- :type create: bool
- :param replica_uid: an optional unique replica identifier
- :type replica_uid: str
- :param ensure_ddocs: Ensure that the design docs exist on server.
- :type ensure_ddocs: bool
-
- :return: the database instance
- :rtype: CouchDatabase
- """
- # get database from url
- m = re.match('(^https?://[^/]+)/(.+)$', url)
- if not m:
- raise InvalidURLError
- url = m.group(1)
- dbname = m.group(2)
- with couch_server(url) as server:
- try:
- server[dbname]
- except ResourceNotFound:
- if not create:
- raise DatabaseDoesNotExist()
- server.create(dbname)
- return cls(
- url, dbname, replica_uid=replica_uid,
- ensure_ddocs=ensure_ddocs, database_security=database_security)
-
- def __init__(self, url, dbname, replica_uid=None, ensure_ddocs=False,
- database_security=None):
- """
- Create a new Couch data container.
-
- :param url: the url of the couch database
- :type url: str
- :param dbname: the database name
- :type dbname: str
- :param replica_uid: an optional unique replica identifier
- :type replica_uid: str
- :param ensure_ddocs: Ensure that the design docs exist on server.
- :type ensure_ddocs: bool
- """
- # save params
- self._url = url
- self._session = Session(timeout=COUCH_TIMEOUT)
- self._factory = CouchDocument
- self._real_replica_uid = None
- # configure couch
- self._dbname = dbname
- self._database = Database(
- urljoin(self._url, self._dbname),
- self._session)
- try:
- self._database.info()
- except ResourceNotFound:
- raise DatabaseDoesNotExist()
- if replica_uid is not None:
- self._set_replica_uid(replica_uid)
- if ensure_ddocs:
- self.ensure_ddocs_on_db()
- self.ensure_security_ddoc(database_security)
- self._cache = None
-
- @property
- def cache(self):
- if self._cache is not None:
- return self._cache
- else:
- return {}
-
- def init_caching(self, cache):
- """
- Start using cache by setting internal _cache attribute.
-
- :param cache: the cache instance, anything that behaves like a dict
- :type cache: dict
- """
- self._cache = cache
-
- def ensure_ddocs_on_db(self):
- """
- Ensure that the design documents used by the backend exist on the
- couch database.
- """
- for ddoc_name in ['docs', 'syncs', 'transactions']:
- try:
- self._database.resource('_design',
- ddoc_name, '_info').get_json()
- except ResourceNotFound:
- ddoc = json.loads(
- binascii.a2b_base64(
- getattr(ddocs, ddoc_name)))
- self._database.save(ddoc)
-
- def ensure_security_ddoc(self, security_config=None):
- """
- Make sure that only soledad user is able to access this database as
- an unprivileged member, meaning that administration access will
- be forbidden even inside an user database.
- The goal is to make sure that only the lowest access level is given
- to the unprivileged CouchDB user set on the server process.
- This is achieved by creating a _security design document, see:
- http://docs.couchdb.org/en/latest/api/database/security.html
-
- :param database_security: security configuration parsed from conf file
- :type cache: dict
- """
- security_config = security_config or {}
- security = self._database.resource.get_json('_security')[2]
- security['members'] = {'names': [], 'roles': []}
- security['members']['names'] = security_config.get('members',
- ['soledad'])
- security['members']['roles'] = security_config.get('members_roles', [])
- security['admins'] = {'names': [], 'roles': []}
- security['admins']['names'] = security_config.get('admins', [])
- security['admins']['roles'] = security_config.get('admins_roles', [])
- self._database.resource.put_json('_security', body=security)
-
- def get_sync_target(self):
- """
- Return a SyncTarget object, for another u1db to synchronize with.
-
- :return: The sync target.
- :rtype: CouchSyncTarget
- """
- return CouchSyncTarget(self)
-
- def delete_database(self):
- """
- Delete a U1DB CouchDB database.
- """
- with couch_server(self._url) as server:
- del(server[self._dbname])
-
- def close(self):
- """
- Release any resources associated with this database.
-
- :return: True if db was succesfully closed.
- :rtype: bool
- """
- self._url = None
- self._full_commit = None
- self._session = None
- self._database = None
- return True
-
- def __del__(self):
- """
- Close the database upon garbage collection.
- """
- self.close()
-
- def _set_replica_uid(self, replica_uid):
- """
- Force the replica uid to be set.
-
- :param replica_uid: The new replica uid.
- :type replica_uid: str
- """
- try:
- # set on existent config document
- doc = self._database['u1db_config']
- doc['replica_uid'] = replica_uid
- except ResourceNotFound:
- # or create the config document
- doc = {
- '_id': 'u1db_config',
- 'replica_uid': replica_uid,
- }
- self._database.save(doc)
- self._real_replica_uid = replica_uid
-
- def _get_replica_uid(self):
- """
- Get the replica uid.
-
- :return: The replica uid.
- :rtype: str
- """
- if self._real_replica_uid is not None:
- self.cache[self._url] = {'replica_uid': self._real_replica_uid}
- return self._real_replica_uid
- if self._url in self.cache:
- return self.cache[self._url]['replica_uid']
- try:
- # grab replica_uid from server
- doc = self._database['u1db_config']
- self.cache[self._url] = doc
- self._real_replica_uid = doc['replica_uid']
- return self._real_replica_uid
- except ResourceNotFound:
- # create a unique replica_uid
- self._real_replica_uid = uuid.uuid4().hex
- self._set_replica_uid(self._real_replica_uid)
- return self._real_replica_uid
-
- _replica_uid = property(_get_replica_uid, _set_replica_uid)
-
- replica_uid = property(_get_replica_uid)
-
- def _get_generation(self):
- """
- Return the current generation.
-
- :return: The current generation.
- :rtype: int
-
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
- """
- # query a couch list function
- if self.replica_uid + '_gen' in self.cache:
- return self.cache[self.replica_uid + '_gen']['generation']
- ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log']
- res = self._database.resource(*ddoc_path)
- try:
- response = res.get_json()
- self.cache[self.replica_uid + '_gen'] = response[2]
- return response[2]['generation']
- except ResourceNotFound as e:
- raise_missing_design_doc_error(e, ddoc_path)
- except ServerError as e:
- raise_server_error(e, ddoc_path)
-
- def _get_generation_info(self):
- """
- Return the current generation.
-
- :return: A tuple containing the current generation and transaction id.
- :rtype: (int, str)
-
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
- """
- if self.replica_uid + '_gen' in self.cache:
- response = self.cache[self.replica_uid + '_gen']
- return (response['generation'], response['transaction_id'])
- # query a couch list function
- ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log']
- res = self._database.resource(*ddoc_path)
- try:
- response = res.get_json()
- self.cache[self.replica_uid + '_gen'] = response[2]
- return (response[2]['generation'], response[2]['transaction_id'])
- except ResourceNotFound as e:
- raise_missing_design_doc_error(e, ddoc_path)
- except ServerError as e:
- raise_server_error(e, ddoc_path)
-
- def _get_trans_id_for_gen(self, generation):
- """
- Get the transaction id corresponding to a particular generation.
-
- :param generation: The generation for which to get the transaction id.
- :type generation: int
-
- :return: The transaction id for C{generation}.
- :rtype: str
-
- :raise InvalidGeneration: Raised when the generation does not exist.
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
- """
- if generation == 0:
- return ''
- # query a couch list function
- ddoc_path = [
- '_design', 'transactions', '_list', 'trans_id_for_gen', 'log'
- ]
- res = self._database.resource(*ddoc_path)
- try:
- response = res.get_json(gen=generation)
- if response[2] == {}:
- raise InvalidGeneration
- return response[2]['transaction_id']
- except ResourceNotFound as e:
- raise_missing_design_doc_error(e, ddoc_path)
- except ServerError as e:
- raise_server_error(e, ddoc_path)
-
- def _get_transaction_log(self):
- """
- This is only for the test suite, it is not part of the api.
-
- :return: The complete transaction log.
- :rtype: [(str, str)]
-
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
- """
- # query a couch view
- ddoc_path = ['_design', 'transactions', '_view', 'log']
- res = self._database.resource(*ddoc_path)
- try:
- response = res.get_json()
- return map(
- lambda row: (row['id'], row['value']),
- response[2]['rows'])
- except ResourceNotFound as e:
- raise_missing_design_doc_error(e, ddoc_path)
-
- def _get_doc(self, doc_id, check_for_conflicts=False):
- """
- Extract the document from storage.
-
- This can return None if the document doesn't exist.
-
- :param doc_id: The unique document identifier
- :type doc_id: str
- :param check_for_conflicts: If set to False, then the conflict check
- will be skipped.
- :type check_for_conflicts: bool
-
- :return: The document.
- :rtype: CouchDocument
- """
- # get document with all attachments (u1db content and eventual
- # conflicts)
- try:
- result = \
- self._database.resource(doc_id).get_json(
- attachments=True)[2]
- except ResourceNotFound:
- return None
- return self.__parse_doc_from_couch(result, doc_id, check_for_conflicts)
-
- def __parse_doc_from_couch(self, result, doc_id,
- check_for_conflicts=False):
- # restrict to u1db documents
- if 'u1db_rev' not in result:
- return None
- doc = self._factory(doc_id, result['u1db_rev'])
- # set contents or make tombstone
- if '_attachments' not in result \
- or 'u1db_content' not in result['_attachments']:
- doc.make_tombstone()
- else:
- doc.content = json.loads(
- binascii.a2b_base64(
- result['_attachments']['u1db_content']['data']))
- # determine if there are conflicts
- if check_for_conflicts \
- and '_attachments' in result \
- and 'u1db_conflicts' in result['_attachments']:
- doc.set_conflicts(
- self._build_conflicts(
- doc.doc_id,
- json.loads(binascii.a2b_base64(
- result['_attachments']['u1db_conflicts']['data']))))
- # store couch revision
- doc.couch_rev = result['_rev']
- # store transactions
- doc.transactions = result['u1db_transactions']
- return doc
-
- def get_doc(self, doc_id, include_deleted=False):
- """
- Get the JSON string for the given document.
-
- :param doc_id: The unique document identifier
- :type doc_id: str
- :param include_deleted: If set to True, deleted documents will be
- returned with empty content. Otherwise asking for a deleted
- document will return None.
- :type include_deleted: bool
-
- :return: A document object.
- :rtype: CouchDocument.
- """
- doc = self._get_doc(doc_id, check_for_conflicts=True)
- if doc is None:
- return None
- if doc.is_tombstone() and not include_deleted:
- return None
- return doc
-
- def get_all_docs(self, include_deleted=False):
- """
- Get the JSON content for all documents in the database.
-
- :param include_deleted: If set to True, deleted documents will be
- returned with empty content. Otherwise deleted
- documents will not be included in the results.
- :type include_deleted: bool
-
- :return: (generation, [CouchDocument])
- The current generation of the database, followed by a list of all
- the documents in the database.
- :rtype: (int, [CouchDocument])
- """
-
- generation = self._get_generation()
- results = list(self.get_docs(self._database,
- include_deleted=include_deleted))
- return (generation, results)
-
- def _put_doc(self, old_doc, doc):
- """
- Put the document in the Couch backend database.
-
- Note that C{old_doc} must have been fetched with the parameter
- C{check_for_conflicts} equal to True, so we can properly update the
- new document using the conflict information from the old one.
-
- :param old_doc: The old document version.
- :type old_doc: CouchDocument
- :param doc: The document to be put.
- :type doc: CouchDocument
-
- :raise RevisionConflict: Raised when trying to update a document but
- couch revisions mismatch.
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
- """
- attachments = {} # we save content and conflicts as attachments
- parts = [] # and we put it using couch's multipart PUT
- # save content as attachment
- if doc.is_tombstone() is False:
- content = doc.get_json()
- attachments['u1db_content'] = {
- 'follows': True,
- 'content_type': 'application/octet-stream',
- 'length': len(content),
- }
- parts.append(content)
- # save conflicts as attachment
- if doc.has_conflicts is True:
- conflicts = json.dumps(
- map(lambda cdoc: (cdoc.rev, cdoc.content),
- doc.get_conflicts()))
- attachments['u1db_conflicts'] = {
- 'follows': True,
- 'content_type': 'application/octet-stream',
- 'length': len(conflicts),
- }
- parts.append(conflicts)
- # store old transactions, if any
- transactions = old_doc.transactions[:] if old_doc is not None else []
- # create a new transaction id and timestamp it so the transaction log
- # is consistent when querying the database.
- transactions.append(
- # here we store milliseconds to keep consistent with javascript
- # Date.prototype.getTime() which was used before inside a couchdb
- # update handler.
- (int(time.time() * 1000),
- self._allocate_transaction_id()))
- # build the couch document
- couch_doc = {
- '_id': doc.doc_id,
- 'u1db_rev': doc.rev,
- 'u1db_transactions': transactions,
- '_attachments': attachments,
- }
- # if we are updating a doc we have to add the couch doc revision
- if old_doc is not None:
- couch_doc['_rev'] = old_doc.couch_rev
- # prepare the multipart PUT
- buf = StringIO()
- envelope = MultipartWriter(buf)
- envelope.add('application/json', json.dumps(couch_doc))
- for part in parts:
- envelope.add('application/octet-stream', part)
- envelope.close()
- # try to save and fail if there's a revision conflict
- try:
- resource = self._new_resource()
- resource.put_json(
- doc.doc_id, body=str(buf.getvalue()), headers=envelope.headers)
- except ResourceConflict:
- raise RevisionConflict()
- if self.replica_uid + '_gen' in self.cache:
- gen_info = self.cache[self.replica_uid + '_gen']
- gen_info['generation'] += 1
- gen_info['transaction_id'] = transactions[-1][1]
-
- def put_doc(self, doc):
- """
- Update a document.
-
- If the document currently has conflicts, put will fail.
- If the database specifies a maximum document size and the document
- exceeds it, put will fail and raise a DocumentTooBig exception.
-
- :param doc: A Document with new content.
- :return: new_doc_rev - The new revision identifier for the document.
- The Document object will also be updated.
-
- :raise InvalidDocId: Raised if the document's id is invalid.
- :raise DocumentTooBig: Raised if the document size is too big.
- :raise ConflictedDoc: Raised if the document has conflicts.
- """
- if doc.doc_id is None:
- raise InvalidDocId()
- self._check_doc_id(doc.doc_id)
- self._check_doc_size(doc)
- old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
- if old_doc and old_doc.has_conflicts:
- raise ConflictedDoc()
- if old_doc and doc.rev is None and old_doc.is_tombstone():
- new_rev = self._allocate_doc_rev(old_doc.rev)
- else:
- if old_doc is not None:
- if old_doc.rev != doc.rev:
- raise RevisionConflict()
- else:
- if doc.rev is not None:
- raise RevisionConflict()
- new_rev = self._allocate_doc_rev(doc.rev)
- doc.rev = new_rev
- self._put_doc(old_doc, doc)
- return new_rev
-
- def whats_changed(self, old_generation=0):
- """
- Return a list of documents that have changed since old_generation.
-
- :param old_generation: The generation of the database in the old
- state.
- :type old_generation: int
-
- :return: (generation, trans_id, [(doc_id, generation, trans_id),...])
- The current generation of the database, its associated
- transaction id, and a list of of changed documents since
- old_generation, represented by tuples with for each document
- its doc_id and the generation and transaction id corresponding
- to the last intervening change and sorted by generation (old
- changes first)
- :rtype: (int, str, [(str, int, str)])
-
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
- """
- # query a couch list function
- ddoc_path = [
- '_design', 'transactions', '_list', 'whats_changed', 'log'
- ]
- res = self._database.resource(*ddoc_path)
- try:
- response = res.get_json(old_gen=old_generation)
- results = map(
- lambda row:
- (row['generation'], row['doc_id'], row['transaction_id']),
- response[2]['transactions'])
- results.reverse()
- cur_gen = old_generation
- seen = set()
- changes = []
- newest_trans_id = ''
- for generation, doc_id, trans_id in results:
- if doc_id not in seen:
- changes.append((doc_id, generation, trans_id))
- seen.add(doc_id)
- if changes:
- cur_gen = changes[0][1] # max generation
- newest_trans_id = changes[0][2]
- changes.reverse()
- else:
- cur_gen, newest_trans_id = self._get_generation_info()
-
- return cur_gen, newest_trans_id, changes
- except ResourceNotFound as e:
- raise_missing_design_doc_error(e, ddoc_path)
- except ServerError as e:
- raise_server_error(e, ddoc_path)
-
- def delete_doc(self, doc):
- """
- Mark a document as deleted.
-
- Will abort if the current revision doesn't match doc.rev.
- This will also set doc.content to None.
-
- :param doc: The document to mark as deleted.
- :type doc: CouchDocument.
-
- :raise DocumentDoesNotExist: Raised if the document does not
- exist.
- :raise RevisionConflict: Raised if the revisions do not match.
- :raise DocumentAlreadyDeleted: Raised if the document is
- already deleted.
- :raise ConflictedDoc: Raised if the doc has conflicts.
- """
- old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
- if old_doc is None:
- raise DocumentDoesNotExist
- if old_doc.rev != doc.rev:
- raise RevisionConflict()
- if old_doc.is_tombstone():
- raise DocumentAlreadyDeleted
- if old_doc.has_conflicts:
- raise ConflictedDoc()
- new_rev = self._allocate_doc_rev(doc.rev)
- doc.rev = new_rev
- doc.make_tombstone()
- self._put_doc(old_doc, doc)
- return new_rev
-
- def _build_conflicts(self, doc_id, attached_conflicts):
- """
- Build the conflicted documents list from the conflicts attachment
- fetched from a couch document.
-
- :param attached_conflicts: The document's conflicts as fetched from a
- couch document attachment.
- :type attached_conflicts: dict
- """
- conflicts = []
- for doc_rev, content in attached_conflicts:
- doc = self._factory(doc_id, doc_rev)
- if content is None:
- doc.make_tombstone()
- else:
- doc.content = content
- conflicts.append(doc)
- return conflicts
-
- def get_doc_conflicts(self, doc_id, couch_rev=None):
- """
- Get the conflicted versions of a document.
-
- If the C{couch_rev} parameter is not None, conflicts for a specific
- document's couch revision are returned.
-
- :param couch_rev: The couch document revision.
- :type couch_rev: str
-
- :return: A list of conflicted versions of the document.
- :rtype: list
- """
- # request conflicts attachment from server
- params = {}
- conflicts = []
- if couch_rev is not None:
- params['rev'] = couch_rev # restric document's couch revision
- else:
- # TODO: move into resource logic!
- first_entry = self._get_doc(doc_id, check_for_conflicts=True)
- conflicts.append(first_entry)
- resource = self._database.resource(doc_id, 'u1db_conflicts')
- try:
- response = resource.get_json(**params)
- return conflicts + self._build_conflicts(
- doc_id, json.loads(response[2].read()))
- except ResourceNotFound:
- return []
-
- def _get_replica_gen_and_trans_id(self, other_replica_uid):
- """
- Return the last known generation and transaction id for the other db
- replica.
-
- When you do a synchronization with another replica, the Database keeps
- track of what generation the other database replica was at, and what
- the associated transaction id was. This is used to determine what data
- needs to be sent, and if two databases are claiming to be the same
- replica.
-
- :param other_replica_uid: The identifier for the other replica.
- :type other_replica_uid: str
-
- :return: A tuple containing the generation and transaction id we
- encountered during synchronization. If we've never
- synchronized with the replica, this is (0, '').
- :rtype: (int, str)
- """
- if other_replica_uid in self.cache:
- return self.cache[other_replica_uid]
-
- doc_id = 'u1db_sync_%s' % other_replica_uid
- try:
- doc = self._database[doc_id]
- except ResourceNotFound:
- doc = {
- '_id': doc_id,
- 'generation': 0,
- 'transaction_id': '',
- }
- self._database.save(doc)
- result = doc['generation'], doc['transaction_id']
- self.cache[other_replica_uid] = result
- return result
-
- def _set_replica_gen_and_trans_id(self, other_replica_uid,
- other_generation, other_transaction_id,
- number_of_docs=None, doc_idx=None,
- sync_id=None):
- """
- Set the last-known generation and transaction id for the other
- database replica.
-
- We have just performed some synchronization, and we want to track what
- generation the other replica was at. See also
- _get_replica_gen_and_trans_id.
-
- :param other_replica_uid: The U1DB identifier for the other replica.
- :type other_replica_uid: str
- :param other_generation: The generation number for the other replica.
- :type other_generation: int
- :param other_transaction_id: The transaction id associated with the
- generation.
- :type other_transaction_id: str
- :param number_of_docs: The total amount of documents sent on this sync
- session.
- :type number_of_docs: int
- :param doc_idx: The index of the current document being sent.
- :type doc_idx: int
- :param sync_id: The id of the current sync session.
- :type sync_id: str
- """
- if other_replica_uid is not None and other_generation is not None:
- self._do_set_replica_gen_and_trans_id(
- other_replica_uid, other_generation, other_transaction_id,
- number_of_docs=number_of_docs, doc_idx=doc_idx,
- sync_id=sync_id)
-
- def _do_set_replica_gen_and_trans_id(
- self, other_replica_uid, other_generation, other_transaction_id,
- number_of_docs=None, doc_idx=None, sync_id=None):
- """
- Set the last-known generation and transaction id for the other
- database replica.
-
- We have just performed some synchronization, and we want to track what
- generation the other replica was at. See also
- _get_replica_gen_and_trans_id.
-
- :param other_replica_uid: The U1DB identifier for the other replica.
- :type other_replica_uid: str
- :param other_generation: The generation number for the other replica.
- :type other_generation: int
- :param other_transaction_id: The transaction id associated with the
- generation.
- :type other_transaction_id: str
- :param number_of_docs: The total amount of documents sent on this sync
- session.
- :type number_of_docs: int
- :param doc_idx: The index of the current document being sent.
- :type doc_idx: int
- :param sync_id: The id of the current sync session.
- :type sync_id: str
- """
- self.cache[other_replica_uid] = (other_generation,
- other_transaction_id)
- doc_id = 'u1db_sync_%s' % other_replica_uid
- try:
- doc = self._database[doc_id]
- except ResourceNotFound:
- doc = {'_id': doc_id}
- doc['generation'] = other_generation
- doc['transaction_id'] = other_transaction_id
- self._database.save(doc)
-
- def _force_doc_sync_conflict(self, doc):
- """
- Add a conflict and force a document put.
-
- :param doc: The document to be put.
- :type doc: CouchDocument
- """
- my_doc = self._get_doc(doc.doc_id)
- self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev))
- doc.add_conflict(self._factory(doc.doc_id, my_doc.rev,
- my_doc.get_json()))
- doc.has_conflicts = True
- self._put_doc(my_doc, doc)
-
- def resolve_doc(self, doc, conflicted_doc_revs):
- """
- Mark a document as no longer conflicted.
-
- We take the list of revisions that the client knows about that it is
- superseding. This may be a different list from the actual current
- conflicts, in which case only those are removed as conflicted. This
- may fail if the conflict list is significantly different from the
- supplied information. (sync could have happened in the background from
- the time you GET_DOC_CONFLICTS until the point where you RESOLVE)
-
- :param doc: A Document with the new content to be inserted.
- :type doc: CouchDocument
- :param conflicted_doc_revs: A list of revisions that the new content
- supersedes.
- :type conflicted_doc_revs: [str]
-
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
- """
- cur_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
- new_rev = self._ensure_maximal_rev(cur_doc.rev,
- conflicted_doc_revs)
- superseded_revs = set(conflicted_doc_revs)
- doc.rev = new_rev
- # this backend stores conflicts as properties of the documents, so we
- # have to copy these conflicts over to the document being updated.
- if cur_doc.rev in superseded_revs:
- # the newer doc version will supersede the one in the database, so
- # we copy conflicts before updating the backend.
- doc.set_conflicts(cur_doc.get_conflicts()) # copy conflicts over.
- doc.delete_conflicts(superseded_revs)
- self._put_doc(cur_doc, doc)
- else:
- # the newer doc version does not supersede the one in the
- # database, so we will add a conflict to the database and copy
- # those over to the document the user has in her hands.
- cur_doc.add_conflict(doc)
- cur_doc.delete_conflicts(superseded_revs)
- self._put_doc(cur_doc, cur_doc) # just update conflicts
- # backend has been updated with current conflicts, now copy them
- # to the current document.
- doc.set_conflicts(cur_doc.get_conflicts())
-
- def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen,
- replica_trans_id='', number_of_docs=None,
- doc_idx=None, sync_id=None):
- """
- Insert/update document into the database with a given revision.
-
- This api is used during synchronization operations.
-
- If a document would conflict and save_conflict is set to True, the
- content will be selected as the 'current' content for doc.doc_id,
- even though doc.rev doesn't supersede the currently stored revision.
- The currently stored document will be added to the list of conflict
- alternatives for the given doc_id.
-
- This forces the new content to be 'current' so that we get convergence
- after synchronizing, even if people don't resolve conflicts. Users can
- then notice that their content is out of date, update it, and
- synchronize again. (The alternative is that users could synchronize and
- think the data has propagated, but their local copy looks fine, and the
- remote copy is never updated again.)
-
- :param doc: A document object
- :type doc: CouchDocument
- :param save_conflict: If this document is a conflict, do you want to
- save it as a conflict, or just ignore it.
- :type save_conflict: bool
- :param replica_uid: A unique replica identifier.
- :type replica_uid: str
- :param replica_gen: The generation of the replica corresponding to the
- this document. The replica arguments are optional,
- but are used during synchronization.
- :type replica_gen: int
- :param replica_trans_id: The transaction_id associated with the
- generation.
- :type replica_trans_id: str
- :param number_of_docs: The total amount of documents sent on this sync
- session.
- :type number_of_docs: int
- :param doc_idx: The index of the current document being sent.
- :type doc_idx: int
- :param sync_id: The id of the current sync session.
- :type sync_id: str
-
- :return: (state, at_gen) - If we don't have doc_id already, or if
- doc_rev supersedes the existing document revision, then the
- content will be inserted, and state is 'inserted'. If
- doc_rev is less than or equal to the existing revision, then
- the put is ignored and state is respecitvely 'superseded' or
- 'converged'. If doc_rev is not strictly superseded or
- supersedes, then state is 'conflicted'. The document will not
- be inserted if save_conflict is False. For 'inserted' or
- 'converged', at_gen is the insertion/current generation.
- :rtype: (str, int)
- """
- if not isinstance(doc, CouchDocument):
- doc = self._factory(doc.doc_id, doc.rev, doc.get_json())
- my_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
- if my_doc:
- doc.set_conflicts(my_doc.get_conflicts())
- return CommonBackend._put_doc_if_newer(self, doc, save_conflict,
- replica_uid, replica_gen,
- replica_trans_id)
-
- def _put_and_update_indexes(self, cur_doc, doc):
- self._put_doc(cur_doc, doc)
-
- def get_docs(self, doc_ids, check_for_conflicts=True,
- include_deleted=False):
- """
- Get the JSON content for many documents.
-
- :param doc_ids: A list of document identifiers or None for all.
- :type doc_ids: list
- :param check_for_conflicts: If set to False, then the conflict check
- will be skipped, and 'None' will be
- returned instead of True/False.
- :type check_for_conflicts: bool
- :param include_deleted: If set to True, deleted documents will be
- returned with empty content. Otherwise deleted
- documents will not be included in the results.
- :return: iterable giving the Document object for each document id
- in matching doc_ids order.
- :rtype: iterable
- """
- # Workaround for:
- #
- # http://bugs.python.org/issue7980
- # https://leap.se/code/issues/5449
- #
- # python-couchdb uses time.strptime, which is not thread safe. In
- # order to avoid the problem described on the issues above, we preload
- # strptime here by evaluating the conversion of an arbitrary date.
- # This will not be needed when/if we switch from python-couchdb to
- # paisley.
- time.strptime('Mar 8 1917', '%b %d %Y')
- get_one = lambda doc_id: self._get_doc(doc_id, check_for_conflicts)
- docs = [THREAD_POOL.apply_async(get_one, [doc_id])
- for doc_id in doc_ids]
- for doc in docs:
- doc = doc.get()
- if not doc or not include_deleted and doc.is_tombstone():
- continue
- yield doc
-
- def _prune_conflicts(self, doc, doc_vcr):
- """
- Overrides original method, but it is implemented elsewhere for
- simplicity.
- """
- doc.prune_conflicts(doc_vcr, self._replica_uid)
-
- def _new_resource(self, *path):
- """
- Return a new resource for accessing a couch database.
-
- :return: A resource for accessing a couch database.
- :rtype: couchdb.http.Resource
- """
- # Workaround for: https://leap.se/code/issues/5448
- url = couch_urljoin(self._database.resource.url, *path)
- resource = Resource(url, Session(timeout=COUCH_TIMEOUT))
- resource.credentials = self._database.resource.credentials
- resource.headers = self._database.resource.headers.copy()
- return resource
-
-
-class CouchSyncTarget(CommonSyncTarget):
-
- """
- Functionality for using a CouchDatabase as a synchronization target.
- """
-
- def get_sync_info(self, source_replica_uid):
- source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id(
- source_replica_uid)
- my_gen, my_trans_id = self._db._get_generation_info()
- return (
- self._db._replica_uid, my_gen, my_trans_id, source_gen,
- source_trans_id)
-
- def record_sync_info(self, source_replica_uid, source_replica_generation,
- source_replica_transaction_id):
- if self._trace_hook:
- self._trace_hook('record_sync_info')
- self._db._set_replica_gen_and_trans_id(
- source_replica_uid, source_replica_generation,
- source_replica_transaction_id)
-
-
-def is_db_name_valid(name):
- """
- Validate a user database using a regular expression.
-
- :param name: database name.
- :type name: str
-
- :return: boolean for name vailidity
- :rtype: bool
- """
- db_name_regex = "^user-[a-f0-9]+$"
- return re.match(db_name_regex, name) is not None
-
-
-class CouchServerState(ServerState):
-
- """
- Inteface of the WSGI server with the CouchDB backend.
- """
-
- def __init__(self, couch_url, create_cmd=None):
- """
- Initialize the couch server state.
-
- :param couch_url: The URL for the couch database.
- :type couch_url: str
- """
- self.couch_url = couch_url
- self.create_cmd = create_cmd
-
- def open_database(self, dbname):
- """
- Open a couch database.
-
- :param dbname: The name of the database to open.
- :type dbname: str
-
- :return: The CouchDatabase object.
- :rtype: CouchDatabase
- """
- db = CouchDatabase(
- self.couch_url,
- dbname,
- ensure_ddocs=False)
- return db
-
- def ensure_database(self, dbname):
- """
- Ensure couch database exists.
-
- :param dbname: The name of the database to ensure.
- :type dbname: str
-
- :raise Unauthorized: If disabled or other error was raised.
-
- :return: The CouchDatabase object and its replica_uid.
- :rtype: (CouchDatabase, str)
- """
- if not self.create_cmd:
- raise Unauthorized()
- else:
- code, out = exec_validated_cmd(self.create_cmd, dbname,
- validator=is_db_name_valid)
- if code is not 0:
- logger.error("""
- Error while creating database (%s) with (%s) command.
- Output: %s
- Exit code: %d
- """ % (dbname, self.create_cmd, out, code))
- raise Unauthorized()
- db = self.open_database(dbname)
- return db, db.replica_uid
-
- def delete_database(self, dbname):
- """
- Delete couch database.
-
- :param dbname: The name of the database to delete.
- :type dbname: str
-
- :raise Unauthorized: Always, because Soledad server is not allowed to
- delete databases.
- """
- raise Unauthorized()
diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py
new file mode 100644
index 00000000..bd8b08b7
--- /dev/null
+++ b/common/src/leap/soledad/common/couch/__init__.py
@@ -0,0 +1,722 @@
+# -*- coding: utf-8 -*-
+# __init__.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""A U1DB backend that uses CouchDB as its persistence layer."""
+
+
+import json
+import re
+import uuid
+import binascii
+import time
+
+
+from StringIO import StringIO
+from urlparse import urljoin
+from contextlib import contextmanager
+from multiprocessing.pool import ThreadPool
+
+
+from couchdb.client import Server, Database
+from couchdb.multipart import MultipartWriter
+from couchdb.http import (
+ ResourceConflict,
+ ResourceNotFound,
+ ServerError,
+ Session,
+ urljoin as couch_urljoin,
+ Resource,
+)
+from u1db.errors import (
+ DatabaseDoesNotExist,
+ InvalidGeneration,
+ RevisionConflict,
+)
+from u1db.remote import http_app
+
+
+from leap.soledad.common import ddocs
+from .errors import raise_server_error
+from .errors import raise_missing_design_doc_error
+from leap.soledad.common.errors import InvalidURLError
+from leap.soledad.common.document import ServerDocument
+from leap.soledad.common.backend import SoledadBackend
+
+
+COUCH_TIMEOUT = 120 # timeout for transfers between Soledad server and Couch
+
+
+def list_users_dbs(couch_url):
+ """
+ Retrieves a list with all databases that starts with 'user-' on CouchDB.
+ Those databases belongs to users. So, the list will contain all the
+ database names in the form of 'user-{uuid4}'.
+
+ :param couch_url: The couch url with needed credentials
+ :type couch_url: str
+
+ :return: The list of all database names from users.
+ :rtype: [str]
+ """
+ with couch_server(couch_url) as server:
+ users = [dbname for dbname in server if dbname.startswith('user-')]
+ return users
+
+
+# monkey-patch the u1db http app to use ServerDocument
+http_app.Document = ServerDocument
+
+
+@contextmanager
+def couch_server(url):
+ """
+ Provide a connection to a couch server and cleanup after use.
+
+ For database creation and deletion we use an ephemeral connection to the
+ couch server. That connection has to be properly closed, so we provide it
+ as a context manager.
+
+ :param url: The URL of the Couch server.
+ :type url: str
+ """
+ session = Session(timeout=COUCH_TIMEOUT)
+ server = Server(url=url, full_commit=False, session=session)
+ yield server
+
+
+THREAD_POOL = ThreadPool(20)
+
+
+class CouchDatabase(object):
+ """
+ Holds CouchDB related code.
+ This class gives methods to encapsulate database operations and hide
+ CouchDB details from backend code.
+ """
+
+ @classmethod
+ def open_database(cls, url, create, ensure_ddocs=False, replica_uid=None,
+ database_security=None):
+ """
+ Open a U1DB database using CouchDB as backend.
+
+ :param url: the url of the database replica
+ :type url: str
+ :param create: should the replica be created if it does not exist?
+ :type create: bool
+ :param replica_uid: an optional unique replica identifier
+ :type replica_uid: str
+ :param ensure_ddocs: Ensure that the design docs exist on server.
+ :type ensure_ddocs: bool
+
+ :return: the database instance
+ :rtype: SoledadBackend
+
+ :raise DatabaseDoesNotExist: Raised if database does not exist.
+ """
+ # get database from url
+ m = re.match('(^https?://[^/]+)/(.+)$', url)
+ if not m:
+ raise InvalidURLError
+ url = m.group(1)
+ dbname = m.group(2)
+ with couch_server(url) as server:
+ if dbname not in server:
+ if create:
+ server.create(dbname)
+ else:
+ raise DatabaseDoesNotExist()
+ db = cls(url,
+ dbname, ensure_ddocs=ensure_ddocs,
+ database_security=database_security)
+ return SoledadBackend(
+ db, replica_uid=replica_uid)
+
+ def __init__(self, url, dbname, ensure_ddocs=True,
+ database_security=None):
+ self._session = Session(timeout=COUCH_TIMEOUT)
+ self._url = url
+ self._dbname = dbname
+ self._database = self.get_couch_database(url, dbname)
+ if ensure_ddocs:
+ self.ensure_ddocs_on_db()
+ self.ensure_security_ddoc(database_security)
+
+ def get_couch_database(self, url, dbname):
+ """
+ Generate a couchdb.Database instance given a url and dbname.
+
+ :param url: CouchDB's server url with credentials
+ :type url: str
+ :param dbname: Database name
+ :type dbname: str
+
+ :return: couch library database instance
+ :rtype: couchdb.Database
+
+ :raise DatabaseDoesNotExist: Raised if database does not exist.
+ """
+ try:
+ return Database(
+ urljoin(url, dbname),
+ self._session)
+ except ResourceNotFound:
+ raise DatabaseDoesNotExist()
+
+ def ensure_ddocs_on_db(self):
+ """
+ Ensure that the design documents used by the backend exist on the
+ couch database.
+ """
+ for ddoc_name in ['docs', 'syncs', 'transactions']:
+ try:
+ self.json_from_resource(['_design', ddoc_name, '_info'],
+ check_missing_ddoc=False)
+ except ResourceNotFound:
+ ddoc = json.loads(
+ binascii.a2b_base64(
+ getattr(ddocs, ddoc_name)))
+ self._database.save(ddoc)
+
+ def ensure_security_ddoc(self, security_config=None):
+ """
+ Make sure that only soledad user is able to access this database as
+ an unprivileged member, meaning that administration access will
+ be forbidden even inside an user database.
+ The goal is to make sure that only the lowest access level is given
+ to the unprivileged CouchDB user set on the server process.
+ This is achieved by creating a _security design document, see:
+ http://docs.couchdb.org/en/latest/api/database/security.html
+
+ :param database_security: security configuration parsed from conf file
+ :type cache: dict
+ """
+ security_config = security_config or {}
+ security = self._database.resource.get_json('_security')[2]
+ security['members'] = {'names': [], 'roles': []}
+ security['members']['names'] = security_config.get('members',
+ ['soledad'])
+ security['members']['roles'] = security_config.get('members_roles', [])
+ security['admins'] = {'names': [], 'roles': []}
+ security['admins']['names'] = security_config.get('admins', [])
+ security['admins']['roles'] = security_config.get('admins_roles', [])
+ self._database.resource.put_json('_security', body=security)
+
+ def delete_database(self):
+ """
+ Delete a U1DB CouchDB database.
+ """
+ with couch_server(self._url) as server:
+ del(server[self._dbname])
+
+ def set_replica_uid(self, replica_uid):
+ """
+ Force the replica uid to be set.
+
+ :param replica_uid: The new replica uid.
+ :type replica_uid: str
+ """
+ try:
+ # set on existent config document
+ doc = self._database['u1db_config']
+ doc['replica_uid'] = replica_uid
+ except ResourceNotFound:
+ # or create the config document
+ doc = {
+ '_id': 'u1db_config',
+ 'replica_uid': replica_uid,
+ }
+ self._database.save(doc)
+
+ def get_replica_uid(self):
+ """
+ Get the replica uid.
+
+ :return: The replica uid.
+ :rtype: str
+ """
+ try:
+ # grab replica_uid from server
+ doc = self._database['u1db_config']
+ replica_uid = doc['replica_uid']
+ return replica_uid
+ except ResourceNotFound:
+ # create a unique replica_uid
+ replica_uid = uuid.uuid4().hex
+ self.set_replica_uid(replica_uid)
+ return replica_uid
+
+ def close(self):
+ self._database = None
+
+ def get_all_docs(self, include_deleted=False):
+ """
+ Get the JSON content for all documents in the database.
+
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise deleted
+ documents will not be included in the results.
+ :type include_deleted: bool
+
+ :return: (generation, [ServerDocument])
+ The current generation of the database, followed by a list of all
+ the documents in the database.
+ :rtype: (int, [ServerDocument])
+ """
+
+ generation, _ = self.get_generation_info()
+ results = list(self.get_docs(self._database,
+ include_deleted=include_deleted))
+ return (generation, results)
+
+ def get_docs(self, doc_ids, check_for_conflicts=True,
+ include_deleted=False):
+ """
+ Get the JSON content for many documents.
+
+ :param doc_ids: A list of document identifiers or None for all.
+ :type doc_ids: list
+ :param check_for_conflicts: If set to False, then the conflict check
+ will be skipped, and 'None' will be
+ returned instead of True/False.
+ :type check_for_conflicts: bool
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise deleted
+ documents will not be included in the results.
+ :return: iterable giving the Document object for each document id
+ in matching doc_ids order.
+ :rtype: iterable
+ """
+ # Workaround for:
+ #
+ # http://bugs.python.org/issue7980
+ # https://leap.se/code/issues/5449
+ #
+ # python-couchdb uses time.strptime, which is not thread safe. In
+ # order to avoid the problem described on the issues above, we preload
+ # strptime here by evaluating the conversion of an arbitrary date.
+ # This will not be needed when/if we switch from python-couchdb to
+ # paisley.
+ time.strptime('Mar 8 1917', '%b %d %Y')
+ get_one = lambda doc_id: self.get_doc(doc_id, check_for_conflicts)
+ docs = [THREAD_POOL.apply_async(get_one, [doc_id])
+ for doc_id in doc_ids]
+ for doc in docs:
+ doc = doc.get()
+ if not doc or not include_deleted and doc.is_tombstone():
+ continue
+ yield doc
+
+ def get_doc(self, doc_id, check_for_conflicts=False):
+ """
+ Extract the document from storage.
+
+ This can return None if the document doesn't exist.
+
+ :param doc_id: The unique document identifier
+ :type doc_id: str
+ :param check_for_conflicts: If set to False, then the conflict check
+ will be skipped.
+ :type check_for_conflicts: bool
+
+ :return: The document.
+ :rtype: ServerDocument
+ """
+ # get document with all attachments (u1db content and eventual
+ # conflicts)
+ if doc_id not in self._database:
+ return None
+ result = self.json_from_resource([doc_id], attachments=True)
+ return self.__parse_doc_from_couch(result, doc_id, check_for_conflicts)
+
+ def __parse_doc_from_couch(self, result, doc_id,
+ check_for_conflicts=False):
+ # restrict to u1db documents
+ if 'u1db_rev' not in result:
+ return None
+ doc = ServerDocument(doc_id, result['u1db_rev'])
+ # set contents or make tombstone
+ if '_attachments' not in result \
+ or 'u1db_content' not in result['_attachments']:
+ doc.make_tombstone()
+ else:
+ doc.content = json.loads(
+ binascii.a2b_base64(
+ result['_attachments']['u1db_content']['data']))
+ # determine if there are conflicts
+ if check_for_conflicts \
+ and '_attachments' in result \
+ and 'u1db_conflicts' in result['_attachments']:
+ doc.set_conflicts(
+ self._build_conflicts(
+ doc.doc_id,
+ json.loads(binascii.a2b_base64(
+ result['_attachments']['u1db_conflicts']['data']))))
+ # store couch revision
+ doc.couch_rev = result['_rev']
+ # store transactions
+ doc.transactions = result['u1db_transactions']
+ return doc
+
+ def _build_conflicts(self, doc_id, attached_conflicts):
+ """
+ Build the conflicted documents list from the conflicts attachment
+ fetched from a couch document.
+
+ :param attached_conflicts: The document's conflicts as fetched from a
+ couch document attachment.
+ :type attached_conflicts: dict
+ """
+ conflicts = []
+ for doc_rev, content in attached_conflicts:
+ doc = ServerDocument(doc_id, doc_rev)
+ if content is None:
+ doc.make_tombstone()
+ else:
+ doc.content = content
+ conflicts.append(doc)
+ return conflicts
+
+ def get_trans_id_for_gen(self, generation):
+ """
+ Get the transaction id corresponding to a particular generation.
+
+ :param generation: The generation for which to get the transaction id.
+ :type generation: int
+
+ :return: The transaction id for C{generation}.
+ :rtype: str
+
+ :raise InvalidGeneration: Raised when the generation does not exist.
+ """
+ if generation == 0:
+ return ''
+ # query a couch list function
+ ddoc_path = [
+ '_design', 'transactions', '_list', 'trans_id_for_gen', 'log'
+ ]
+ response = self.json_from_resource(ddoc_path, gen=generation)
+ if response == {}:
+ raise InvalidGeneration
+ return response['transaction_id']
+
+ def get_replica_gen_and_trans_id(self, other_replica_uid):
+ """
+ Return the last known generation and transaction id for the other db
+ replica.
+
+ When you do a synchronization with another replica, the Database keeps
+ track of what generation the other database replica was at, and what
+ the associated transaction id was. This is used to determine what data
+ needs to be sent, and if two databases are claiming to be the same
+ replica.
+
+ :param other_replica_uid: The identifier for the other replica.
+ :type other_replica_uid: str
+
+ :return: A tuple containing the generation and transaction id we
+ encountered during synchronization. If we've never
+ synchronized with the replica, this is (0, '').
+ :rtype: (int, str)
+ """
+ doc_id = 'u1db_sync_%s' % other_replica_uid
+ try:
+ doc = self._database[doc_id]
+ except ResourceNotFound:
+ doc = {
+ '_id': doc_id,
+ 'generation': 0,
+ 'transaction_id': '',
+ }
+ self._database.save(doc)
+ result = doc['generation'], doc['transaction_id']
+ return result
+
+ def get_doc_conflicts(self, doc_id, couch_rev=None):
+ """
+ Get the conflicted versions of a document.
+
+ If the C{couch_rev} parameter is not None, conflicts for a specific
+ document's couch revision are returned.
+
+ :param couch_rev: The couch document revision.
+ :type couch_rev: str
+
+ :return: A list of conflicted versions of the document.
+ :rtype: list
+ """
+ # request conflicts attachment from server
+ params = {}
+ conflicts = []
+ if couch_rev is not None:
+ params['rev'] = couch_rev # restric document's couch revision
+ else:
+ # TODO: move into resource logic!
+ first_entry = self.get_doc(doc_id, check_for_conflicts=True)
+ conflicts.append(first_entry)
+
+ try:
+ response = self.json_from_resource([doc_id, 'u1db_conflicts'],
+ check_missing_ddoc=False,
+ **params)
+ return conflicts + self._build_conflicts(
+ doc_id, json.loads(response.read()))
+ except ResourceNotFound:
+ return []
+
+ def set_replica_gen_and_trans_id(
+ self, other_replica_uid, other_generation, other_transaction_id):
+ """
+ Set the last-known generation and transaction id for the other
+ database replica.
+
+ We have just performed some synchronization, and we want to track what
+ generation the other replica was at. See also
+ get_replica_gen_and_trans_id.
+
+ :param other_replica_uid: The U1DB identifier for the other replica.
+ :type other_replica_uid: str
+ :param other_generation: The generation number for the other replica.
+ :type other_generation: int
+ :param other_transaction_id: The transaction id associated with the
+ generation.
+ :type other_transaction_id: str
+ """
+ doc_id = 'u1db_sync_%s' % other_replica_uid
+ try:
+ doc = self._database[doc_id]
+ except ResourceNotFound:
+ doc = {'_id': doc_id}
+ doc['generation'] = other_generation
+ doc['transaction_id'] = other_transaction_id
+ self._database.save(doc)
+
+ def get_transaction_log(self):
+ """
+ This is only for the test suite, it is not part of the api.
+
+ :return: The complete transaction log.
+ :rtype: [(str, str)]
+ """
+ # query a couch view
+ ddoc_path = ['_design', 'transactions', '_view', 'log']
+ response = self.json_from_resource(ddoc_path)
+ return map(
+ lambda row: (row['id'], row['value']),
+ response['rows'])
+
+ def whats_changed(self, old_generation=0):
+ """
+ Return a list of documents that have changed since old_generation.
+
+ :param old_generation: The generation of the database in the old
+ state.
+ :type old_generation: int
+
+ :return: (generation, trans_id, [(doc_id, generation, trans_id),...])
+ The current generation of the database, its associated
+ transaction id, and a list of of changed documents since
+ old_generation, represented by tuples with for each document
+ its doc_id and the generation and transaction id corresponding
+ to the last intervening change and sorted by generation (old
+ changes first)
+ :rtype: (int, str, [(str, int, str)])
+ """
+ # query a couch list function
+ ddoc_path = [
+ '_design', 'transactions', '_list', 'whats_changed', 'log'
+ ]
+ response = self.json_from_resource(ddoc_path, old_gen=old_generation)
+ results = map(
+ lambda row:
+ (row['generation'], row['doc_id'], row['transaction_id']),
+ response['transactions'])
+ results.reverse()
+ cur_gen = old_generation
+ seen = set()
+ changes = []
+ newest_trans_id = ''
+ for generation, doc_id, trans_id in results:
+ if doc_id not in seen:
+ changes.append((doc_id, generation, trans_id))
+ seen.add(doc_id)
+ if changes:
+ cur_gen = changes[0][1] # max generation
+ newest_trans_id = changes[0][2]
+ changes.reverse()
+ else:
+ cur_gen, newest_trans_id = self.get_generation_info()
+
+ return cur_gen, newest_trans_id, changes
+
+ def get_generation_info(self):
+ """
+ Return the current generation.
+
+ :return: A tuple containing the current generation and transaction id.
+ :rtype: (int, str)
+ """
+ # query a couch list function
+ ddoc_path = ['_design', 'transactions', '_list', 'generation', 'log']
+ info = self.json_from_resource(ddoc_path)
+ return (info['generation'], info['transaction_id'])
+
+ def json_from_resource(self, ddoc_path, check_missing_ddoc=True,
+ **kwargs):
+ """
+ Get a resource from it's path and gets a doc's JSON using provided
+ parameters, also checking for missing design docs by default.
+
+ :param ddoc_path: The path to resource.
+ :type ddoc_path: [str]
+ :param check_missing_ddoc: Raises info on what design doc is missing.
+ :type check_missin_ddoc: bool
+
+ :return: The request's data parsed from JSON to a dict.
+ :rtype: dict
+
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
+ """
+ if ddoc_path is not None:
+ resource = self._database.resource(*ddoc_path)
+ else:
+ resource = self._database.resource()
+ try:
+ _, _, data = resource.get_json(**kwargs)
+ return data
+ except ResourceNotFound as e:
+ if check_missing_ddoc:
+ raise_missing_design_doc_error(e, ddoc_path)
+ else:
+ raise e
+ except ServerError as e:
+ raise_server_error(e, ddoc_path)
+
+ def save_document(self, old_doc, doc, transaction_id):
+ """
+ Put the document in the Couch backend database.
+
+ Note that C{old_doc} must have been fetched with the parameter
+ C{check_for_conflicts} equal to True, so we can properly update the
+ new document using the conflict information from the old one.
+
+ :param old_doc: The old document version.
+ :type old_doc: ServerDocument
+ :param doc: The document to be put.
+ :type doc: ServerDocument
+
+ :raise RevisionConflict: Raised when trying to update a document but
+ couch revisions mismatch.
+ :raise MissingDesignDocError: Raised when tried to access a missing
+ design document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access
+ a missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a
+ design document for an yet
+ unknown reason.
+ """
+ attachments = {} # we save content and conflicts as attachments
+ parts = [] # and we put it using couch's multipart PUT
+ # save content as attachment
+ if doc.is_tombstone() is False:
+ content = doc.get_json()
+ attachments['u1db_content'] = {
+ 'follows': True,
+ 'content_type': 'application/octet-stream',
+ 'length': len(content),
+ }
+ parts.append(content)
+ # save conflicts as attachment
+ if doc.has_conflicts is True:
+ conflicts = json.dumps(
+ map(lambda cdoc: (cdoc.rev, cdoc.content),
+ doc.get_conflicts()))
+ attachments['u1db_conflicts'] = {
+ 'follows': True,
+ 'content_type': 'application/octet-stream',
+ 'length': len(conflicts),
+ }
+ parts.append(conflicts)
+ # store old transactions, if any
+ transactions = old_doc.transactions[:] if old_doc is not None else []
+ # create a new transaction id and timestamp it so the transaction log
+ # is consistent when querying the database.
+ transactions.append(
+ # here we store milliseconds to keep consistent with javascript
+ # Date.prototype.getTime() which was used before inside a couchdb
+ # update handler.
+ (int(time.time() * 1000),
+ transaction_id))
+ # build the couch document
+ couch_doc = {
+ '_id': doc.doc_id,
+ 'u1db_rev': doc.rev,
+ 'u1db_transactions': transactions,
+ '_attachments': attachments,
+ }
+ # if we are updating a doc we have to add the couch doc revision
+ if old_doc is not None and hasattr(old_doc, 'couch_rev'):
+ couch_doc['_rev'] = old_doc.couch_rev
+ # prepare the multipart PUT
+ buf = StringIO()
+ headers = {}
+ envelope = MultipartWriter(buf, headers=headers, subtype='related')
+ envelope.add('application/json', json.dumps(couch_doc))
+ for part in parts:
+ envelope.add('application/octet-stream', part)
+ envelope.close()
+ # try to save and fail if there's a revision conflict
+ try:
+ resource = self._new_resource()
+ resource.put_json(
+ doc.doc_id, body=str(buf.getvalue()), headers=headers)
+ except ResourceConflict:
+ raise RevisionConflict()
+ return transactions[-1][1]
+
+ def _new_resource(self, *path):
+ """
+ Return a new resource for accessing a couch database.
+
+ :return: A resource for accessing a couch database.
+ :rtype: couchdb.http.Resource
+ """
+ # Workaround for: https://leap.se/code/issues/5448
+ url = couch_urljoin(self._database.resource.url, *path)
+ resource = Resource(url, Session(timeout=COUCH_TIMEOUT))
+ resource.credentials = self._database.resource.credentials
+ resource.headers = self._database.resource.headers.copy()
+ return resource
diff --git a/common/src/leap/soledad/common/couch/errors.py b/common/src/leap/soledad/common/couch/errors.py
new file mode 100644
index 00000000..e894d58f
--- /dev/null
+++ b/common/src/leap/soledad/common/couch/errors.py
@@ -0,0 +1,144 @@
+# -*- coding: utf-8 -*-
+# errors.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from leap.soledad.common.errors import SoledadError
+from leap.soledad.common.errors import register_exception
+
+"""
+Specific errors that can be raised by CouchDatabase.
+"""
+
+
+@register_exception
+class MissingDesignDocError(SoledadError):
+
+ """
+ Raised when trying to access a missing couch design document.
+ """
+
+ wire_description = "missing design document"
+ status = 500
+
+
+@register_exception
+class MissingDesignDocNamedViewError(SoledadError):
+
+ """
+ Raised when trying to access a missing named view on a couch design
+ document.
+ """
+
+ wire_description = "missing design document named function"
+ status = 500
+
+
+@register_exception
+class MissingDesignDocListFunctionError(SoledadError):
+
+ """
+ Raised when trying to access a missing list function on a couch design
+ document.
+ """
+
+ wire_description = "missing design document list function"
+ status = 500
+
+
+@register_exception
+class MissingDesignDocDeletedError(SoledadError):
+
+ """
+ Raised when trying to access a deleted couch design document.
+ """
+
+ wire_description = "design document was deleted"
+ status = 500
+
+
+@register_exception
+class DesignDocUnknownError(SoledadError):
+
+ """
+ Raised when trying to access a couch design document and getting an
+ unknown error.
+ """
+
+ wire_description = "missing design document unknown error"
+ status = 500
+
+
+def raise_missing_design_doc_error(exc, ddoc_path):
+ """
+ Raise an appropriate exception when catching a ResourceNotFound when
+ accessing a design document.
+
+ :param exc: The exception cought.
+ :type exc: ResourceNotFound
+ :param ddoc_path: A list representing the requested path.
+ :type ddoc_path: list
+
+ :raise MissingDesignDocError: Raised when tried to access a missing design
+ document.
+ :raise MissingDesignDocListFunctionError: Raised when trying to access a
+ missing list function on a
+ design document.
+ :raise MissingDesignDocNamedViewError: Raised when trying to access a
+ missing named view on a design
+ document.
+ :raise MissingDesignDocDeletedError: Raised when trying to access a
+ deleted design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a design
+ document for an yet unknown reason.
+ """
+ path = "".join(ddoc_path)
+ if exc.message[1] == 'missing':
+ raise MissingDesignDocError(path)
+ elif exc.message[1] == 'missing function' or \
+ exc.message[1].startswith('missing lists function'):
+ raise MissingDesignDocListFunctionError(path)
+ elif exc.message[1] == 'missing_named_view':
+ raise MissingDesignDocNamedViewError(path)
+ elif exc.message[1] == 'deleted':
+ raise MissingDesignDocDeletedError(path)
+ # other errors are unknown for now
+ raise DesignDocUnknownError("%s: %s" % (path, str(exc.message)))
+
+
+def raise_server_error(exc, ddoc_path):
+ """
+ Raise an appropriate exception when catching a ServerError when
+ accessing a design document.
+
+ :param exc: The exception cought.
+ :type exc: ResourceNotFound
+ :param ddoc_path: A list representing the requested path.
+ :type ddoc_path: list
+
+ :raise MissingDesignDocListFunctionError: Raised when trying to access a
+ missing list function on a
+ design document.
+ :raise MissingDesignDocUnknownError: Raised when failed to access a design
+ document for an yet unknown reason.
+ """
+ path = "".join(ddoc_path)
+ msg = exc.message[1][0]
+ if msg == 'unnamed_error':
+ raise MissingDesignDocListFunctionError(path)
+ elif msg == 'TypeError':
+ if 'point is undefined' in exc.message[1][1]:
+ raise MissingDesignDocListFunctionError
+ # other errors are unknown for now
+ raise DesignDocUnknownError("%s: %s" % (path, str(exc.message)))
diff --git a/common/src/leap/soledad/common/couch/state.py b/common/src/leap/soledad/common/couch/state.py
new file mode 100644
index 00000000..4f07c105
--- /dev/null
+++ b/common/src/leap/soledad/common/couch/state.py
@@ -0,0 +1,160 @@
+# -*- coding: utf-8 -*-
+# state.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Server state using CouchDatabase as backend.
+"""
+import re
+import logging
+import time
+from urlparse import urljoin
+from hashlib import sha512
+
+from u1db.remote.server_state import ServerState
+from leap.soledad.common.command import exec_validated_cmd
+from leap.soledad.common.couch import CouchDatabase
+from leap.soledad.common.couch import couch_server
+from u1db.errors import Unauthorized
+
+
+logger = logging.getLogger(__name__)
+
+
+def is_db_name_valid(name):
+ """
+ Validate a user database using a regular expression.
+
+ :param name: database name.
+ :type name: str
+
+ :return: boolean for name vailidity
+ :rtype: bool
+ """
+ db_name_regex = "^user-[a-f0-9]+$"
+ return re.match(db_name_regex, name) is not None
+
+
+class CouchServerState(ServerState):
+
+ """
+ Inteface of the WSGI server with the CouchDB backend.
+ """
+
+ TOKENS_DB_PREFIX = "tokens_"
+ TOKENS_DB_EXPIRE = 30 * 24 * 3600 # 30 days in seconds
+ TOKENS_TYPE_KEY = "type"
+ TOKENS_TYPE_DEF = "Token"
+ TOKENS_USER_ID_KEY = "user_id"
+
+ def __init__(self, couch_url, create_cmd=None):
+ """
+ Initialize the couch server state.
+
+ :param couch_url: The URL for the couch database.
+ :type couch_url: str
+ """
+ self.couch_url = couch_url
+ self.create_cmd = create_cmd
+
+ def open_database(self, dbname):
+ """
+ Open a couch database.
+
+ :param dbname: The name of the database to open.
+ :type dbname: str
+
+ :return: The SoledadBackend object.
+ :rtype: SoledadBackend
+ """
+ url = urljoin(self.couch_url, dbname)
+ db = CouchDatabase.open_database(url, create=False, ensure_ddocs=False)
+ return db
+
+ def ensure_database(self, dbname):
+ """
+ Ensure couch database exists.
+
+ :param dbname: The name of the database to ensure.
+ :type dbname: str
+
+ :raise Unauthorized: If disabled or other error was raised.
+
+ :return: The SoledadBackend object and its replica_uid.
+ :rtype: (SoledadBackend, str)
+ """
+ if not self.create_cmd:
+ raise Unauthorized()
+ else:
+ code, out = exec_validated_cmd(self.create_cmd, dbname,
+ validator=is_db_name_valid)
+ if code is not 0:
+ logger.error("""
+ Error while creating database (%s) with (%s) command.
+ Output: %s
+ Exit code: %d
+ """ % (dbname, self.create_cmd, out, code))
+ raise Unauthorized()
+ db = self.open_database(dbname)
+ return db, db.replica_uid
+
+ def delete_database(self, dbname):
+ """
+ Delete couch database.
+
+ :param dbname: The name of the database to delete.
+ :type dbname: str
+
+ :raise Unauthorized: Always, because Soledad server is not allowed to
+ delete databases.
+ """
+ raise Unauthorized()
+
+ def verify_token(self, uuid, token):
+ """
+ Query couchdb to decide if C{token} is valid for C{uuid}.
+
+ @param uuid: The user uuid.
+ @type uuid: str
+ @param token: The token.
+ @type token: str
+ """
+ with couch_server(self.couch_url) as server:
+ # the tokens db rotates every 30 days, and the current db name is
+ # "tokens_NNN", where NNN is the number of seconds since epoch
+ # divide dby the rotate period in seconds. When rotating, old and
+ # new tokens db coexist during a certain window of time and valid
+ # tokens are replicated from the old db to the new one. See:
+ # https://leap.se/code/issues/6785
+ dbname = self._tokens_dbname()
+ db = server[dbname]
+ # lookup key is a hash of the token to prevent timing attacks.
+ token = db.get(sha512(token).hexdigest())
+ if token is None:
+ return False
+ # we compare uuid hashes to avoid possible timing attacks that
+ # might exploit python's builtin comparison operator behaviour,
+ # which fails immediatelly when non-matching bytes are found.
+ couch_uuid_hash = sha512(token[self.TOKENS_USER_ID_KEY]).digest()
+ req_uuid_hash = sha512(uuid).digest()
+ if token[self.TOKENS_TYPE_KEY] != self.TOKENS_TYPE_DEF \
+ or couch_uuid_hash != req_uuid_hash:
+ return False
+ return True
+
+ def _tokens_dbname(self):
+ dbname = self.TOKENS_DB_PREFIX + \
+ str(int(time.time() / self.TOKENS_DB_EXPIRE))
+ return dbname
diff --git a/common/src/leap/soledad/common/document.py b/common/src/leap/soledad/common/document.py
index 919ade12..9e0c0976 100644
--- a/common/src/leap/soledad/common/document.py
+++ b/common/src/leap/soledad/common/document.py
@@ -108,3 +108,73 @@ class SoledadDocument(Document):
_get_rev,
_set_rev,
doc="Wrapper to ensure `doc.rev` is always returned as bytes.")
+
+
+class ServerDocument(SoledadDocument):
+ """
+ This is the document used by server to hold conflicts and transactions
+ on a database.
+
+ The goal is to ensure an atomic and consistent update of the database.
+ """
+
+ def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False):
+ """
+ Container for handling a document that stored on server.
+
+ :param doc_id: The unique document identifier.
+ :type doc_id: str
+ :param rev: The revision identifier of the document.
+ :type rev: str
+ :param json: The JSON string for this document.
+ :type json: str
+ :param has_conflicts: Boolean indicating if this document has conflicts
+ :type has_conflicts: bool
+ """
+ SoledadDocument.__init__(self, doc_id, rev, json, has_conflicts)
+ self._conflicts = None
+
+ def get_conflicts(self):
+ """
+ Get the conflicted versions of the document.
+
+ :return: The conflicted versions of the document.
+ :rtype: [ServerDocument]
+ """
+ return self._conflicts or []
+
+ def set_conflicts(self, conflicts):
+ """
+ Set the conflicted versions of the document.
+
+ :param conflicts: The conflicted versions of the document.
+ :type conflicts: list
+ """
+ self._conflicts = conflicts
+ self.has_conflicts = len(self._conflicts) > 0
+
+ def add_conflict(self, doc):
+ """
+ Add a conflict to this document.
+
+ :param doc: The conflicted version to be added.
+ :type doc: Document
+ """
+ if self._conflicts is None:
+ raise Exception("Fetch conflicts first!")
+ self._conflicts.append(doc)
+ self.has_conflicts = len(self._conflicts) > 0
+
+ def delete_conflicts(self, conflict_revs):
+ """
+ Delete conflicted versions of this document.
+
+ :param conflict_revs: The conflicted revisions to be deleted.
+ :type conflict_revs: [str]
+ """
+ if self._conflicts is None:
+ raise Exception("Fetch conflicts first!")
+ self._conflicts = filter(
+ lambda doc: doc.rev not in conflict_revs,
+ self._conflicts)
+ self.has_conflicts = len(self._conflicts) > 0
diff --git a/common/src/leap/soledad/common/errors.py b/common/src/leap/soledad/common/errors.py
index 5798770b..2f6fd1d8 100644
--- a/common/src/leap/soledad/common/errors.py
+++ b/common/src/leap/soledad/common/errors.py
@@ -132,67 +132,14 @@ class CouldNotObtainLockError(SoledadError):
#
-# CouchDatabase errors
-#
-
-@register_exception
-class MissingDesignDocError(SoledadError):
-
- """
- Raised when trying to access a missing couch design document.
- """
-
- wire_description = "missing design document"
- status = 500
-
-
-@register_exception
-class MissingDesignDocNamedViewError(SoledadError):
-
- """
- Raised when trying to access a missing named view on a couch design
- document.
- """
-
- wire_description = "missing design document named function"
- status = 500
-
-
-@register_exception
-class MissingDesignDocListFunctionError(SoledadError):
-
- """
- Raised when trying to access a missing list function on a couch design
- document.
- """
-
- wire_description = "missing design document list function"
- status = 500
-
-
-@register_exception
-class MissingDesignDocDeletedError(SoledadError):
-
- """
- Raised when trying to access a deleted couch design document.
- """
-
- wire_description = "design document was deleted"
- status = 500
+# SoledadBackend errors
+# u1db error statuses also have to be updated
+http_errors.ERROR_STATUSES = set(
+ http_errors.wire_description_to_status.values())
-@register_exception
-class DesignDocUnknownError(SoledadError):
+class InvalidURLError(Exception):
"""
- Raised when trying to access a couch design document and getting an
- unknown error.
+ Exception raised when Soledad encounters a malformed URL.
"""
-
- wire_description = "missing design document unknown error"
- status = 500
-
-
-# u1db error statuses also have to be updated
-http_errors.ERROR_STATUSES = set(
- http_errors.wire_description_to_status.values())
diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py
index 86cc0881..7ba50e11 100644
--- a/common/src/leap/soledad/common/tests/test_couch.py
+++ b/common/src/leap/soledad/common/tests/test_couch.py
@@ -36,7 +36,8 @@ from u1db import SyncTarget
from u1db import vectorclock
from leap.soledad.common import couch
-from leap.soledad.common import errors
+from leap.soledad.common.document import ServerDocument
+from leap.soledad.common.couch import errors
from leap.soledad.common.tests import u1db_tests as tests
from leap.soledad.common.tests.util import CouchDBTestCase
@@ -46,8 +47,6 @@ from leap.soledad.common.tests.util import sync_via_synchronizer
from leap.soledad.common.tests.u1db_tests import test_backends
from leap.soledad.common.tests.u1db_tests import DatabaseBaseTests
-from u1db.backends.inmemory import InMemoryIndex
-
# -----------------------------------------------------------------------------
# The following tests come from `u1db.tests.test_common_backend`.
@@ -133,7 +132,7 @@ def copy_couch_database_for_test(test, db):
def make_document_for_test(test, doc_id, rev, content, has_conflicts=False):
- return couch.CouchDocument(
+ return ServerDocument(
doc_id, rev, content, has_conflicts=has_conflicts)
@@ -150,7 +149,7 @@ class CouchTests(
scenarios = COUCH_SCENARIOS
-class CouchDatabaseTests(
+class SoledadBackendTests(
TestWithScenarios,
test_backends.LocalDatabaseTests,
CouchDBTestCase):
@@ -206,7 +205,7 @@ simple_doc = tests.simple_doc
nested_doc = tests.nested_doc
-class CouchDatabaseSyncTargetTests(
+class SoledadBackendSyncTargetTests(
TestWithScenarios,
DatabaseBaseTests,
CouchDBTestCase):
@@ -529,92 +528,6 @@ class CouchDatabaseSyncTargetTests(
self.st.record_sync_info('replica', 0, 'T-sid')
self.assertEqual(expected, called)
-
-# The following tests need that the database have an index, so we fake one.
-
-class IndexedCouchDatabase(couch.CouchDatabase):
-
- def __init__(self, url, dbname, replica_uid=None, ensure_ddocs=True,
- database_security=None):
- old_class.__init__(self, url, dbname, replica_uid=replica_uid,
- ensure_ddocs=ensure_ddocs,
- database_security=database_security)
- self._indexes = {}
-
- def _put_doc(self, old_doc, doc):
- for index in self._indexes.itervalues():
- if old_doc is not None and not old_doc.is_tombstone():
- index.remove_json(old_doc.doc_id, old_doc.get_json())
- if not doc.is_tombstone():
- index.add_json(doc.doc_id, doc.get_json())
- old_class._put_doc(self, old_doc, doc)
-
- def create_index(self, index_name, *index_expressions):
- if index_name in self._indexes:
- if self._indexes[index_name]._definition == list(
- index_expressions):
- return
- raise u1db_errors.IndexNameTakenError
- index = InMemoryIndex(index_name, list(index_expressions))
- _, all_docs = self.get_all_docs()
- for doc in all_docs:
- index.add_json(doc.doc_id, doc.get_json())
- self._indexes[index_name] = index
-
- def delete_index(self, index_name):
- del self._indexes[index_name]
-
- def list_indexes(self):
- definitions = []
- for idx in self._indexes.itervalues():
- definitions.append((idx._name, idx._definition))
- return definitions
-
- def get_from_index(self, index_name, *key_values):
- try:
- index = self._indexes[index_name]
- except KeyError:
- raise u1db_errors.IndexDoesNotExist
- doc_ids = index.lookup(key_values)
- result = []
- for doc_id in doc_ids:
- result.append(self._get_doc(doc_id, check_for_conflicts=True))
- return result
-
- def get_range_from_index(self, index_name, start_value=None,
- end_value=None):
- """Return all documents with key values in the specified range."""
- try:
- index = self._indexes[index_name]
- except KeyError:
- raise u1db_errors.IndexDoesNotExist
- if isinstance(start_value, basestring):
- start_value = (start_value,)
- if isinstance(end_value, basestring):
- end_value = (end_value,)
- doc_ids = index.lookup_range(start_value, end_value)
- result = []
- for doc_id in doc_ids:
- result.append(self._get_doc(doc_id, check_for_conflicts=True))
- return result
-
- def get_index_keys(self, index_name):
- try:
- index = self._indexes[index_name]
- except KeyError:
- raise u1db_errors.IndexDoesNotExist
- keys = index.keys()
- # XXX inefficiency warning
- return list(set([tuple(key.split('\x01')) for key in keys]))
-
-
-# monkey patch CouchDatabase (once) to include virtual indexes
-if getattr(couch.CouchDatabase, '_old_class', None) is None:
- old_class = couch.CouchDatabase
- IndexedCouchDatabase._old_class = old_class
- couch.CouchDatabase = IndexedCouchDatabase
-
-
sync_scenarios = []
for name, scenario in COUCH_SCENARIOS:
scenario = dict(scenario)
@@ -623,7 +536,7 @@ for name, scenario in COUCH_SCENARIOS:
scenario = dict(scenario)
-class CouchDatabaseSyncTests(
+class SoledadBackendSyncTests(
TestWithScenarios,
DatabaseBaseTests,
CouchDBTestCase):
@@ -924,7 +837,6 @@ class CouchDatabaseSyncTests(
self.db1 = self.create_database('test1', 'source')
self.db2 = self.create_database('test2', 'target')
doc = self.db2.create_doc_from_json(simple_doc)
- self.db1.create_index('test-idx', 'key')
self.assertEqual(0, self.sync(self.db1, self.db2))
self.assertGetDoc(self.db1, doc.doc_id, doc.rev, simple_doc, False)
self.assertEqual(1, self.db1._get_replica_gen_and_trans_id('test2')[0])
@@ -934,7 +846,7 @@ class CouchDatabaseSyncTests(
{'receive': {'docs': [], 'last_known_gen': 0},
'return': {'docs': [(doc.doc_id, doc.rev)],
'last_gen': 1}})
- self.assertEqual([doc], self.db1.get_from_index('test-idx', 'value'))
+ self.assertGetDoc(self.db2, doc.doc_id, doc.rev, simple_doc, False)
def test_sync_pulling_doesnt_update_other_if_changed(self):
self.db1 = self.create_database('test1', 'source')
@@ -1023,7 +935,6 @@ class CouchDatabaseSyncTests(
doc1 = self.db1.create_doc_from_json(simple_doc)
doc_id = doc1.doc_id
doc1_rev = doc1.rev
- self.db1.create_index('test-idx', 'key')
new_doc = '{"key": "altval"}'
doc2 = self.db2.create_doc_from_json(new_doc, doc_id=doc_id)
doc2_rev = doc2.rev
@@ -1039,18 +950,12 @@ class CouchDatabaseSyncTests(
self.assertTransactionLog([doc_id, doc_id], self.db1)
self.assertGetDoc(self.db1, doc_id, doc2_rev, new_doc, True)
self.assertGetDoc(self.db2, doc_id, doc2_rev, new_doc, False)
- from_idx = self.db1.get_from_index('test-idx', 'altval')[0]
- self.assertEqual(doc2.doc_id, from_idx.doc_id)
- self.assertEqual(doc2.rev, from_idx.rev)
- self.assertTrue(from_idx.has_conflicts)
- self.assertEqual([], self.db1.get_from_index('test-idx', 'value'))
def test_sync_sees_remote_delete_conflicted(self):
self.db1 = self.create_database('test1', 'source')
self.db2 = self.create_database('test2', 'target')
doc1 = self.db1.create_doc_from_json(simple_doc)
doc_id = doc1.doc_id
- self.db1.create_index('test-idx', 'key')
self.sync(self.db1, self.db2)
doc2 = self.make_document(doc1.doc_id, doc1.rev, doc1.get_json())
new_doc = '{"key": "altval"}'
@@ -1070,7 +975,6 @@ class CouchDatabaseSyncTests(
self.assertGetDocIncludeDeleted(self.db1, doc_id, doc2.rev, None, True)
self.assertGetDocIncludeDeleted(
self.db2, doc_id, doc2.rev, None, False)
- self.assertEqual([], self.db1.get_from_index('test-idx', 'value'))
def test_sync_local_race_conflicted(self):
self.db1 = self.create_database('test1', 'source')
@@ -1078,7 +982,6 @@ class CouchDatabaseSyncTests(
doc = self.db1.create_doc_from_json(simple_doc)
doc_id = doc.doc_id
doc1_rev = doc.rev
- self.db1.create_index('test-idx', 'key')
self.sync(self.db1, self.db2)
content1 = '{"key": "localval"}'
content2 = '{"key": "altval"}'
@@ -1097,21 +1000,13 @@ class CouchDatabaseSyncTests(
self.sync(self.db1, self.db2, trace_hook=after_whatschanged)
self.assertEqual([True], triggered)
self.assertGetDoc(self.db1, doc_id, doc2_rev2, content2, True)
- from_idx = self.db1.get_from_index('test-idx', 'altval')[0]
- self.assertEqual(doc.doc_id, from_idx.doc_id)
- self.assertEqual(doc.rev, from_idx.rev)
- self.assertTrue(from_idx.has_conflicts)
- self.assertEqual([], self.db1.get_from_index('test-idx', 'value'))
- self.assertEqual([], self.db1.get_from_index('test-idx', 'localval'))
def test_sync_propagates_deletes(self):
self.db1 = self.create_database('test1', 'source')
self.db2 = self.create_database('test2', 'both')
doc1 = self.db1.create_doc_from_json(simple_doc)
doc_id = doc1.doc_id
- self.db1.create_index('test-idx', 'key')
self.sync(self.db1, self.db2)
- self.db2.create_index('test-idx', 'key')
self.db3 = self.create_database('test3', 'target')
self.sync(self.db1, self.db3)
self.db1.delete_doc(doc1)
@@ -1127,8 +1022,6 @@ class CouchDatabaseSyncTests(
self.db1, doc_id, deleted_rev, None, False)
self.assertGetDocIncludeDeleted(
self.db2, doc_id, deleted_rev, None, False)
- self.assertEqual([], self.db1.get_from_index('test-idx', 'value'))
- self.assertEqual([], self.db2.get_from_index('test-idx', 'value'))
self.sync(self.db2, self.db3)
self.assertLastExchangeLog(
self.db3,
@@ -1319,7 +1212,7 @@ class CouchDatabaseSyncTests(
self.assertEqual(cont2, self.db1.get_doc("2").get_json())
-class CouchDatabaseExceptionsTests(CouchDBTestCase):
+class SoledadBackendExceptionsTests(CouchDBTestCase):
def setUp(self):
CouchDBTestCase.setUp(self)
@@ -1327,9 +1220,11 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
def create_db(self, ensure=True, dbname=None):
if not dbname:
dbname = ('test-%s' % uuid4().hex)
- self.db = couch.CouchDatabase.open_database(
- urljoin('http://127.0.0.1:%d' % self.couch_port, dbname),
- create=True,
+ if dbname not in self.couch_server:
+ self.couch_server.create(dbname)
+ self.db = couch.CouchDatabase(
+ ('http://127.0.0.1:%d' % self.couch_port),
+ dbname,
ensure_ddocs=ensure)
def tearDown(self):
@@ -1343,22 +1238,18 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
design docs are not present.
"""
self.create_db(ensure=False)
- # _get_generation()
+ # get_generation_info()
self.assertRaises(
errors.MissingDesignDocError,
- self.db._get_generation)
- # _get_generation_info()
+ self.db.get_generation_info)
+ # get_trans_id_for_gen()
self.assertRaises(
errors.MissingDesignDocError,
- self.db._get_generation_info)
- # _get_trans_id_for_gen()
- self.assertRaises(
- errors.MissingDesignDocError,
- self.db._get_trans_id_for_gen, 1)
- # _get_transaction_log()
+ self.db.get_trans_id_for_gen, 1)
+ # get_transaction_log()
self.assertRaises(
errors.MissingDesignDocError,
- self.db._get_transaction_log)
+ self.db.get_transaction_log)
# whats_changed()
self.assertRaises(
errors.MissingDesignDocError,
@@ -1374,18 +1265,14 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
transactions = self.db._database['_design/transactions']
transactions['lists'] = {}
self.db._database.save(transactions)
- # _get_generation()
+ # get_generation_info()
self.assertRaises(
errors.MissingDesignDocListFunctionError,
- self.db._get_generation)
- # _get_generation_info()
+ self.db.get_generation_info)
+ # get_trans_id_for_gen()
self.assertRaises(
errors.MissingDesignDocListFunctionError,
- self.db._get_generation_info)
- # _get_trans_id_for_gen()
- self.assertRaises(
- errors.MissingDesignDocListFunctionError,
- self.db._get_trans_id_for_gen, 1)
+ self.db.get_trans_id_for_gen, 1)
# whats_changed()
self.assertRaises(
errors.MissingDesignDocListFunctionError,
@@ -1401,18 +1288,14 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
transactions = self.db._database['_design/transactions']
del transactions['lists']
self.db._database.save(transactions)
- # _get_generation()
+ # get_generation_info()
self.assertRaises(
errors.MissingDesignDocListFunctionError,
- self.db._get_generation)
- # _get_generation_info()
- self.assertRaises(
- errors.MissingDesignDocListFunctionError,
- self.db._get_generation_info)
+ self.db.get_generation_info)
# _get_trans_id_for_gen()
self.assertRaises(
errors.MissingDesignDocListFunctionError,
- self.db._get_trans_id_for_gen, 1)
+ self.db.get_trans_id_for_gen, 1)
# whats_changed()
self.assertRaises(
errors.MissingDesignDocListFunctionError,
@@ -1436,22 +1319,18 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
transactions = self.db._database['_design/transactions']
del transactions['views']
self.db._database.save(transactions)
- # _get_generation()
- self.assertRaises(
- errors.MissingDesignDocNamedViewError,
- self.db._get_generation)
- # _get_generation_info()
+ # get_generation_info()
self.assertRaises(
errors.MissingDesignDocNamedViewError,
- self.db._get_generation_info)
+ self.db.get_generation_info)
# _get_trans_id_for_gen()
self.assertRaises(
errors.MissingDesignDocNamedViewError,
- self.db._get_trans_id_for_gen, 1)
+ self.db.get_trans_id_for_gen, 1)
# _get_transaction_log()
self.assertRaises(
errors.MissingDesignDocNamedViewError,
- self.db._get_transaction_log)
+ self.db.get_transaction_log)
# whats_changed()
self.assertRaises(
errors.MissingDesignDocNamedViewError,
@@ -1469,22 +1348,18 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
del self.db._database['_design/syncs']
# delete _design/transactions
del self.db._database['_design/transactions']
- # _get_generation()
+ # get_generation_info()
self.assertRaises(
errors.MissingDesignDocDeletedError,
- self.db._get_generation)
- # _get_generation_info()
+ self.db.get_generation_info)
+ # get_trans_id_for_gen()
self.assertRaises(
errors.MissingDesignDocDeletedError,
- self.db._get_generation_info)
- # _get_trans_id_for_gen()
- self.assertRaises(
- errors.MissingDesignDocDeletedError,
- self.db._get_trans_id_for_gen, 1)
- # _get_transaction_log()
+ self.db.get_trans_id_for_gen, 1)
+ # get_transaction_log()
self.assertRaises(
errors.MissingDesignDocDeletedError,
- self.db._get_transaction_log)
+ self.db.get_transaction_log)
# whats_changed()
self.assertRaises(
errors.MissingDesignDocDeletedError,
@@ -1499,9 +1374,9 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
del self.db._database['_design/transactions']
self.assertRaises(
errors.MissingDesignDocDeletedError,
- self.db._get_transaction_log)
+ self.db.get_transaction_log)
self.create_db(ensure=True, dbname=self.db._dbname)
- self.db._get_transaction_log()
+ self.db.get_transaction_log()
def test_ensure_security_doc(self):
"""
@@ -1543,14 +1418,15 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
class DatabaseNameValidationTest(unittest.TestCase):
def test_database_name_validation(self):
- self.assertFalse(couch.is_db_name_valid("user-deadbeef | cat /secret"))
- self.assertTrue(couch.is_db_name_valid("user-cafe1337"))
+ inject = couch.state.is_db_name_valid("user-deadbeef | cat /secret")
+ self.assertFalse(inject)
+ self.assertTrue(couch.state.is_db_name_valid("user-cafe1337"))
class CommandBasedDBCreationTest(unittest.TestCase):
def test_ensure_db_using_custom_command(self):
- state = couch.CouchServerState("url", create_cmd="echo")
+ state = couch.state.CouchServerState("url", create_cmd="echo")
mock_db = Mock()
mock_db.replica_uid = 'replica_uid'
state.open_database = Mock(return_value=mock_db)
@@ -1559,11 +1435,11 @@ class CommandBasedDBCreationTest(unittest.TestCase):
self.assertEquals(mock_db.replica_uid, replica_uid)
def test_raises_unauthorized_on_failure(self):
- state = couch.CouchServerState("url", create_cmd="inexistent")
+ state = couch.state.CouchServerState("url", create_cmd="inexistent")
self.assertRaises(u1db_errors.Unauthorized,
state.ensure_database, "user-1337")
def test_raises_unauthorized_by_default(self):
- state = couch.CouchServerState("url")
+ state = couch.state.CouchServerState("url")
self.assertRaises(u1db_errors.Unauthorized,
state.ensure_database, "user-1337")
diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
index 507f2984..8cd3ae08 100644
--- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
+++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
@@ -26,7 +26,8 @@ from twisted.internet import defer
from uuid import uuid4
from leap.soledad.client import Soledad
-from leap.soledad.common.couch import CouchDatabase, CouchServerState
+from leap.soledad.common.couch.state import CouchServerState
+from leap.soledad.common.couch import CouchDatabase
from leap.soledad.common.tests.util import (
make_token_soledad_app,
diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py
index 7c006121..e1129a9f 100644
--- a/common/src/leap/soledad/common/tests/test_server.py
+++ b/common/src/leap/soledad/common/tests/test_server.py
@@ -24,15 +24,14 @@ import time
import binascii
from pkg_resources import resource_filename
from uuid import uuid4
+from hashlib import sha512
from urlparse import urljoin
from twisted.internet import defer
from twisted.trial import unittest
-from leap.soledad.common.couch import (
- CouchServerState,
- CouchDatabase,
-)
+from leap.soledad.common.couch.state import CouchServerState
+from leap.soledad.common.couch import CouchDatabase
from leap.soledad.common.tests.u1db_tests import TestCaseWithServer
from leap.soledad.common.tests.test_couch import CouchDBTestCase
from leap.soledad.common.tests.util import (
@@ -48,6 +47,36 @@ from leap.soledad.server import LockResource
from leap.soledad.server import load_configuration
from leap.soledad.server import CONFIG_DEFAULTS
from leap.soledad.server.auth import URLToAuthorization
+from leap.soledad.server.auth import SoledadTokenAuthMiddleware
+
+
+class ServerAuthenticationMiddlewareTestCase(CouchDBTestCase):
+
+ def setUp(self):
+ super(ServerAuthenticationMiddlewareTestCase, self).setUp()
+ app = mock.Mock()
+ self._state = CouchServerState(self.couch_url)
+ app.state = self._state
+ self.auth_middleware = SoledadTokenAuthMiddleware(app)
+ self._authorize('valid-uuid', 'valid-token')
+
+ def _authorize(self, uuid, token):
+ token_doc = {}
+ token_doc['_id'] = sha512(token).hexdigest()
+ token_doc[self._state.TOKENS_USER_ID_KEY] = uuid
+ token_doc[self._state.TOKENS_TYPE_KEY] = \
+ self._state.TOKENS_TYPE_DEF
+ dbname = self._state._tokens_dbname()
+ db = self.couch_server.create(dbname)
+ db.save(token_doc)
+ self.addCleanup(self.delete_db, db.name)
+
+ def test_authorized_user(self):
+ is_authorized = self.auth_middleware._verify_authentication_data
+ self.assertTrue(is_authorized('valid-uuid', 'valid-token'))
+ self.assertFalse(is_authorized('valid-uuid', 'invalid-token'))
+ self.assertFalse(is_authorized('invalid-uuid', 'valid-token'))
+ self.assertFalse(is_authorized('eve', 'invalid-token'))
class ServerAuthorizationTestCase(BaseSoledadTest):
diff --git a/common/src/leap/soledad/common/tests/test_sync_mutex.py b/common/src/leap/soledad/common/tests/test_sync_mutex.py
index 2e2123a7..973a8587 100644
--- a/common/src/leap/soledad/common/tests/test_sync_mutex.py
+++ b/common/src/leap/soledad/common/tests/test_sync_mutex.py
@@ -33,7 +33,8 @@ from twisted.internet import defer
from leap.soledad.client.sync import SoledadSynchronizer
-from leap.soledad.common import couch
+from leap.soledad.common.couch.state import CouchServerState
+from leap.soledad.common.couch import CouchDatabase
from leap.soledad.common.tests.u1db_tests import TestCaseWithServer
from leap.soledad.common.tests.test_couch import CouchDBTestCase
@@ -84,7 +85,7 @@ class TestSyncMutex(
sync_target = soledad_sync_target
def make_app(self):
- self.request_state = couch.CouchServerState(self.couch_url)
+ self.request_state = CouchServerState(self.couch_url)
return self.make_app_with_state(self.request_state)
def setUp(self):
@@ -102,7 +103,7 @@ class TestSyncMutex(
self.startServer()
# ensure remote db exists before syncing
- db = couch.CouchDatabase.open_database(
+ db = CouchDatabase.open_database(
urljoin(self.couch_url, 'user-' + self.user),
create=True,
ensure_ddocs=True)
diff --git a/common/src/leap/soledad/common/tests/test_sync_target.py b/common/src/leap/soledad/common/tests/test_sync_target.py
index c0987e90..f25e84dd 100644
--- a/common/src/leap/soledad/common/tests/test_sync_target.py
+++ b/common/src/leap/soledad/common/tests/test_sync_target.py
@@ -29,7 +29,6 @@ from uuid import uuid4
from testscenarios import TestWithScenarios
from twisted.internet import defer
-from urlparse import urljoin
from leap.soledad.client import http_target as target
from leap.soledad.client import crypto
@@ -37,7 +36,6 @@ from leap.soledad.client.sqlcipher import SQLCipherU1DBSync
from leap.soledad.client.sqlcipher import SQLCipherOptions
from leap.soledad.client.sqlcipher import SQLCipherDatabase
-from leap.soledad.common import couch
from leap.soledad.common.document import SoledadDocument
from leap.soledad.common.tests import u1db_tests as tests
@@ -265,9 +263,9 @@ class TestSoledadSyncTarget(
replica_trans_id=replica_trans_id,
number_of_docs=number_of_docs,
doc_idx=doc_idx, sync_id=sync_id)
- from leap.soledad.common.tests.test_couch import IndexedCouchDatabase
+ from leap.soledad.common.backend import SoledadBackend
self.patch(
- IndexedCouchDatabase, '_put_doc_if_newer', bomb_put_doc_if_newer)
+ SoledadBackend, '_put_doc_if_newer', bomb_put_doc_if_newer)
remote_target = self.getSyncTarget(
source_replica_uid='replica')
other_changes = []
diff --git a/common/src/leap/soledad/common/tests/util.py b/common/src/leap/soledad/common/tests/util.py
index 1c7adb91..f7f9ebd0 100644
--- a/common/src/leap/soledad/common/tests/util.py
+++ b/common/src/leap/soledad/common/tests/util.py
@@ -27,7 +27,6 @@ import shutil
import random
import string
import u1db
-import traceback
import couchdb
from uuid import uuid4
@@ -37,17 +36,17 @@ from StringIO import StringIO
from pysqlcipher import dbapi2
from u1db import sync
-from u1db.errors import DatabaseDoesNotExist
from u1db.remote import http_database
from twisted.trial import unittest
-from leap.common.files import mkdir_p
from leap.common.testing.basetest import BaseLeapTest
from leap.soledad.common import soledad_assert
from leap.soledad.common.document import SoledadDocument
-from leap.soledad.common.couch import CouchDatabase, CouchServerState
+from leap.soledad.common.couch import CouchDatabase
+from leap.soledad.common.couch.state import CouchServerState
+
from leap.soledad.common.crypto import ENC_SCHEME_KEY
from leap.soledad.client import Soledad
diff --git a/scripts/profiling/mail/mx.py b/scripts/profiling/mail/mx.py
index b6a1e5cf..55084a40 100644
--- a/scripts/profiling/mail/mx.py
+++ b/scripts/profiling/mail/mx.py
@@ -5,7 +5,7 @@ import timeit
from leap.keymanager import openpgp
-from leap.soledad.common.couch import CouchDocument
+from leap.soledad.common.document import ServerDocument
from leap.soledad.common.crypto import (
EncryptionSchemes,
ENC_JSON_KEY,
@@ -47,7 +47,7 @@ def get_enc_json(pubkey, message):
def get_new_doc(enc_json):
- doc = CouchDocument(doc_id=str(uuid.uuid4()))
+ doc = ServerDocument(doc_id=str(uuid.uuid4()))
doc.content = {
'incoming': True,
ENC_SCHEME_KEY: EncryptionSchemes.PUBKEY,
@@ -71,6 +71,7 @@ def put_lots_of_messages(db, number):
log("Populating database with %d encrypted messages... "
% number, line_break=False)
pubkey = get_pubkey()
+
def _put_one_message():
put_one_message(pubkey, db)
time = timeit.timeit(_put_one_message, number=number)
diff --git a/server/pkg/create-user-db b/server/pkg/create-user-db
index ae5d15dc..54856643 100755
--- a/server/pkg/create-user-db
+++ b/server/pkg/create-user-db
@@ -20,7 +20,7 @@ import sys
import netrc
import argparse
from leap.soledad.common.couch import CouchDatabase
-from leap.soledad.common.couch import is_db_name_valid
+from leap.soledad.common.couch.state import is_db_name_valid
from leap.soledad.common.couch import list_users_dbs
from leap.soledad.server import load_configuration
diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py
index 618ccb2b..00e1e9fb 100644
--- a/server/src/leap/soledad/server/__init__.py
+++ b/server/src/leap/soledad/server/__init__.py
@@ -104,7 +104,7 @@ from leap.soledad.server.sync import (
)
from leap.soledad.common import SHARED_DB_NAME
-from leap.soledad.common.couch import CouchServerState
+from leap.soledad.common.couch.state import CouchServerState
# ----------------------------------------------------------------------------
# Soledad WSGI application
diff --git a/server/src/leap/soledad/server/auth.py b/server/src/leap/soledad/server/auth.py
index 02b54cca..ccbd6fbd 100644
--- a/server/src/leap/soledad/server/auth.py
+++ b/server/src/leap/soledad/server/auth.py
@@ -21,20 +21,16 @@ Authentication facilities for Soledad Server.
"""
-import time
import httplib
import json
from u1db import DBNAME_CONSTRAINTS, errors as u1db_errors
from abc import ABCMeta, abstractmethod
from routes.mapper import Mapper
-from couchdb.client import Server
from twisted.python import log
-from hashlib import sha512
from leap.soledad.common import SHARED_DB_NAME
from leap.soledad.common import USER_DB_PREFIX
-from leap.soledad.common.errors import InvalidAuthTokenError
class URLToAuthorization(object):
@@ -351,14 +347,12 @@ class SoledadTokenAuthMiddleware(SoledadAuthMiddleware):
Token based authentication.
"""
- TOKENS_DB_PREFIX = "tokens_"
- TOKENS_DB_EXPIRE = 30 * 24 * 3600 # 30 days in seconds
- TOKENS_TYPE_KEY = "type"
- TOKENS_TYPE_DEF = "Token"
- TOKENS_USER_ID_KEY = "user_id"
-
TOKEN_AUTH_ERROR_STRING = "Incorrect address or token."
+ def __init__(self, app):
+ self._state = app.state
+ super(SoledadTokenAuthMiddleware, self).__init__(app)
+
def _verify_authentication_scheme(self, scheme):
"""
Verify if authentication scheme is valid.
@@ -391,50 +385,11 @@ class SoledadTokenAuthMiddleware(SoledadAuthMiddleware):
"""
token = auth_data # we expect a cleartext token at this point
try:
- return self._verify_token_in_couch(uuid, token)
- except InvalidAuthTokenError:
- raise
+ return self._state.verify_token(uuid, token)
except Exception as e:
log.err(e)
return False
- def _verify_token_in_couch(self, uuid, token):
- """
- Query couchdb to decide if C{token} is valid for C{uuid}.
-
- @param uuid: The user uuid.
- @type uuid: str
- @param token: The token.
- @type token: str
-
- @raise InvalidAuthTokenError: Raised when token received from user is
- either missing in the tokens db or is
- invalid.
- """
- server = Server(url=self._app.state.couch_url)
- # the tokens db rotates every 30 days, and the current db name is
- # "tokens_NNN", where NNN is the number of seconds since epoch divided
- # by the rotate period in seconds. When rotating, old and new tokens
- # db coexist during a certain window of time and valid tokens are
- # replicated from the old db to the new one. See:
- # https://leap.se/code/issues/6785
- dbname = self.TOKENS_DB_PREFIX + \
- str(int(time.time() / self.TOKENS_DB_EXPIRE))
- db = server[dbname]
- # lookup key is a hash of the token to prevent timing attacks.
- token = db.get(sha512(token).hexdigest())
- if token is None:
- raise InvalidAuthTokenError()
- # we compare uuid hashes to avoid possible timing attacks that
- # might exploit python's builtin comparison operator behaviour,
- # which fails immediatelly when non-matching bytes are found.
- couch_uuid_hash = sha512(token[self.TOKENS_USER_ID_KEY]).digest()
- req_uuid_hash = sha512(uuid).digest()
- if token[self.TOKENS_TYPE_KEY] != self.TOKENS_TYPE_DEF \
- or couch_uuid_hash != req_uuid_hash:
- raise InvalidAuthTokenError()
- return True
-
def _get_auth_error_string(self):
"""
Get the error string for token auth.
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py
index 92b29102..db25c406 100644
--- a/server/src/leap/soledad/server/sync.py
+++ b/server/src/leap/soledad/server/sync.py
@@ -32,7 +32,7 @@ class SyncExchange(sync.SyncExchange):
def __init__(self, db, source_replica_uid, last_known_generation, sync_id):
"""
:param db: The target syncing database.
- :type db: CouchDatabase
+ :type db: SoledadBackend
:param source_replica_uid: The uid of the source syncing replica.
:type source_replica_uid: str
:param last_known_generation: The last target replica generation the