diff options
author | drebs <drebs@leap.se> | 2012-12-06 11:07:53 -0200 |
---|---|---|
committer | drebs <drebs@leap.se> | 2012-12-06 11:07:53 -0200 |
commit | 30c9ec2a115b157e608e97c9b8449fc9c753b60f (patch) | |
tree | e36dfed8ef7cd0beae446909eef22b662cb92554 /backends | |
parent | 08fd87714f503c551b8e2dc29e55ae4497b42759 (diff) |
Remove u1db and swiftclient dirs and refactor.
Diffstat (limited to 'backends')
-rw-r--r-- | backends/__init__.py | 0 | ||||
-rw-r--r-- | backends/leap.py | 157 | ||||
-rw-r--r-- | backends/openstack.py | 369 |
3 files changed, 526 insertions, 0 deletions
diff --git a/backends/__init__.py b/backends/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/backends/__init__.py diff --git a/backends/leap.py b/backends/leap.py new file mode 100644 index 00000000..2c815632 --- /dev/null +++ b/backends/leap.py @@ -0,0 +1,157 @@ +try: + import simplejson as json +except ImportError: + import json # noqa + +from u1db import Document +from u1db.remote.http_target import HTTPSyncTarget +from u1db.remote.http_database import HTTPDatabase +import base64 + + +class NoDefaultKey(Exception): + pass + + +class LeapDocument(Document): + """ + LEAP Documents are standard u1db documents with cabability of returning an + encrypted version of the document json string as well as setting document + content based on an encrypted version of json string. + """ + + def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False, + encrypted_json=None, default_key=None, gpg_wrapper=None): + super(LeapDocument, self).__init__(doc_id, rev, json, has_conflicts) + # we might want to get already initialized wrappers for testing. + if gpg_wrapper is None: + self._gpg = GPGWrapper() + else: + self._gpg = gpg_wrapper + if encrypted_json: + self.set_encrypted_json(encrypted_json) + self._default_key = default_key + + def get_encrypted_json(self): + """ + Returns document's json serialization encrypted with user's public key. + """ + if self._default_key is None: + raise NoDefaultKey() + cyphertext = self._gpg.encrypt(self.get_json(), + self._default_key, + always_trust = True) + # TODO: always trust? + return json.dumps({'cyphertext' : str(cyphertext)}) + + def set_encrypted_json(self, encrypted_json): + """ + Set document's content based on encrypted version of json string. + """ + cyphertext = json.loads(encrypted_json)['cyphertext'] + plaintext = str(self._gpg.decrypt(cyphertext)) + return self.set_json(plaintext) + + +class LeapDatabase(HTTPDatabase): + """Implement the HTTP remote database API to a Leap server.""" + + @staticmethod + def open_database(url, create): + db = LeapDatabase(url) + db.open(create) + return db + + @staticmethod + def delete_database(url): + db = LeapDatabase(url) + db._delete() + db.close() + + def get_sync_target(self): + st = LeapSyncTarget(self._url.geturl()) + st._creds = self._creds + return st + + +class LeapSyncTarget(HTTPSyncTarget): + + def _parse_sync_stream(self, data, return_doc_cb, ensure_callback=None): + """ + Does the same as parent's method but ensures incoming content will be + decrypted. + """ + parts = data.splitlines() # one at a time + if not parts or parts[0] != '[': + raise BrokenSyncStream + data = parts[1:-1] + comma = False + if data: + line, comma = utils.check_and_strip_comma(data[0]) + res = json.loads(line) + if ensure_callback and 'replica_uid' in res: + ensure_callback(res['replica_uid']) + for entry in data[1:]: + if not comma: # missing in between comma + raise BrokenSyncStream + line, comma = utils.check_and_strip_comma(entry) + entry = json.loads(line) + doc = LeapDocument(entry['id'], entry['rev'], + encrypted_json=entry['content']) + return_doc_cb(doc, entry['gen'], entry['trans_id']) + if parts[-1] != ']': + try: + partdic = json.loads(parts[-1]) + except ValueError: + pass + else: + if isinstance(partdic, dict): + self._error(partdic) + raise BrokenSyncStream + if not data or comma: # no entries or bad extra comma + raise BrokenSyncStream + return res + + def sync_exchange(self, docs_by_generations, source_replica_uid, + last_known_generation, last_known_trans_id, + return_doc_cb, ensure_callback=None): + """ + Does the same as parent's method but encrypts content before syncing. + """ + self._ensure_connection() + if self._trace_hook: # for tests + self._trace_hook('sync_exchange') + url = '%s/sync-from/%s' % (self._url.path, source_replica_uid) + self._conn.putrequest('POST', url) + self._conn.putheader('content-type', 'application/x-u1db-sync-stream') + for header_name, header_value in self._sign_request('POST', url, {}): + self._conn.putheader(header_name, header_value) + entries = ['['] + size = 1 + + def prepare(**dic): + entry = comma + '\r\n' + json.dumps(dic) + entries.append(entry) + return len(entry) + + comma = '' + size += prepare( + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + ensure=ensure_callback is not None) + comma = ',' + for doc, gen, trans_id in docs_by_generations: + size += prepare(id=doc.doc_id, rev=doc.rev, + content=doc.get_encrypted_json(), + gen=gen, trans_id=trans_id) + entries.append('\r\n]') + size += len(entries[-1]) + self._conn.putheader('content-length', str(size)) + self._conn.endheaders() + for entry in entries: + self._conn.send(entry) + entries = None + data, _ = self._response() + res = self._parse_sync_stream(data, return_doc_cb, ensure_callback) + data = None + return res['new_generation'], res['new_transaction_id'] diff --git a/backends/openstack.py b/backends/openstack.py new file mode 100644 index 00000000..ec4609b4 --- /dev/null +++ b/backends/openstack.py @@ -0,0 +1,369 @@ +from leap import * +from u1db import errors +from u1db.backends import CommonBackend +from u1db.remote.http_target import HTTPSyncTarget +from swiftclient import client + + +class OpenStackDatabase(CommonBackend): + """A U1DB implementation that uses OpenStack as its persistence layer.""" + + def __init__(self, auth_url, user, auth_key, container): + """Create a new OpenStack data container.""" + self._auth_url = auth_url + self._user = user + self._auth_key = auth_key + self._container = container + self.set_document_factory(LeapDocument) + self._connection = swiftclient.Connection(self._auth_url, self._user, + self._auth_key) + self._get_auth() + self._ensure_u1db_data() + + #------------------------------------------------------------------------- + # implemented methods from Database + #------------------------------------------------------------------------- + + def set_document_factory(self, factory): + self._factory = factory + + def set_document_size_limit(self, limit): + raise NotImplementedError(self.set_document_size_limit) + + def whats_changed(self, old_generation=0): + self._get_u1db_data() + # This method is implemented in TransactionLog because testing is + # easier like this for now, but it can be moved to here afterwards. + return self._transaction_log.whats_changed(old_generation) + + def _get_doc(self, doc_id, check_for_conflicts=False): + """Get just the document content, without fancy handling. + + Conflicts do not happen on server side, so there's no need to check + for them. + """ + try: + response, contents = self._connection.get_object(self._container, doc_id) + rev = response['x-object-meta-rev'] + return self._factory(doc_id, rev, contents) + except swiftclient.ClientException: + return None + + def get_doc(self, doc_id, include_deleted=False): + doc = self._get_doc(doc_id, check_for_conflicts=True) + if doc is None: + return None + if doc.is_tombstone() and not include_deleted: + return None + return doc + + def get_all_docs(self, include_deleted=False): + """Get all documents from the database.""" + generation = self._get_generation() + results = [] + _, doc_ids = self._connection.get_container(self._container, + full_listing=True) + for doc_id in doc_ids: + doc = self._get_doc(doc_id) + if doc.content is None and not include_deleted: + continue + results.append(doc) + return (generation, results) + + def put_doc(self, doc): + if doc.doc_id is None: + raise errors.InvalidDocId() + self._check_doc_id(doc.doc_id) + self._check_doc_size(doc) + # TODO: check for conflicts? + new_rev = self._allocate_doc_rev(doc.rev) + headers = { 'X-Object-Meta-Rev' : new_rev } + self._connection.put_object(self._container, doc_id, doc.get_json(), + headers=headers) + new_gen = self._get_generation() + 1 + trans_id = self._allocate_transaction_id() + self._transaction_log.append((new_gen, doc.doc_id, trans_id)) + self._set_u1db_data() + return new_rev + + def delete_doc(self, doc): + old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if old_doc is None: + raise errors.DocumentDoesNotExist + if old_doc.rev != doc.rev: + raise errors.RevisionConflict() + if old_doc.is_tombstone(): + raise errors.DocumentAlreadyDeleted + if old_doc.has_conflicts: + raise errors.ConflictedDoc() + new_rev = self._allocate_doc_rev(doc.rev) + doc.rev = new_rev + doc.make_tombstone() + self._put_doc(olddoc) + return new_rev + + # start of index-related methods: these are not supported by this backend. + + def create_index(self, index_name, *index_expressions): + return False + + def delete_index(self, index_name): + return False + + def list_indexes(self): + return [] + + def get_from_index(self, index_name, *key_values): + return [] + + def get_range_from_index(self, index_name, start_value=None, + end_value=None): + return [] + + def get_index_keys(self, index_name): + return [] + + # end of index-related methods: these are not supported by this backend. + + def get_doc_conflicts(self, doc_id): + return [] + + def resolve_doc(self, doc, conflicted_doc_revs): + raise NotImplementedError(self.resolve_doc) + + def get_sync_target(self): + return OpenStackSyncTarget(self) + + def close(self): + raise NotImplementedError(self.close) + + def sync(self, url, creds=None, autocreate=True): + from u1db.sync import Synchronizer + from u1db.remote.http_target import OpenStackSyncTarget + return Synchronizer(self, OpenStackSyncTarget(url, creds=creds)).sync( + autocreate=autocreate) + + def _get_replica_gen_and_trans_id(self, other_replica_uid): + self._get_u1db_data() + return self._sync_log.get_replica_gen_and_trans_id(other_replica_uid) + + def _set_replica_gen_and_trans_id(self, other_replica_uid, + other_generation, other_transaction_id): + self._get_u1db_data() + self._sync_log.set_replica_gen_and_trans_id(other_replica_uid, + other_generation, + other_transaction_id) + self._set_u1db_data() + + #------------------------------------------------------------------------- + # implemented methods from CommonBackend + #------------------------------------------------------------------------- + + def _get_generation(self): + self._get_u1db_data() + return self._transaction_log.get_generation() + + def _get_generation_info(self): + self._get_u1db_data() + return self._transaction_log.get_generation_info() + + def _has_conflicts(self, doc_id): + # Documents never have conflicts on server. + return False + + def _put_and_update_indexes(self, doc_id, old_doc, new_rev, content): + raise NotImplementedError(self._put_and_update_indexes) + + + def _get_trans_id_for_gen(self, generation): + self._get_u1db_data() + trans_id = self._transaction_log.get_trans_id_for_gen(generation) + if trans_id is None: + raise errors.InvalidGeneration + return trans_id + + #------------------------------------------------------------------------- + # OpenStack specific methods + #------------------------------------------------------------------------- + + def _ensure_u1db_data(self): + """ + Guarantee that u1db data exists in store. + """ + if self._is_initialized(): + return + self._initialize() + + def _is_initialized(self): + """ + Verify if u1db data exists in store. + """ + if not self._get_doc('u1db_data'): + return False + return True + + def _initialize(self): + """ + Create u1db data object in store. + """ + content = { 'transaction_log' : [], + 'sync_log' : [] } + doc = self.create_doc('u1db_data', content) + + def _get_auth(self): + self._url, self._auth_token = self._connection.get_auth() + return self._url, self.auth_token + + def _get_u1db_data(self): + data = self.get_doc('u1db_data').content + self._transaction_log = data['transaction_log'] + self._sync_log = data['sync_log'] + + def _set_u1db_data(self): + doc = self._factory('u1db_data') + doc.content = { 'transaction_log' : self._transaction_log, + 'sync_log' : self._sync_log } + self.put_doc(doc) + + +class OpenStackSyncTarget(HTTPSyncTarget): + + def get_sync_info(self, source_replica_uid): + source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id( + source_replica_uid) + my_gen, my_trans_id = self._db._get_generation_info() + return ( + self._db._replica_uid, my_gen, my_trans_id, source_gen, + source_trans_id) + + def record_sync_info(self, source_replica_uid, source_replica_generation, + source_replica_transaction_id): + if self._trace_hook: + self._trace_hook('record_sync_info') + self._db._set_replica_gen_and_trans_id( + source_replica_uid, source_replica_generation, + source_replica_transaction_id) + + +class SimpleLog(object): + def __init__(self): + self._log = [] + + def _set_log(self, log): + self._log = log + + def _get_log(self): + return self._log + + log = property( + _get_log, _set_log, doc="Log contents.") + + def append(self, msg): + self._log.append(msg) + + def reduce(self, func, initializer=None): + return reduce(func, self.log, initializer) + + def map(self, func): + return map(func, self.log) + + def filter(self, func): + return filter(func, self.log) + + +class TransactionLog(SimpleLog): + """ + A list of (generation, doc_id, transaction_id) tuples. + """ + + def _set_log(self, log): + self._log = log + + def _get_log(self): + return sorted(self._log, reverse=True) + + log = property( + _get_log, _set_log, doc="Log contents.") + + def get_generation(self): + """ + Return the current generation. + """ + gens = self.map(lambda x: x[0]) + if not gens: + return 0 + return max(gens) + + def get_generation_info(self): + """ + Return the current generation and transaction id. + """ + if not self._log: + return(0, '') + info = self.map(lambda x: (x[0], x[2])) + return reduce(lambda x, y: x if (x[0] > y[0]) else y, info) + + def get_trans_id_for_gen(self, gen): + """ + Get the transaction id corresponding to a particular generation. + """ + log = self.reduce(lambda x, y: y if y[0] == gen else x) + if log is None: + return None + return log[2] + + def whats_changed(self, old_generation): + results = self.filter(lambda x: x[0] > old_generation) + seen = set() + changes = [] + newest_trans_id = '' + for generation, doc_id, trans_id in results: + if doc_id not in seen: + changes.append((doc_id, generation, trans_id)) + seen.add(doc_id) + if changes: + cur_gen = changes[0][1] # max generation + newest_trans_id = changes[0][2] + changes.reverse() + else: + results = self.log + if not results: + cur_gen = 0 + newest_trans_id = '' + else: + cur_gen, _, newest_trans_id = results[0] + + return cur_gen, newest_trans_id, changes + + + +class SyncLog(SimpleLog): + """ + A list of (replica_id, generation, transaction_id) tuples. + """ + + def find_by_replica_uid(self, replica_uid): + if not self.log: + return () + return self.reduce(lambda x, y: y if y[0] == replica_uid else x) + + def get_replica_gen_and_trans_id(self, other_replica_uid): + """ + Return the last known generation and transaction id for the other db + replica. + """ + info = self.find_by_replica_uid(other_replica_uid) + if not info: + return (0, '') + return (info[1], info[2]) + + def set_replica_gen_and_trans_id(self, other_replica_uid, + other_generation, other_transaction_id): + """ + Set the last-known generation and transaction id for the other + database replica. + """ + self.log = self.filter(lambda x: x[0] != other_replica_uid) + self.append((other_replica_uid, other_generation, + other_transaction_id)) + |