From 7a932811c018bb30b584451d4fe114cf69ab420c Mon Sep 17 00:00:00 2001 From: drebs Date: Mon, 3 Dec 2012 11:13:51 -0200 Subject: Split leap infrastructure and openstack backend in different files. --- src/leap/soledad/__init__.py | 256 +----------------------------------------- src/leap/soledad/leap.py | 114 +++++++++++++++++++ src/leap/soledad/openstack.py | 141 +++++++++++++++++++++++ 3 files changed, 257 insertions(+), 254 deletions(-) create mode 100644 src/leap/soledad/leap.py create mode 100644 src/leap/soledad/openstack.py diff --git a/src/leap/soledad/__init__.py b/src/leap/soledad/__init__.py index 5174d818..6ba64a61 100644 --- a/src/leap/soledad/__init__.py +++ b/src/leap/soledad/__init__.py @@ -2,257 +2,5 @@ """A U1DB implementation that uses OpenStack Swift as its persistence layer.""" -try: - import simplejson as json -except ImportError: - import json # noqa - -from u1db.backends import CommonBackend, CommonSyncTarget -from u1db import ( - Document, - errors, - query_parser, - vectorclock, - ) -from u1db.remote.http_target import HTTPSyncTarget - -from swiftclient import client -import base64 - - -class OpenStackDatabase(CommonBackend): - """A U1DB implementation that uses OpenStack as its persistence layer.""" - - def __init__(self, auth_url, user, auth_key): - """Create a new OpenStack data container.""" - self._auth_url = auth_url - self._user = user - self._auth_key = auth_key - self.set_document_factory(LeapDocument) - self._connection = swiftclient.Connection(self._auth_url, self._user, - self._auth_key) - - #------------------------------------------------------------------------- - # implemented methods from Database - #------------------------------------------------------------------------- - - def set_document_factory(self, factory): - self._factory = factory - - def set_document_size_limit(self, limit): - raise NotImplementedError(self.set_document_size_limit) - - def whats_changed(self, old_generation=0): - raise NotImplementedError(self.whats_changed) - - def get_doc(self, doc_id, include_deleted=False): - raise NotImplementedError(self.get_doc) - - def get_all_docs(self, include_deleted=False): - """Get all documents from the database.""" - raise NotImplementedError(self.get_all_docs) - - def put_doc(self, doc): - raise NotImplementedError(self.put_doc) - - def delete_doc(self, doc): - raise NotImplementedError(self.delete_doc) - - # start of index-related methods: these are not supported by this backend. - - def create_index(self, index_name, *index_expressions): - return False - - def delete_index(self, index_name): - return False - - def list_indexes(self): - return [] - - def get_from_index(self, index_name, *key_values): - return [] - - def get_range_from_index(self, index_name, start_value=None, - end_value=None): - return [] - - def get_index_keys(self, index_name): - return [] - - # end of index-related methods: these are not supported by this backend. - - def get_doc_conflicts(self, doc_id): - return [] - - def resolve_doc(self, doc, conflicted_doc_revs): - raise NotImplementedError(self.resolve_doc) - - def get_sync_target(self): - return OpenStackSyncTarget(self) - - def close(self): - raise NotImplementedError(self.close) - - def sync(self, url, creds=None, autocreate=True): - raise NotImplementedError(self.close) - - def _get_replica_gen_and_trans_id(self, other_replica_uid): - raise NotImplementedError(self._get_replica_gen_and_trans_id) - - def _set_replica_gen_and_trans_id(self, other_replica_uid, - other_generation, other_transaction_id): - raise NotImplementedError(self._set_replica_gen_and_trans_id) - - #------------------------------------------------------------------------- - # implemented methods from CommonBackend - #------------------------------------------------------------------------- - - def _get_generation(self): - raise NotImplementedError(self._get_generation) - - def _get_generation_info(self): - raise NotImplementedError(self._get_generation_info) - - def _get_doc(self, doc_id, check_for_conflicts=False): - """Get just the document content, without fancy handling.""" - raise NotImplementedError(self._get_doc) - - def _has_conflicts(self, doc_id): - raise NotImplementedError(self._has_conflicts) - - def _get_transaction_log(self): - raise NotImplementedError(self._get_transaction_log) - - def _put_and_update_indexes(self, doc_id, old_doc, new_rev, content): - raise NotImplementedError(self._put_and_update_indexes) - - - def _get_trans_id_for_gen(self, generation): - raise NotImplementedError(self._get_trans_id_for_gen) - - #------------------------------------------------------------------------- - # OpenStack specific methods - #------------------------------------------------------------------------- - - def _is_initialized(self, c): - raise NotImplementedError(self._is_initialized) - - def _initialize(self, c): - raise NotImplementedError(self._initialize) - - def _get_auth(self): - self._url, self._auth_token = self._connection.get_auth(self._auth_url, - self._user, - self._auth_key) - return self._url, self.auth_token - - -class LeapDocument(Document): - - def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False, - encrypted_json=None): - super(Document, self).__init__(doc_id, rev, json, has_conflicts) - if encrypted_json: - self.set_encrypted_json(encrypted_json) - - def get_encrypted_json(self): - """ - Returns document's json serialization encrypted with user's public key. - """ - # TODO: replace for openpgp encryption with users's pub key. - return base64.b64encode(self.get_json()) - - def set_encrypted_json(self): - """ - Set document's content based on encrypted version of json string. - """ - # TODO: - # - replace for openpgp decryption using user's priv key. - # - raise error if unsuccessful. - return self.set_json(base64.b64decode(self.get_json())) - - -class LeapSyncTarget(HTTPSyncTarget): - - def _parse_sync_stream(self, data, return_doc_cb, ensure_callback=None): - parts = data.splitlines() # one at a time - if not parts or parts[0] != '[': - raise BrokenSyncStream - data = parts[1:-1] - comma = False - if data: - line, comma = utils.check_and_strip_comma(data[0]) - res = json.loads(line) - if ensure_callback and 'replica_uid' in res: - ensure_callback(res['replica_uid']) - for entry in data[1:]: - if not comma: # missing in between comma - raise BrokenSyncStream - line, comma = utils.check_and_strip_comma(entry) - entry = json.loads(line) - doc = LeapDocument(entry['id'], entry['rev'], - encrypted_json=entry['content']) - return_doc_cb(doc, entry['gen'], entry['trans_id']) - if parts[-1] != ']': - try: - partdic = json.loads(parts[-1]) - except ValueError: - pass - else: - if isinstance(partdic, dict): - self._error(partdic) - raise BrokenSyncStream - if not data or comma: # no entries or bad extra comma - raise BrokenSyncStream - return res - - def sync_exchange(self, docs_by_generations, source_replica_uid, - last_known_generation, last_known_trans_id, - return_doc_cb, ensure_callback=None): - self._ensure_connection() - if self._trace_hook: # for tests - self._trace_hook('sync_exchange') - url = '%s/sync-from/%s' % (self._url.path, source_replica_uid) - self._conn.putrequest('POST', url) - self._conn.putheader('content-type', 'application/x-u1db-sync-stream') - for header_name, header_value in self._sign_request('POST', url, {}): - self._conn.putheader(header_name, header_value) - entries = ['['] - size = 1 - - def prepare(**dic): - entry = comma + '\r\n' + json.dumps(dic) - entries.append(entry) - return len(entry) - - comma = '' - size += prepare( - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - ensure=ensure_callback is not None) - comma = ',' - for doc, gen, trans_id in docs_by_generations: - size += prepare(id=doc.doc_id, rev=doc.rev, - content=doc.get_encrypted_json(), - gen=gen, trans_id=trans_id) - entries.append('\r\n]') - size += len(entries[-1]) - self._conn.putheader('content-length', str(size)) - self._conn.endheaders() - for entry in entries: - self._conn.send(entry) - entries = None - data, _ = self._response() - res = self._parse_sync_stream(data, return_doc_cb, ensure_callback) - data = None - return res['new_generation'], res['new_transaction_id'] - - -class OpenStackSyncTarget(CommonSyncTarget): - - def get_sync_info(self, source_replica_uid): - raise NotImplementedError(self.get_sync_info) - - def record_sync_info(self, source_replica_uid, source_replica_generation, - source_replica_transaction_id): - raise NotImplementedError(self.record_sync_info) +from leap import * +from openstack import * diff --git a/src/leap/soledad/leap.py b/src/leap/soledad/leap.py new file mode 100644 index 00000000..08330618 --- /dev/null +++ b/src/leap/soledad/leap.py @@ -0,0 +1,114 @@ +try: + import simplejson as json +except ImportError: + import json # noqa + +from u1db import Document +from u1db.remote.http_target import HTTPSyncTarget +import base64 + + +class LeapDocument(Document): + """ + LEAP Documents are standard u1db documents with cabability of returning an + encrypted version of the document json string as well as setting document + content based on an encrypted version of json string. + """ + + def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False, + encrypted_json=None): + super(Document, self).__init__(doc_id, rev, json, has_conflicts) + if encrypted_json: + self.set_encrypted_json(encrypted_json) + + def get_encrypted_json(self): + """ + Returns document's json serialization encrypted with user's public key. + """ + # TODO: replace for openpgp encryption with users's pub key. + return base64.b64encode(self.get_json()) + + def set_encrypted_json(self): + """ + Set document's content based on encrypted version of json string. + """ + # TODO: + # - replace for openpgp decryption using user's priv key. + # - raise error if unsuccessful. + return self.set_json(base64.b64decode(self.get_json())) + + +class LeapSyncTarget(HTTPSyncTarget): + + def _parse_sync_stream(self, data, return_doc_cb, ensure_callback=None): + parts = data.splitlines() # one at a time + if not parts or parts[0] != '[': + raise BrokenSyncStream + data = parts[1:-1] + comma = False + if data: + line, comma = utils.check_and_strip_comma(data[0]) + res = json.loads(line) + if ensure_callback and 'replica_uid' in res: + ensure_callback(res['replica_uid']) + for entry in data[1:]: + if not comma: # missing in between comma + raise BrokenSyncStream + line, comma = utils.check_and_strip_comma(entry) + entry = json.loads(line) + doc = LeapDocument(entry['id'], entry['rev'], + encrypted_json=entry['content']) + return_doc_cb(doc, entry['gen'], entry['trans_id']) + if parts[-1] != ']': + try: + partdic = json.loads(parts[-1]) + except ValueError: + pass + else: + if isinstance(partdic, dict): + self._error(partdic) + raise BrokenSyncStream + if not data or comma: # no entries or bad extra comma + raise BrokenSyncStream + return res + + def sync_exchange(self, docs_by_generations, source_replica_uid, + last_known_generation, last_known_trans_id, + return_doc_cb, ensure_callback=None): + self._ensure_connection() + if self._trace_hook: # for tests + self._trace_hook('sync_exchange') + url = '%s/sync-from/%s' % (self._url.path, source_replica_uid) + self._conn.putrequest('POST', url) + self._conn.putheader('content-type', 'application/x-u1db-sync-stream') + for header_name, header_value in self._sign_request('POST', url, {}): + self._conn.putheader(header_name, header_value) + entries = ['['] + size = 1 + + def prepare(**dic): + entry = comma + '\r\n' + json.dumps(dic) + entries.append(entry) + return len(entry) + + comma = '' + size += prepare( + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + ensure=ensure_callback is not None) + comma = ',' + for doc, gen, trans_id in docs_by_generations: + size += prepare(id=doc.doc_id, rev=doc.rev, + content=doc.get_encrypted_json(), + gen=gen, trans_id=trans_id) + entries.append('\r\n]') + size += len(entries[-1]) + self._conn.putheader('content-length', str(size)) + self._conn.endheaders() + for entry in entries: + self._conn.send(entry) + entries = None + data, _ = self._response() + res = self._parse_sync_stream(data, return_doc_cb, ensure_callback) + data = None + return res['new_generation'], res['new_transaction_id'] diff --git a/src/leap/soledad/openstack.py b/src/leap/soledad/openstack.py new file mode 100644 index 00000000..514a4c58 --- /dev/null +++ b/src/leap/soledad/openstack.py @@ -0,0 +1,141 @@ +from u1db.backends import CommonBackend +from leap import * +from u1db.remote.http_target import HTTPSyncTarget +from swiftclient import client + + +class OpenStackDatabase(CommonBackend): + """A U1DB implementation that uses OpenStack as its persistence layer.""" + + def __init__(self, auth_url, user, auth_key): + """Create a new OpenStack data container.""" + self._auth_url = auth_url + self._user = user + self._auth_key = auth_key + self.set_document_factory(LeapDocument) + self._connection = swiftclient.Connection(self._auth_url, self._user, + self._auth_key) + + #------------------------------------------------------------------------- + # implemented methods from Database + #------------------------------------------------------------------------- + + def set_document_factory(self, factory): + self._factory = factory + + def set_document_size_limit(self, limit): + raise NotImplementedError(self.set_document_size_limit) + + def whats_changed(self, old_generation=0): + raise NotImplementedError(self.whats_changed) + + def get_doc(self, doc_id, include_deleted=False): + raise NotImplementedError(self.get_doc) + + def get_all_docs(self, include_deleted=False): + """Get all documents from the database.""" + raise NotImplementedError(self.get_all_docs) + + def put_doc(self, doc): + raise NotImplementedError(self.put_doc) + + def delete_doc(self, doc): + raise NotImplementedError(self.delete_doc) + + # start of index-related methods: these are not supported by this backend. + + def create_index(self, index_name, *index_expressions): + return False + + def delete_index(self, index_name): + return False + + def list_indexes(self): + return [] + + def get_from_index(self, index_name, *key_values): + return [] + + def get_range_from_index(self, index_name, start_value=None, + end_value=None): + return [] + + def get_index_keys(self, index_name): + return [] + + # end of index-related methods: these are not supported by this backend. + + def get_doc_conflicts(self, doc_id): + return [] + + def resolve_doc(self, doc, conflicted_doc_revs): + raise NotImplementedError(self.resolve_doc) + + def get_sync_target(self): + return OpenStackSyncTarget(self) + + def close(self): + raise NotImplementedError(self.close) + + def sync(self, url, creds=None, autocreate=True): + raise NotImplementedError(self.close) + + def _get_replica_gen_and_trans_id(self, other_replica_uid): + raise NotImplementedError(self._get_replica_gen_and_trans_id) + + def _set_replica_gen_and_trans_id(self, other_replica_uid, + other_generation, other_transaction_id): + raise NotImplementedError(self._set_replica_gen_and_trans_id) + + #------------------------------------------------------------------------- + # implemented methods from CommonBackend + #------------------------------------------------------------------------- + + def _get_generation(self): + raise NotImplementedError(self._get_generation) + + def _get_generation_info(self): + raise NotImplementedError(self._get_generation_info) + + def _get_doc(self, doc_id, check_for_conflicts=False): + """Get just the document content, without fancy handling.""" + raise NotImplementedError(self._get_doc) + + def _has_conflicts(self, doc_id): + raise NotImplementedError(self._has_conflicts) + + def _get_transaction_log(self): + raise NotImplementedError(self._get_transaction_log) + + def _put_and_update_indexes(self, doc_id, old_doc, new_rev, content): + raise NotImplementedError(self._put_and_update_indexes) + + + def _get_trans_id_for_gen(self, generation): + raise NotImplementedError(self._get_trans_id_for_gen) + + #------------------------------------------------------------------------- + # OpenStack specific methods + #------------------------------------------------------------------------- + + def _is_initialized(self, c): + raise NotImplementedError(self._is_initialized) + + def _initialize(self, c): + raise NotImplementedError(self._initialize) + + def _get_auth(self): + self._url, self._auth_token = self._connection.get_auth(self._auth_url, + self._user, + self._auth_key) + return self._url, self.auth_token + + +class OpenStackSyncTarget(HTTPSyncTarget): + + def get_sync_info(self, source_replica_uid): + raise NotImplementedError(self.get_sync_info) + + def record_sync_info(self, source_replica_uid, source_replica_generation, + source_replica_transaction_id): + raise NotImplementedError(self.record_sync_info) -- cgit v1.2.3