diff options
author | drebs <drebs@leap.se> | 2012-12-24 10:14:24 -0200 |
---|---|---|
committer | drebs <drebs@leap.se> | 2012-12-24 10:14:24 -0200 |
commit | ca5fb41a55e1292005ed186baf3710831d9ad678 (patch) | |
tree | ac7eb9e8f5d993732e769607903ef626518749cd /src/leap/soledad/backends | |
parent | 564b82fa30ebcd8a0abfea54e00506dd77446a54 (diff) | |
parent | 277f17aa7b7bbcc48583149a3d72d8621f83c0ff (diff) |
Merge branch 'feature/u1db-openstack-backend' into develop
Diffstat (limited to 'src/leap/soledad/backends')
-rw-r--r-- | src/leap/soledad/backends/__init__.py | 0 | ||||
-rw-r--r-- | src/leap/soledad/backends/couch.py | 127 | ||||
-rw-r--r-- | src/leap/soledad/backends/leap.py | 175 | ||||
-rw-r--r-- | src/leap/soledad/backends/objectstore.py | 210 | ||||
-rw-r--r-- | src/leap/soledad/backends/openstack.py | 98 | ||||
-rw-r--r-- | src/leap/soledad/backends/sqlcipher.py | 127 |
6 files changed, 737 insertions, 0 deletions
diff --git a/src/leap/soledad/backends/__init__.py b/src/leap/soledad/backends/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/src/leap/soledad/backends/__init__.py diff --git a/src/leap/soledad/backends/couch.py b/src/leap/soledad/backends/couch.py new file mode 100644 index 00000000..ed356fdd --- /dev/null +++ b/src/leap/soledad/backends/couch.py @@ -0,0 +1,127 @@ +from u1db import errors +from u1db.remote.http_target import HTTPSyncTarget +from couchdb.client import Server, Document +from couchdb.http import ResourceNotFound +from soledad.backends.objectstore import ObjectStore +from soledad.backends.leap import LeapDocument + +try: + import simplejson as json +except ImportError: + import json # noqa + + +class CouchDatabase(ObjectStore): + """A U1DB implementation that uses Couch as its persistence layer.""" + + def __init__(self, url, database, full_commit=True, session=None): + """Create a new Couch data container.""" + self._url = url + self._full_commit = full_commit + self._session = session + self._server = Server(url=self._url, + full_commit=self._full_commit, + session=self._session) + # this will ensure that transaction and sync logs exist and are + # up-to-date. + self.set_document_factory(LeapDocument) + try: + self._database = self._server[database] + except ResourceNotFound: + self._server.create(database) + self._database = self._server[database] + super(CouchDatabase, self).__init__() + + #------------------------------------------------------------------------- + # implemented methods from Database + #------------------------------------------------------------------------- + + def _get_doc(self, doc_id, check_for_conflicts=False): + """Get just the document content, without fancy handling. + + Conflicts do not happen on server side, so there's no need to check + for them. + """ + cdoc = self._database.get(doc_id) + if cdoc is None: + return None + doc = self._factory(doc_id=doc_id, rev=cdoc['u1db_rev']) + if cdoc['u1db_json'] is not None: + doc.content = json.loads(cdoc['u1db_json']) + else: + doc.make_tombstone() + return doc + + def get_all_docs(self, include_deleted=False): + """Get all documents from the database.""" + generation = self._get_generation() + results = [] + for doc_id in self._database: + doc = self._get_doc(doc_id) + if doc.content is None and not include_deleted: + continue + results.append(doc) + return (generation, results) + + def _put_doc(self, doc): + # prepare couch's Document + cdoc = Document() + cdoc['_id'] = doc.doc_id + # we have to guarantee that couch's _rev is cosistent + old_cdoc = self._database.get(doc.doc_id) + if old_cdoc is not None: + cdoc['_rev'] = old_cdoc['_rev'] + # store u1db's rev + cdoc['u1db_rev'] = doc.rev + # store u1db's content as json string + if not doc.is_tombstone(): + cdoc['u1db_json'] = doc.get_json() + else: + cdoc['u1db_json'] = None + self._database.save(cdoc) + + def get_sync_target(self): + return CouchSyncTarget(self) + + def close(self): + raise NotImplementedError(self.close) + + def sync(self, url, creds=None, autocreate=True): + from u1db.sync import Synchronizer + from u1db.remote.http_target import CouchSyncTarget + return Synchronizer(self, CouchSyncTarget(url, creds=creds)).sync( + autocreate=autocreate) + + def _get_u1db_data(self): + cdoc = self._database.get(self.U1DB_DATA_DOC_ID) + content = json.loads(cdoc['u1db_json']) + self._sync_log.log = content['sync_log'] + self._transaction_log.log = content['transaction_log'] + self._replica_uid = content['replica_uid'] + self._couch_rev = cdoc['_rev'] + + #------------------------------------------------------------------------- + # Couch specific methods + #------------------------------------------------------------------------- + + # no specific methods so far. + +class CouchSyncTarget(HTTPSyncTarget): + + def get_sync_info(self, source_replica_uid): + source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id( + source_replica_uid) + my_gen, my_trans_id = self._db._get_generation_info() + return ( + self._db._replica_uid, my_gen, my_trans_id, source_gen, + source_trans_id) + + def record_sync_info(self, source_replica_uid, source_replica_generation, + source_replica_transaction_id): + if self._trace_hook: + self._trace_hook('record_sync_info') + self._db._set_replica_gen_and_trans_id( + source_replica_uid, source_replica_generation, + source_replica_transaction_id) + + diff --git a/src/leap/soledad/backends/leap.py b/src/leap/soledad/backends/leap.py new file mode 100644 index 00000000..9fbd49fe --- /dev/null +++ b/src/leap/soledad/backends/leap.py @@ -0,0 +1,175 @@ +try: + import simplejson as json +except ImportError: + import json # noqa + +from u1db import Document +from u1db.remote.http_target import HTTPSyncTarget +from u1db.remote.http_database import HTTPDatabase +import base64 +from soledad.util import GPGWrapper + + +class NoDefaultKey(Exception): + pass + +class NoSoledadInstance(Exception): + pass + + +class LeapDocument(Document): + """ + LEAP Documents are standard u1db documents with cabability of returning an + encrypted version of the document json string as well as setting document + content based on an encrypted version of json string. + """ + + def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False, + encrypted_json=None, soledad=None): + super(LeapDocument, self).__init__(doc_id, rev, json, has_conflicts) + self._soledad = soledad + if encrypted_json: + self.set_encrypted_json(encrypted_json) + + def get_encrypted_json(self): + """ + Returns document's json serialization encrypted with user's public key. + """ + if not self._soledad: + raise NoSoledadInstance() + ciphertext = self._soledad.encrypt_symmetric(self.doc_id, self.get_json()) + return json.dumps({'_encrypted_json' : ciphertext}) + + def set_encrypted_json(self, encrypted_json): + """ + Set document's content based on encrypted version of json string. + """ + if not self._soledad: + raise NoSoledadInstance() + ciphertext = json.loads(encrypted_json)['_encrypted_json'] + plaintext = self._soledad.decrypt_symmetric(self.doc_id, ciphertext) + return self.set_json(plaintext) + + +class LeapDatabase(HTTPDatabase): + """Implement the HTTP remote database API to a Leap server.""" + + def __init__(self, url, document_factory=None, creds=None, soledad=None): + super(LeapDatabase, self).__init__(url, creds=creds) + self._soledad = soledad + self._factory = LeapDocument + + @staticmethod + def open_database(url, create): + db = LeapDatabase(url) + db.open(create) + return db + + @staticmethod + def delete_database(url): + db = LeapDatabase(url) + db._delete() + db.close() + + def get_sync_target(self): + st = LeapSyncTarget(self._url.geturl()) + st._creds = self._creds + return st + + def create_doc_from_json(self, content, doc_id=None): + if doc_id is None: + doc_id = self._allocate_doc_id() + res, headers = self._request_json('PUT', ['doc', doc_id], {}, + content, 'application/json') + new_doc = self._factory(doc_id, res['rev'], content, soledad=self._soledad) + return new_doc + + +class LeapSyncTarget(HTTPSyncTarget): + + def __init__(self, url, creds=None, soledad=None): + super(LeapSyncTarget, self).__init__(url, creds) + self._soledad = soledad + + def _parse_sync_stream(self, data, return_doc_cb, ensure_callback=None): + """ + Does the same as parent's method but ensures incoming content will be + decrypted. + """ + parts = data.splitlines() # one at a time + if not parts or parts[0] != '[': + raise BrokenSyncStream + data = parts[1:-1] + comma = False + if data: + line, comma = utils.check_and_strip_comma(data[0]) + res = json.loads(line) + if ensure_callback and 'replica_uid' in res: + ensure_callback(res['replica_uid']) + for entry in data[1:]: + if not comma: # missing in between comma + raise BrokenSyncStream + line, comma = utils.check_and_strip_comma(entry) + entry = json.loads(line) + # decrypt after receiving from server. + doc = LeapDocument(entry['id'], entry['rev'], + encrypted_json=entry['content'], + soledad=self._soledad) + return_doc_cb(doc, entry['gen'], entry['trans_id']) + if parts[-1] != ']': + try: + partdic = json.loads(parts[-1]) + except ValueError: + pass + else: + if isinstance(partdic, dict): + self._error(partdic) + raise BrokenSyncStream + if not data or comma: # no entries or bad extra comma + raise BrokenSyncStream + return res + + def sync_exchange(self, docs_by_generations, source_replica_uid, + last_known_generation, last_known_trans_id, + return_doc_cb, ensure_callback=None): + """ + Does the same as parent's method but encrypts content before syncing. + """ + self._ensure_connection() + if self._trace_hook: # for tests + self._trace_hook('sync_exchange') + url = '%s/sync-from/%s' % (self._url.path, source_replica_uid) + self._conn.putrequest('POST', url) + self._conn.putheader('content-type', 'application/x-u1db-sync-stream') + for header_name, header_value in self._sign_request('POST', url, {}): + self._conn.putheader(header_name, header_value) + entries = ['['] + size = 1 + + def prepare(**dic): + entry = comma + '\r\n' + json.dumps(dic) + entries.append(entry) + return len(entry) + + comma = '' + size += prepare( + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + ensure=ensure_callback is not None) + comma = ',' + for doc, gen, trans_id in docs_by_generations: + # encrypt before sending to server. + size += prepare(id=doc.doc_id, rev=doc.rev, + content=doc.get_encrypted_json(), + gen=gen, trans_id=trans_id) + entries.append('\r\n]') + size += len(entries[-1]) + self._conn.putheader('content-length', str(size)) + self._conn.endheaders() + for entry in entries: + self._conn.send(entry) + entries = None + data, _ = self._response() + res = self._parse_sync_stream(data, return_doc_cb, ensure_callback) + data = None + return res['new_generation'], res['new_transaction_id'] diff --git a/src/leap/soledad/backends/objectstore.py b/src/leap/soledad/backends/objectstore.py new file mode 100644 index 00000000..61445a1f --- /dev/null +++ b/src/leap/soledad/backends/objectstore.py @@ -0,0 +1,210 @@ +import uuid +from u1db.backends import CommonBackend +from u1db import errors, Document +from soledad.util import SyncLog, TransactionLog + + +class ObjectStore(CommonBackend): + """ + A backend for storing u1db data in an object store. + """ + + def __init__(self): + # This initialization method should be called after the connection + # with the database is established, so it can ensure that u1db data is + # configured and up-to-date. + self.set_document_factory(Document) + self._sync_log = SyncLog() + self._transaction_log = TransactionLog() + self._ensure_u1db_data() + + #------------------------------------------------------------------------- + # implemented methods from Database + #------------------------------------------------------------------------- + + def set_document_factory(self, factory): + self._factory = factory + + def set_document_size_limit(self, limit): + raise NotImplementedError(self.set_document_size_limit) + + def whats_changed(self, old_generation=0): + self._get_u1db_data() + return self._transaction_log.whats_changed(old_generation) + + def get_doc(self, doc_id, include_deleted=False): + doc = self._get_doc(doc_id, check_for_conflicts=True) + if doc is None: + return None + if doc.is_tombstone() and not include_deleted: + return None + return doc + + def _put_doc(self, doc): + raise NotImplementedError(self._put_doc) + + def put_doc(self, doc): + # consistency check + if doc.doc_id is None: + raise errors.InvalidDocId() + self._check_doc_id(doc.doc_id) + self._check_doc_size(doc) + # check if document exists + old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if old_doc and old_doc.has_conflicts: + raise errors.ConflictedDoc() + if old_doc and doc.rev is None and old_doc.is_tombstone(): + new_rev = self._allocate_doc_rev(old_doc.rev) + else: + if old_doc is not None: + if old_doc.rev != doc.rev: + raise errors.RevisionConflict() + else: + if doc.rev is not None: + raise errors.RevisionConflict() + new_rev = self._allocate_doc_rev(doc.rev) + doc.rev = new_rev + self._put_doc(doc) + # update u1db generation and logs + new_gen = self._get_generation() + 1 + trans_id = self._allocate_transaction_id() + self._transaction_log.append((new_gen, doc.doc_id, trans_id)) + self._set_u1db_data() + return doc.rev + + def delete_doc(self, doc): + old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if old_doc is None: + raise errors.DocumentDoesNotExist + if old_doc.rev != doc.rev: + raise errors.RevisionConflict() + if old_doc.is_tombstone(): + raise errors.DocumentAlreadyDeleted + if old_doc.has_conflicts: + raise errors.ConflictedDoc() + new_rev = self._allocate_doc_rev(doc.rev) + doc.rev = new_rev + doc.make_tombstone() + self._put_doc(doc) + return new_rev + + # start of index-related methods: these are not supported by this backend. + + def create_index(self, index_name, *index_expressions): + return False + + def delete_index(self, index_name): + return False + + def list_indexes(self): + return [] + + def get_from_index(self, index_name, *key_values): + return [] + + def get_range_from_index(self, index_name, start_value=None, + end_value=None): + return [] + + def get_index_keys(self, index_name): + return [] + + # end of index-related methods: these are not supported by this backend. + + def get_doc_conflicts(self, doc_id): + return [] + + def resolve_doc(self, doc, conflicted_doc_revs): + raise NotImplementedError(self.resolve_doc) + + def _get_replica_gen_and_trans_id(self, other_replica_uid): + self._get_u1db_data() + return self._sync_log.get_replica_gen_and_trans_id(other_replica_uid) + + def _set_replica_gen_and_trans_id(self, other_replica_uid, + other_generation, other_transaction_id): + self._get_u1db_data() + self._sync_log.set_replica_gen_and_trans_id(other_replica_uid, + other_generation, + other_transaction_id) + self._set_u1db_data() + + #------------------------------------------------------------------------- + # implemented methods from CommonBackend + #------------------------------------------------------------------------- + + def _get_generation(self): + self._get_u1db_data() + return self._transaction_log.get_generation() + + def _get_generation_info(self): + self._get_u1db_data() + return self._transaction_log.get_generation_info() + + def _has_conflicts(self, doc_id): + # Documents never have conflicts on server. + return False + + def _put_and_update_indexes(self, doc_id, old_doc, new_rev, content): + raise NotImplementedError(self._put_and_update_indexes) + + + def _get_trans_id_for_gen(self, generation): + self._get_u1db_data() + trans_id = self._transaction_log.get_trans_id_for_gen(generation) + if trans_id is None: + raise errors.InvalidGeneration + return trans_id + + #------------------------------------------------------------------------- + # methods specific for object stores + #------------------------------------------------------------------------- + + def _ensure_u1db_data(self): + """ + Guarantee that u1db data (logs and replica info) exists in store. + """ + if not self._is_initialized(): + self._initialize() + self._get_u1db_data() + + U1DB_DATA_DOC_ID = 'u1db_data' + + def _is_initialized(self): + """ + Verify if u1db data exists in store. + """ + doc = self._get_doc(self.U1DB_DATA_DOC_ID) + if not self._get_doc(self.U1DB_DATA_DOC_ID): + return False + return True + + def _initialize(self): + """ + Create u1db data object in store. + """ + self._replica_uid = uuid.uuid4().hex + doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID) + doc.content = { 'transaction_log' : [], + 'sync_log' : [], + 'replica_uid' : self._replica_uid } + self._put_doc(doc) + + def _get_u1db_data(self, u1db_data_doc_id): + """ + Fetch u1db configuration data from backend storage. + """ + NotImplementedError(self._get_u1db_data) + + def _set_u1db_data(self): + """ + Save u1db configuration data on backend storage. + """ + doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID) + doc.content = { 'transaction_log' : self._transaction_log.log, + 'sync_log' : self._sync_log.log, + 'replica_uid' : self._replica_uid, + '_rev' : self._couch_rev} + self._put_doc(doc) + + diff --git a/src/leap/soledad/backends/openstack.py b/src/leap/soledad/backends/openstack.py new file mode 100644 index 00000000..c027231c --- /dev/null +++ b/src/leap/soledad/backends/openstack.py @@ -0,0 +1,98 @@ +from u1db import errors +from u1db.remote.http_target import HTTPSyncTarget +from swiftclient import client +from soledad.backends.objectstore import ObjectStore + + +class OpenStackDatabase(ObjectStore): + """A U1DB implementation that uses OpenStack as its persistence layer.""" + + def __init__(self, auth_url, user, auth_key, container): + """Create a new OpenStack data container.""" + self._auth_url = auth_url + self._user = user + self._auth_key = auth_key + self._container = container + self._connection = swiftclient.Connection(self._auth_url, self._user, + self._auth_key) + self._get_auth() + # this will ensure transaction and sync logs exist and are up-to-date. + super(OpenStackDatabase, self).__init__() + + #------------------------------------------------------------------------- + # implemented methods from Database + #------------------------------------------------------------------------- + + def _get_doc(self, doc_id, check_for_conflicts=False): + """Get just the document content, without fancy handling. + + Conflicts do not happen on server side, so there's no need to check + for them. + """ + try: + response, contents = self._connection.get_object(self._container, doc_id) + # TODO: change revision to be a dictionary element? + rev = response['x-object-meta-rev'] + return self._factory(doc_id, rev, contents) + except swiftclient.ClientException: + return None + + def get_all_docs(self, include_deleted=False): + """Get all documents from the database.""" + generation = self._get_generation() + results = [] + _, doc_ids = self._connection.get_container(self._container, + full_listing=True) + for doc_id in doc_ids: + doc = self._get_doc(doc_id) + if doc.content is None and not include_deleted: + continue + results.append(doc) + return (generation, results) + + def _put_doc(self, doc, new_rev): + new_rev = self._allocate_doc_rev(doc.rev) + # TODO: change revision to be a dictionary element? + headers = { 'X-Object-Meta-Rev' : new_rev } + self._connection.put_object(self._container, doc_id, doc.get_json(), + headers=headers) + + def get_sync_target(self): + return OpenStackSyncTarget(self) + + def close(self): + raise NotImplementedError(self.close) + + def sync(self, url, creds=None, autocreate=True): + from u1db.sync import Synchronizer + from u1db.remote.http_target import OpenStackSyncTarget + return Synchronizer(self, OpenStackSyncTarget(url, creds=creds)).sync( + autocreate=autocreate) + + #------------------------------------------------------------------------- + # OpenStack specific methods + #------------------------------------------------------------------------- + + def _get_auth(self): + self._url, self._auth_token = self._connection.get_auth() + return self._url, self.auth_token + +class OpenStackSyncTarget(HTTPSyncTarget): + + def get_sync_info(self, source_replica_uid): + source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id( + source_replica_uid) + my_gen, my_trans_id = self._db._get_generation_info() + return ( + self._db._replica_uid, my_gen, my_trans_id, source_gen, + source_trans_id) + + def record_sync_info(self, source_replica_uid, source_replica_generation, + source_replica_transaction_id): + if self._trace_hook: + self._trace_hook('record_sync_info') + self._db._set_replica_gen_and_trans_id( + source_replica_uid, source_replica_generation, + source_replica_transaction_id) + + diff --git a/src/leap/soledad/backends/sqlcipher.py b/src/leap/soledad/backends/sqlcipher.py new file mode 100644 index 00000000..6fd6e619 --- /dev/null +++ b/src/leap/soledad/backends/sqlcipher.py @@ -0,0 +1,127 @@ +# Copyright 2011 Canonical Ltd. +# +# This file is part of u1db. +# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see <http://www.gnu.org/licenses/>. + +"""A U1DB implementation that uses SQLCipher as its persistence layer.""" + +import errno +import os +try: + import simplejson as json +except ImportError: + import json # noqa +from sqlite3 import dbapi2 +import sys +import time +import uuid + +import pkg_resources + +from u1db.backends import CommonBackend, CommonSyncTarget +from u1db.backends.sqlite_backend import SQLitePartialExpandDatabase +from u1db import ( + Document, + errors, + query_parser, + vectorclock, + ) + + +def open(path, create, document_factory=None, password=None): + """Open a database at the given location. + + Will raise u1db.errors.DatabaseDoesNotExist if create=False and the + database does not already exist. + + :param path: The filesystem path for the database to open. + :param create: True/False, should the database be created if it doesn't + already exist? + :param document_factory: A function that will be called with the same + parameters as Document.__init__. + :return: An instance of Database. + """ + from u1db.backends import sqlite_backend + return sqlite_backend.SQLCipherDatabase.open_database( + path, password, create=create, document_factory=document_factory) + + +class SQLCipherDatabase(SQLitePartialExpandDatabase): + """A U1DB implementation that uses SQLCipher as its persistence layer.""" + + _index_storage_value = 'expand referenced encrypted' + + + @classmethod + def set_pragma_key(cls, db_handle, key): + db_handle.cursor().execute("PRAGMA key = '%s'" % key) + + def __init__(self, sqlite_file, password, document_factory=None): + """Create a new sqlite file.""" + self._db_handle = dbapi2.connect(sqlite_file) + SQLCipherDatabase.set_pragma_key(self._db_handle, password) + self._real_replica_uid = None + self._ensure_schema() + self._factory = document_factory or Document + + @classmethod + def _open_database(cls, sqlite_file, password, document_factory=None): + if not os.path.isfile(sqlite_file): + raise errors.DatabaseDoesNotExist() + tries = 2 + while True: + # Note: There seems to be a bug in sqlite 3.5.9 (with python2.6) + # where without re-opening the database on Windows, it + # doesn't see the transaction that was just committed + db_handle = dbapi2.connect(sqlite_file) + SQLCipherDatabase.set_pragma_key(db_handle, password) + c = db_handle.cursor() + v, err = cls._which_index_storage(c) + db_handle.close() + if v is not None: + break + # possibly another process is initializing it, wait for it to be + # done + if tries == 0: + raise err # go for the richest error? + tries -= 1 + time.sleep(cls.WAIT_FOR_PARALLEL_INIT_HALF_INTERVAL) + return SQLCipherDatabase._sqlite_registry[v]( + sqlite_file, password, document_factory=document_factory) + + @classmethod + def open_database(cls, sqlite_file, password, create, backend_cls=None, + document_factory=None): + try: + return cls._open_database(sqlite_file, password, + document_factory=document_factory) + except errors.DatabaseDoesNotExist: + if not create: + raise + if backend_cls is None: + # default is SQLCipherPartialExpandDatabase + backend_cls = SQLCipherDatabase + return backend_cls(sqlite_file, password, + document_factory=document_factory) + + @staticmethod + def register_implementation(klass): + """Register that we implement an SQLCipherDatabase. + + The attribute _index_storage_value will be used as the lookup key. + """ + SQLCipherDatabase._sqlite_registry[klass._index_storage_value] = klass + + +SQLCipherDatabase.register_implementation(SQLCipherDatabase) |