| author | drebs <drebs@joselito.semvergonha.org> | 2013-03-07 16:34:22 -0300 | 
|---|---|---|
| committer | drebs <drebs@joselito.semvergonha.org> | 2013-03-07 16:34:22 -0300 | 
| commit | 1b1def113e6ed9b8af6897e16f0d9b4c96bbfa6b (patch) | |
| tree | c34b751dde12bc92628165e8af902da54e39e898 /soledad/backends/couch.py | |
| parent | 4672ec4f25daa6466b2850bea416eaf77fa90d9d (diff) | |
Move source files to subfolder.
Diffstat (limited to 'soledad/backends/couch.py')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | soledad/backends/couch.py | 270 |
1 file changed, 270 insertions, 0 deletions
```diff
diff --git a/soledad/backends/couch.py b/soledad/backends/couch.py
new file mode 100644
index 00000000..b7a77054
--- /dev/null
+++ b/soledad/backends/couch.py
@@ -0,0 +1,270 @@
+"""A U1DB backend that uses CouchDB as its persistence layer."""
+
+# general imports
+import uuid
+from base64 import b64encode, b64decode
+import re
+# u1db
+from u1db import errors
+from u1db.sync import LocalSyncTarget
+from u1db.backends.inmemory import InMemoryIndex
+from u1db.remote.server_state import ServerState
+from u1db.errors import DatabaseDoesNotExist
+# couchdb
+from couchdb.client import Server, Document as CouchDocument
+from couchdb.http import ResourceNotFound
+# leap
+from leap.soledad.backends.objectstore import (
+    ObjectStoreDatabase,
+    ObjectStoreSyncTarget,
+)
+from leap.soledad.backends.leap_backend import LeapDocument
+
+try:
+    import simplejson as json
+except ImportError:
+    import json  # noqa
+
+
+class InvalidURLError(Exception):
+    """Exception raised when Soledad encounters a malformed URL."""
+    pass
+
+
+class CouchDatabase(ObjectStoreDatabase):
+    """A U1DB backend that uses Couch as its persistence layer."""
+
+    @classmethod
+    def open_database(cls, url, create):
+        """Open a U1DB database using CouchDB as backend."""
+        # get database from url
+        m = re.match('(^https?://[^/]+)/(.+)$', url)
+        if not m:
+            raise InvalidURLError
+        url = m.group(1)
+        dbname = m.group(2)
+        server = Server(url=url)
+        try:
+            server[dbname]
+        except ResourceNotFound:
+            if not create:
+                raise DatabaseDoesNotExist()
+        return cls(url, dbname)
+
+    def __init__(self, url, database, replica_uid=None, full_commit=True,
+                 session=None):
+        """Create a new Couch data container."""
+        self._url = url
+        self._full_commit = full_commit
+        self._session = session
+        self._server = Server(url=self._url,
+                              full_commit=self._full_commit,
+                              session=self._session)
+        self._dbname = database
+        # this will ensure that transaction and sync logs exist and are
+        # up-to-date.
+        try:
+            self._database = self._server[database]
+        except ResourceNotFound:
+            self._server.create(database)
+            self._database = self._server[database]
+        super(CouchDatabase, self).__init__(replica_uid=replica_uid,
+                                            document_factory=LeapDocument)
+
+    #-------------------------------------------------------------------------
+    # methods from Database
+    #-------------------------------------------------------------------------
+
+    def _get_doc(self, doc_id, check_for_conflicts=False):
+        """Get just the document content, without fancy handling."""
+        cdoc = self._database.get(doc_id)
+        if cdoc is None:
+            return None
+        has_conflicts = False
+        if check_for_conflicts:
+            has_conflicts = self._has_conflicts(doc_id)
+        doc = self._factory(
+            doc_id=doc_id,
+            rev=cdoc['u1db_rev'],
+            has_conflicts=has_conflicts)
+        contents = self._database.get_attachment(cdoc, 'u1db_json')
+        if contents:
+            doc.content = json.loads(contents.getvalue())
+        else:
+            doc.make_tombstone()
+        return doc
+
+    def get_all_docs(self, include_deleted=False):
+        """Get the JSON content for all documents in the database."""
+        generation = self._get_generation()
+        results = []
+        for doc_id in self._database:
+            if doc_id == self.U1DB_DATA_DOC_ID:
+                continue
+            doc = self._get_doc(doc_id, check_for_conflicts=True)
+            if doc.content is None and not include_deleted:
+                continue
+            results.append(doc)
+        return (generation, results)
+
+    def _put_doc(self, doc):
+        """Store document in database."""
+        # prepare couch's Document
+        cdoc = CouchDocument()
+        cdoc['_id'] = doc.doc_id
+        # we have to guarantee that couch's _rev is consistent
+        old_cdoc = self._database.get(doc.doc_id)
+        if old_cdoc is not None:
+            cdoc['_rev'] = old_cdoc['_rev']
+        # store u1db's rev
+        cdoc['u1db_rev'] = doc.rev
+        # save doc in db
+        self._database.save(cdoc)
+        # store u1db's content as json string
+        if not doc.is_tombstone():
+            self._database.put_attachment(cdoc, doc.get_json(),
+                                          filename='u1db_json')
+        else:
+            self._database.delete_attachment(cdoc, 'u1db_json')
+
+    def get_sync_target(self):
+        """
+        Return a SyncTarget object, for another u1db to synchronize with.
+        """
+        return CouchSyncTarget(self)
+
+    def create_index(self, index_name, *index_expressions):
+        """
+        Create a named index, which can then be queried for future lookups.
+        """ +        if index_name in self._indexes: +            if self._indexes[index_name]._definition == list( +                    index_expressions): +                return +            raise errors.IndexNameTakenError +        index = InMemoryIndex(index_name, list(index_expressions)) +        for doc_id in self._database: +            if doc_id == self.U1DB_DATA_DOC_ID: +                continue +            doc = self._get_doc(doc_id) +            if doc.content is not None: +                index.add_json(doc_id, doc.get_json()) +        self._indexes[index_name] = index +        # save data in object store +        self._store_u1db_data() + +    def close(self): +        """Release any resources associated with this database.""" +        # TODO: fix this method so the connection is properly closed and +        # test_close (+tearDown, which deletes the db) works without problems. +        self._url = None +        self._full_commit = None +        self._session = None +        #self._server = None +        self._database = None +        return True + +    def sync(self, url, creds=None, autocreate=True): +        """Synchronize documents with remote replica exposed at url.""" +        from u1db.sync import Synchronizer +        return Synchronizer(self, CouchSyncTarget(url, creds=creds)).sync( +            autocreate=autocreate) + +    #------------------------------------------------------------------------- +    # methods from ObjectStoreDatabase +    #------------------------------------------------------------------------- + +    def _init_u1db_data(self): +        if self._replica_uid is None: +            self._replica_uid = uuid.uuid4().hex +        doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID) +        doc.content = {'transaction_log': [], +                       'conflicts': b64encode(json.dumps({})), +                       'other_generations': {}, +                       'indexes': b64encode(json.dumps({})), +                       'replica_uid': self._replica_uid} +        self._put_doc(doc) + +    def _fetch_u1db_data(self): +        # retrieve u1db data from couch db +        cdoc = self._database.get(self.U1DB_DATA_DOC_ID) +        jsonstr = self._database.get_attachment(cdoc, 'u1db_json').getvalue() +        content = json.loads(jsonstr) +        # set u1db database info +        #self._sync_log = content['sync_log'] +        self._transaction_log = content['transaction_log'] +        self._conflicts = json.loads(b64decode(content['conflicts'])) +        self._other_generations = content['other_generations'] +        self._indexes = self._load_indexes_from_json( +            b64decode(content['indexes'])) +        self._replica_uid = content['replica_uid'] +        # save couch _rev +        self._couch_rev = cdoc['_rev'] + +    def _store_u1db_data(self): +        doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID) +        doc.content = { +            'transaction_log': self._transaction_log, +            # Here, the b64 encode ensures that document content +            # does not cause strange behaviour in couchdb because +            # of encoding. 
+            'conflicts': b64encode(json.dumps(self._conflicts)),
+            'other_generations': self._other_generations,
+            'indexes': b64encode(self._dump_indexes_as_json()),
+            'replica_uid': self._replica_uid,
+            '_rev': self._couch_rev}
+        self._put_doc(doc)
+
+    #-------------------------------------------------------------------------
+    # Couch specific methods
+    #-------------------------------------------------------------------------
+
+    def delete_database(self):
+        """Delete a U1DB CouchDB database."""
+        del(self._server[self._dbname])
+
+    def _dump_indexes_as_json(self):
+        indexes = {}
+        for name, idx in self._indexes.iteritems():
+            indexes[name] = {}
+            for attr in ['name', 'definition', 'values']:
+                indexes[name][attr] = getattr(idx, '_' + attr)
+        return json.dumps(indexes)
+
+    def _load_indexes_from_json(self, indexes):
+        dict = {}
+        for name, idx_dict in json.loads(indexes).iteritems():
+            idx = InMemoryIndex(name, idx_dict['definition'])
+            idx._values = idx_dict['values']
+            dict[name] = idx
+        return dict
+
+
+class CouchSyncTarget(ObjectStoreSyncTarget):
+    pass
+
+
+class CouchServerState(ServerState):
+    """Interface of the WSGI server with the CouchDB backend."""
+
+    def __init__(self, couch_url):
+        self.couch_url = couch_url
+
+    def open_database(self, dbname):
+        """Open a database at the given location."""
+        # TODO: open couch
+        from leap.soledad.backends.couch import CouchDatabase
+        return CouchDatabase.open_database(self.couch_url + '/' + dbname,
+                                           create=False)
+
+    def ensure_database(self, dbname):
+        """Ensure database at the given location."""
+        from leap.soledad.backends.couch import CouchDatabase
+        db = CouchDatabase.open_database(self.couch_url + '/' + dbname,
+                                         create=True)
+        return db, db._replica_uid
+
+    def delete_database(self, dbname):
+        """Delete database at the given location."""
+        from leap.soledad.backends.couch import CouchDatabase
+        CouchDatabase.delete_database(self.couch_url + '/' + dbname)
```
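
For orientation, here is a minimal usage sketch of the backend added by this patch. The CouchDB URL and database name are illustrative assumptions (a CouchDB instance on the default local port), not anything specified by the commit; only methods defined in the patch above (`open_database`, `get_all_docs`, `delete_database`, and `CouchServerState.ensure_database`) are exercised.

```python
# Illustrative sketch only -- the URL and database names below are
# hypothetical, not part of the commit.
from leap.soledad.backends.couch import CouchDatabase, CouchServerState

COUCH_URL = 'http://localhost:5984'  # assumed local CouchDB instance

# Open a U1DB database backed by CouchDB, creating it if it does not exist.
db = CouchDatabase.open_database(COUCH_URL + '/user-docs', create=True)

# get_all_docs() returns the current generation plus the live documents
# (tombstones are skipped unless include_deleted=True).
generation, docs = db.get_all_docs()
print 'generation %d, %d documents' % (generation, len(docs))

# The WSGI server side reaches the same data through CouchServerState.
state = CouchServerState(COUCH_URL)
db2, replica_uid = state.ensure_database('user-docs')

# Drop the per-user database when it is no longer needed.
db.delete_database()
```

Storing the document JSON as a `u1db_json` attachment, as `_put_doc` does in the patch, keeps u1db content out of the CouchDB document body, so only `_id`, `_rev`, and `u1db_rev` live in the Couch document itself.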
