Diffstat (limited to 'src/leap/soledad/backends/couch.py')
-rw-r--r--  src/leap/soledad/backends/couch.py  270
1 file changed, 270 insertions, 0 deletions
diff --git a/src/leap/soledad/backends/couch.py b/src/leap/soledad/backends/couch.py
new file mode 100644
index 00000000..b7a77054
--- /dev/null
+++ b/src/leap/soledad/backends/couch.py
@@ -0,0 +1,270 @@
+"""A U1DB backend that uses CouchDB as its persistence layer."""
+
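+# Typical usage (an illustrative sketch only; it assumes a CouchDB server on
+# the default port and the standard u1db Database API inherited through
+# ObjectStoreDatabase):
+#
+#   db = CouchDatabase.open_database('http://localhost:5984/user-db',
+#                                    create=True)
+#   doc = db.create_doc({'secret': 'value'})  # create a u1db document
+#   same = db.get_doc(doc.doc_id)             # read it back
+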
+# general imports
+import uuid
+from base64 import b64encode, b64decode
+import re
+# u1db
+from u1db import errors
+from u1db.sync import LocalSyncTarget
+from u1db.backends.inmemory import InMemoryIndex
+from u1db.remote.server_state import ServerState
+from u1db.errors import DatabaseDoesNotExist
+# couchdb
+from couchdb.client import Server, Document as CouchDocument
+from couchdb.http import ResourceNotFound
+# leap
+from leap.soledad.backends.objectstore import (
+ ObjectStoreDatabase,
+ ObjectStoreSyncTarget,
+)
+from leap.soledad.backends.leap_backend import LeapDocument
+
+try:
+ import simplejson as json
+except ImportError:
+ import json # noqa
+
+
+class InvalidURLError(Exception):
+ """Exception raised when Soledad encounters a malformed URL."""
+ pass
+
+
+class CouchDatabase(ObjectStoreDatabase):
+ """A U1DB backend that uses Couch as its persistence layer."""
+
+ @classmethod
+ def open_database(cls, url, create):
+ """Open a U1DB database using CouchDB as backend."""
+ # get database from url
+ m = re.match('(^https?://[^/]+)/(.+)$', url)
+ if not m:
+ raise InvalidURLError
+ url = m.group(1)
+ dbname = m.group(2)
+ server = Server(url=url)
+ try:
+ server[dbname]
+ except ResourceNotFound:
+ if not create:
+ raise DatabaseDoesNotExist()
+ return cls(url, dbname)
+
+ def __init__(self, url, database, replica_uid=None, full_commit=True,
+ session=None):
+ """Create a new Couch data container."""
+ self._url = url
+ self._full_commit = full_commit
+ self._session = session
+ self._server = Server(url=self._url,
+ full_commit=self._full_commit,
+ session=self._session)
+ self._dbname = database
+ # this will ensure that transaction and sync logs exist and are
+ # up-to-date.
+ try:
+ self._database = self._server[database]
+ except ResourceNotFound:
+ self._server.create(database)
+ self._database = self._server[database]
+ super(CouchDatabase, self).__init__(replica_uid=replica_uid,
+ document_factory=LeapDocument)
+
+ #-------------------------------------------------------------------------
+ # methods from Database
+ #-------------------------------------------------------------------------
+
+ def _get_doc(self, doc_id, check_for_conflicts=False):
+ """Get just the document content, without fancy handling."""
+ cdoc = self._database.get(doc_id)
+ if cdoc is None:
+ return None
+ has_conflicts = False
+ if check_for_conflicts:
+ has_conflicts = self._has_conflicts(doc_id)
+ doc = self._factory(
+ doc_id=doc_id,
+ rev=cdoc['u1db_rev'],
+ has_conflicts=has_conflicts)
+ contents = self._database.get_attachment(cdoc, 'u1db_json')
+ if contents:
+ doc.content = json.loads(contents.getvalue())
+ else:
+ doc.make_tombstone()
+ return doc
+
+ def get_all_docs(self, include_deleted=False):
+ """Get the JSON content for all documents in the database."""
+ generation = self._get_generation()
+ results = []
+ for doc_id in self._database:
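+            # skip the special u1db metadata document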
+ if doc_id == self.U1DB_DATA_DOC_ID:
+ continue
+ doc = self._get_doc(doc_id, check_for_conflicts=True)
+ if doc.content is None and not include_deleted:
+ continue
+ results.append(doc)
+ return (generation, results)
+
+ def _put_doc(self, doc):
+ """Store document in database."""
+ # prepare couch's Document
+ cdoc = CouchDocument()
+ cdoc['_id'] = doc.doc_id
+        # we have to guarantee that couch's _rev is consistent
+ old_cdoc = self._database.get(doc.doc_id)
+ if old_cdoc is not None:
+ cdoc['_rev'] = old_cdoc['_rev']
+ # store u1db's rev
+ cdoc['u1db_rev'] = doc.rev
+ # save doc in db
+ self._database.save(cdoc)
+ # store u1db's content as json string
+ if not doc.is_tombstone():
+ self._database.put_attachment(cdoc, doc.get_json(),
+ filename='u1db_json')
+ else:
+ self._database.delete_attachment(cdoc, 'u1db_json')
+
+ def get_sync_target(self):
+ """
+ Return a SyncTarget object, for another u1db to synchronize with.
+ """
+ return CouchSyncTarget(self)
+
+ def create_index(self, index_name, *index_expressions):
+ """
+ Create a named index, which can then be queried for future lookups.
+ """
+ if index_name in self._indexes:
+ if self._indexes[index_name]._definition == list(
+ index_expressions):
+ return
+ raise errors.IndexNameTakenError
+ index = InMemoryIndex(index_name, list(index_expressions))
+ for doc_id in self._database:
+ if doc_id == self.U1DB_DATA_DOC_ID:
+ continue
+ doc = self._get_doc(doc_id)
+ if doc.content is not None:
+ index.add_json(doc_id, doc.get_json())
+ self._indexes[index_name] = index
+ # save data in object store
+ self._store_u1db_data()
+
+ def close(self):
+ """Release any resources associated with this database."""
+ # TODO: fix this method so the connection is properly closed and
+ # test_close (+tearDown, which deletes the db) works without problems.
+ self._url = None
+ self._full_commit = None
+ self._session = None
+ #self._server = None
+ self._database = None
+ return True
+
+ def sync(self, url, creds=None, autocreate=True):
+ """Synchronize documents with remote replica exposed at url."""
+ from u1db.sync import Synchronizer
+ return Synchronizer(self, CouchSyncTarget(url, creds=creds)).sync(
+ autocreate=autocreate)
+
+ #-------------------------------------------------------------------------
+ # methods from ObjectStoreDatabase
+ #-------------------------------------------------------------------------
+
+ def _init_u1db_data(self):
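+        """Initialize u1db metadata (logs, conflicts, indexes) in couch."""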
+ if self._replica_uid is None:
+ self._replica_uid = uuid.uuid4().hex
+ doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID)
+ doc.content = {'transaction_log': [],
+ 'conflicts': b64encode(json.dumps({})),
+ 'other_generations': {},
+ 'indexes': b64encode(json.dumps({})),
+ 'replica_uid': self._replica_uid}
+ self._put_doc(doc)
+
+ def _fetch_u1db_data(self):
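+        """Load u1db metadata from the special u1db data document."""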
+ # retrieve u1db data from couch db
+ cdoc = self._database.get(self.U1DB_DATA_DOC_ID)
+ jsonstr = self._database.get_attachment(cdoc, 'u1db_json').getvalue()
+ content = json.loads(jsonstr)
+ # set u1db database info
+ #self._sync_log = content['sync_log']
+ self._transaction_log = content['transaction_log']
+ self._conflicts = json.loads(b64decode(content['conflicts']))
+ self._other_generations = content['other_generations']
+ self._indexes = self._load_indexes_from_json(
+ b64decode(content['indexes']))
+ self._replica_uid = content['replica_uid']
+ # save couch _rev
+ self._couch_rev = cdoc['_rev']
+
+ def _store_u1db_data(self):
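+        """Save u1db metadata to the special u1db data document."""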
+ doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID)
+ doc.content = {
+ 'transaction_log': self._transaction_log,
+            # Here, the b64 encoding keeps the serialized conflicts and
+            # indexes from causing encoding problems when stored as part
+            # of the couch document.
+ 'conflicts': b64encode(json.dumps(self._conflicts)),
+ 'other_generations': self._other_generations,
+ 'indexes': b64encode(self._dump_indexes_as_json()),
+ 'replica_uid': self._replica_uid,
+ '_rev': self._couch_rev}
+ self._put_doc(doc)
+
+ #-------------------------------------------------------------------------
+ # Couch specific methods
+ #-------------------------------------------------------------------------
+
+ def delete_database(self):
+ """Delete a U1DB CouchDB database."""
+        del self._server[self._dbname]
+
+ def _dump_indexes_as_json(self):
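+        """Serialize the in-memory indexes to a json string."""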
+ indexes = {}
+ for name, idx in self._indexes.iteritems():
+ indexes[name] = {}
+ for attr in ['name', 'definition', 'values']:
+ indexes[name][attr] = getattr(idx, '_' + attr)
+ return json.dumps(indexes)
+
+ def _load_indexes_from_json(self, indexes):
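+        """Rebuild InMemoryIndex objects from their json serialization."""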
+        loaded = {}
+        for name, idx_dict in json.loads(indexes).iteritems():
+            idx = InMemoryIndex(name, idx_dict['definition'])
+            idx._values = idx_dict['values']
+            loaded[name] = idx
+        return loaded
+
+
+class CouchSyncTarget(ObjectStoreSyncTarget):
+ pass
+
+
+class CouchServerState(ServerState):
+ """Inteface of the WSGI server with the CouchDB backend."""
+
+ def __init__(self, couch_url):
+ self.couch_url = couch_url
+
+ def open_database(self, dbname):
+ """Open a database at the given location."""
+ # TODO: open couch
+ from leap.soledad.backends.couch import CouchDatabase
+ return CouchDatabase.open_database(self.couch_url + '/' + dbname,
+ create=False)
+
+ def ensure_database(self, dbname):
+ """Ensure database at the given location."""
+ from leap.soledad.backends.couch import CouchDatabase
+ db = CouchDatabase.open_database(self.couch_url + '/' + dbname,
+ create=True)
+ return db, db._replica_uid
+
+ def delete_database(self, dbname):
+ """Delete database at the given location."""
+ from leap.soledad.backends.couch import CouchDatabase
+        db = CouchDatabase.open_database(self.couch_url + '/' + dbname,
+                                         create=False)
+        db.delete_database()