"""A U1DB backend that uses CouchDB as its persistence layer.""" # general imports import uuid from base64 import b64encode, b64decode import re # u1db from u1db import errors from u1db.sync import LocalSyncTarget from u1db.backends.inmemory import InMemoryIndex from u1db.remote.server_state import ServerState from u1db.errors import DatabaseDoesNotExist # couchdb from couchdb.client import Server, Document as CouchDocument from couchdb.http import ResourceNotFound # leap from leap.soledad.backends.objectstore import ( ObjectStoreDatabase, ObjectStoreSyncTarget, ) from leap.soledad.backends.leap_backend import LeapDocument try: import simplejson as json except ImportError: import json # noqa class InvalidURLError(Exception): """Exception raised when Soledad encounters a malformed URL.""" pass class CouchDatabase(ObjectStoreDatabase): """A U1DB backend that uses Couch as its persistence layer.""" @classmethod def open_database(cls, url, create): """Open a U1DB database using CouchDB as backend.""" # get database from url m = re.match('(^https?://[^/]+)/(.+)$', url) if not m: raise InvalidURLError url = m.group(1) dbname = m.group(2) server = Server(url=url) try: server[dbname] except ResourceNotFound: if not create: raise DatabaseDoesNotExist() return cls(url, dbname) def __init__(self, url, database, replica_uid=None, full_commit=True, session=None): """Create a new Couch data container.""" self._url = url self._full_commit = full_commit self._session = session self._server = Server(url=self._url, full_commit=self._full_commit, session=self._session) self._dbname = database # this will ensure that transaction and sync logs exist and are # up-to-date. try: self._database = self._server[database] except ResourceNotFound: self._server.create(database) self._database = self._server[database] super(CouchDatabase, self).__init__(replica_uid=replica_uid, document_factory=LeapDocument) #------------------------------------------------------------------------- # methods from Database #------------------------------------------------------------------------- def _get_doc(self, doc_id, check_for_conflicts=False): """Get just the document content, without fancy handling.""" cdoc = self._database.get(doc_id) if cdoc is None: return None has_conflicts = False if check_for_conflicts: has_conflicts = self._has_conflicts(doc_id) doc = self._factory( doc_id=doc_id, rev=cdoc['u1db_rev'], has_conflicts=has_conflicts) contents = self._database.get_attachment(cdoc, 'u1db_json') if contents: doc.content = json.loads(contents.getvalue()) else: doc.make_tombstone() return doc def get_all_docs(self, include_deleted=False): """Get the JSON content for all documents in the database.""" generation = self._get_generation() results = [] for doc_id in self._database: if doc_id == self.U1DB_DATA_DOC_ID: continue doc = self._get_doc(doc_id, check_for_conflicts=True) if doc.content is None and not include_deleted: continue results.append(doc) return (generation, results) def _put_doc(self, doc): """Store document in database.""" # prepare couch's Document cdoc = CouchDocument() cdoc['_id'] = doc.doc_id # we have to guarantee that couch's _rev is cosistent old_cdoc = self._database.get(doc.doc_id) if old_cdoc is not None: cdoc['_rev'] = old_cdoc['_rev'] # store u1db's rev cdoc['u1db_rev'] = doc.rev # save doc in db self._database.save(cdoc) # store u1db's content as json string if not doc.is_tombstone(): self._database.put_attachment(cdoc, doc.get_json(), filename='u1db_json') else: self._database.delete_attachment(cdoc, 'u1db_json') def get_sync_target(self): """ Return a SyncTarget object, for another u1db to synchronize with. """ return CouchSyncTarget(self) def create_index(self, index_name, *index_expressions): """ Create a named index, which can then be queried for future lookups. """ if index_name in self._indexes: if self._indexes[index_name]._definition == list( index_expressions): return raise errors.IndexNameTakenError index = InMemoryIndex(index_name, list(index_expressions)) for doc_id in self._database: if doc_id == self.U1DB_DATA_DOC_ID: continue doc = self._get_doc(doc_id) if doc.content is not None: index.add_json(doc_id, doc.get_json()) self._indexes[index_name] = index # save data in object store self._store_u1db_data() def close(self): """Release any resources associated with this database.""" # TODO: fix this method so the connection is properly closed and # test_close (+tearDown, which deletes the db) works without problems. self._url = None self._full_commit = None self._session = None #self._server = None self._database = None return True def sync(self, url, creds=None, autocreate=True): """Synchronize documents with remote replica exposed at url.""" from u1db.sync import Synchronizer return Synchronizer(self, CouchSyncTarget(url, creds=creds)).sync( autocreate=autocreate) #------------------------------------------------------------------------- # methods from ObjectStoreDatabase #------------------------------------------------------------------------- def _init_u1db_data(self): if self._replica_uid is None: self._replica_uid = uuid.uuid4().hex doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID) doc.content = {'transaction_log': [], 'conflicts': b64encode(json.dumps({})), 'other_generations': {}, 'indexes': b64encode(json.dumps({})), 'replica_uid': self._replica_uid} self._put_doc(doc) def _fetch_u1db_data(self): # retrieve u1db data from couch db cdoc = self._database.get(self.U1DB_DATA_DOC_ID) jsonstr = self._database.get_attachment(cdoc, 'u1db_json').getvalue() content = json.loads(jsonstr) # set u1db database info #self._sync_log = content['sync_log'] self._transaction_log = content['transaction_log'] self._conflicts = json.loads(b64decode(content['conflicts'])) self._other_generations = content['other_generations'] self._indexes = self._load_indexes_from_json( b64decode(content['indexes'])) self._replica_uid = content['replica_uid'] # save couch _rev self._couch_rev = cdoc['_rev'] def _store_u1db_data(self): doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID) doc.content = { 'transaction_log': self._transaction_log, # Here, the b64 encode ensures that document content # does not cause strange behaviour in couchdb because # of encoding. 'conflicts': b64encode(json.dumps(self._conflicts)), 'other_generations': self._other_generations, 'indexes': b64encode(self._dump_indexes_as_json()), 'replica_uid': self._replica_uid, '_rev': self._couch_rev} self._put_doc(doc) #------------------------------------------------------------------------- # Couch specific methods #------------------------------------------------------------------------- def delete_database(self): """Delete a U1DB CouchDB database.""" del(self._server[self._dbname]) def _dump_indexes_as_json(self): indexes = {} for name, idx in self._indexes.iteritems(): indexes[name] = {} for attr in ['name', 'definition', 'values']: indexes[name][attr] = getattr(idx, '_' + attr) return json.dumps(indexes) def _load_indexes_from_json(self, indexes): dict = {} for name, idx_dict in json.loads(indexes).iteritems(): idx = InMemoryIndex(name, idx_dict['definition']) idx._values = idx_dict['values'] dict[name] = idx return dict class CouchSyncTarget(ObjectStoreSyncTarget): pass class CouchServerState(ServerState): """Inteface of the WSGI server with the CouchDB backend.""" def __init__(self, couch_url): self.couch_url = couch_url def open_database(self, dbname): """Open a database at the given location.""" # TODO: open couch from leap.soledad.backends.couch import CouchDatabase return CouchDatabase.open_database(self.couch_url + '/' + dbname, create=False) def ensure_database(self, dbname): """Ensure database at the given location.""" from leap.soledad.backends.couch import CouchDatabase db = CouchDatabase.open_database(self.couch_url + '/' + dbname, create=True) return db, db._replica_uid def delete_database(self, dbname): """Delete database at the given location.""" from leap.soledad.backends.couch import CouchDatabase CouchDatabase.delete_database(self.couch_url + '/' + dbname)