summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2012-12-03 11:13:51 -0200
committerdrebs <drebs@leap.se>2012-12-03 11:13:51 -0200
commit7a932811c018bb30b584451d4fe114cf69ab420c (patch)
tree64947e47f4ef84170953a0a42848f9a48fd3840b
parentdae0dacb59e2b06b681fab88ddefb038b7e16bb6 (diff)
Split leap infrastructure and openstack backend in different files.
-rw-r--r--src/leap/soledad/__init__.py256
-rw-r--r--src/leap/soledad/leap.py114
-rw-r--r--src/leap/soledad/openstack.py141
3 files changed, 257 insertions, 254 deletions
diff --git a/src/leap/soledad/__init__.py b/src/leap/soledad/__init__.py
index 5174d818..6ba64a61 100644
--- a/src/leap/soledad/__init__.py
+++ b/src/leap/soledad/__init__.py
@@ -2,257 +2,5 @@
"""A U1DB implementation that uses OpenStack Swift as its persistence layer."""
-try:
- import simplejson as json
-except ImportError:
- import json # noqa
-
-from u1db.backends import CommonBackend, CommonSyncTarget
-from u1db import (
- Document,
- errors,
- query_parser,
- vectorclock,
- )
-from u1db.remote.http_target import HTTPSyncTarget
-
-from swiftclient import client
-import base64
-
-
-class OpenStackDatabase(CommonBackend):
- """A U1DB implementation that uses OpenStack as its persistence layer."""
-
- def __init__(self, auth_url, user, auth_key):
- """Create a new OpenStack data container."""
- self._auth_url = auth_url
- self._user = user
- self._auth_key = auth_key
- self.set_document_factory(LeapDocument)
- self._connection = swiftclient.Connection(self._auth_url, self._user,
- self._auth_key)
-
- #-------------------------------------------------------------------------
- # implemented methods from Database
- #-------------------------------------------------------------------------
-
- def set_document_factory(self, factory):
- self._factory = factory
-
- def set_document_size_limit(self, limit):
- raise NotImplementedError(self.set_document_size_limit)
-
- def whats_changed(self, old_generation=0):
- raise NotImplementedError(self.whats_changed)
-
- def get_doc(self, doc_id, include_deleted=False):
- raise NotImplementedError(self.get_doc)
-
- def get_all_docs(self, include_deleted=False):
- """Get all documents from the database."""
- raise NotImplementedError(self.get_all_docs)
-
- def put_doc(self, doc):
- raise NotImplementedError(self.put_doc)
-
- def delete_doc(self, doc):
- raise NotImplementedError(self.delete_doc)
-
- # start of index-related methods: these are not supported by this backend.
-
- def create_index(self, index_name, *index_expressions):
- return False
-
- def delete_index(self, index_name):
- return False
-
- def list_indexes(self):
- return []
-
- def get_from_index(self, index_name, *key_values):
- return []
-
- def get_range_from_index(self, index_name, start_value=None,
- end_value=None):
- return []
-
- def get_index_keys(self, index_name):
- return []
-
- # end of index-related methods: these are not supported by this backend.
-
- def get_doc_conflicts(self, doc_id):
- return []
-
- def resolve_doc(self, doc, conflicted_doc_revs):
- raise NotImplementedError(self.resolve_doc)
-
- def get_sync_target(self):
- return OpenStackSyncTarget(self)
-
- def close(self):
- raise NotImplementedError(self.close)
-
- def sync(self, url, creds=None, autocreate=True):
- raise NotImplementedError(self.close)
-
- def _get_replica_gen_and_trans_id(self, other_replica_uid):
- raise NotImplementedError(self._get_replica_gen_and_trans_id)
-
- def _set_replica_gen_and_trans_id(self, other_replica_uid,
- other_generation, other_transaction_id):
- raise NotImplementedError(self._set_replica_gen_and_trans_id)
-
- #-------------------------------------------------------------------------
- # implemented methods from CommonBackend
- #-------------------------------------------------------------------------
-
- def _get_generation(self):
- raise NotImplementedError(self._get_generation)
-
- def _get_generation_info(self):
- raise NotImplementedError(self._get_generation_info)
-
- def _get_doc(self, doc_id, check_for_conflicts=False):
- """Get just the document content, without fancy handling."""
- raise NotImplementedError(self._get_doc)
-
- def _has_conflicts(self, doc_id):
- raise NotImplementedError(self._has_conflicts)
-
- def _get_transaction_log(self):
- raise NotImplementedError(self._get_transaction_log)
-
- def _put_and_update_indexes(self, doc_id, old_doc, new_rev, content):
- raise NotImplementedError(self._put_and_update_indexes)
-
-
- def _get_trans_id_for_gen(self, generation):
- raise NotImplementedError(self._get_trans_id_for_gen)
-
- #-------------------------------------------------------------------------
- # OpenStack specific methods
- #-------------------------------------------------------------------------
-
- def _is_initialized(self, c):
- raise NotImplementedError(self._is_initialized)
-
- def _initialize(self, c):
- raise NotImplementedError(self._initialize)
-
- def _get_auth(self):
- self._url, self._auth_token = self._connection.get_auth(self._auth_url,
- self._user,
- self._auth_key)
- return self._url, self.auth_token
-
-
-class LeapDocument(Document):
-
- def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False,
- encrypted_json=None):
- super(Document, self).__init__(doc_id, rev, json, has_conflicts)
- if encrypted_json:
- self.set_encrypted_json(encrypted_json)
-
- def get_encrypted_json(self):
- """
- Returns document's json serialization encrypted with user's public key.
- """
- # TODO: replace for openpgp encryption with users's pub key.
- return base64.b64encode(self.get_json())
-
- def set_encrypted_json(self):
- """
- Set document's content based on encrypted version of json string.
- """
- # TODO:
- # - replace for openpgp decryption using user's priv key.
- # - raise error if unsuccessful.
- return self.set_json(base64.b64decode(self.get_json()))
-
-
-class LeapSyncTarget(HTTPSyncTarget):
-
- def _parse_sync_stream(self, data, return_doc_cb, ensure_callback=None):
- parts = data.splitlines() # one at a time
- if not parts or parts[0] != '[':
- raise BrokenSyncStream
- data = parts[1:-1]
- comma = False
- if data:
- line, comma = utils.check_and_strip_comma(data[0])
- res = json.loads(line)
- if ensure_callback and 'replica_uid' in res:
- ensure_callback(res['replica_uid'])
- for entry in data[1:]:
- if not comma: # missing in between comma
- raise BrokenSyncStream
- line, comma = utils.check_and_strip_comma(entry)
- entry = json.loads(line)
- doc = LeapDocument(entry['id'], entry['rev'],
- encrypted_json=entry['content'])
- return_doc_cb(doc, entry['gen'], entry['trans_id'])
- if parts[-1] != ']':
- try:
- partdic = json.loads(parts[-1])
- except ValueError:
- pass
- else:
- if isinstance(partdic, dict):
- self._error(partdic)
- raise BrokenSyncStream
- if not data or comma: # no entries or bad extra comma
- raise BrokenSyncStream
- return res
-
- def sync_exchange(self, docs_by_generations, source_replica_uid,
- last_known_generation, last_known_trans_id,
- return_doc_cb, ensure_callback=None):
- self._ensure_connection()
- if self._trace_hook: # for tests
- self._trace_hook('sync_exchange')
- url = '%s/sync-from/%s' % (self._url.path, source_replica_uid)
- self._conn.putrequest('POST', url)
- self._conn.putheader('content-type', 'application/x-u1db-sync-stream')
- for header_name, header_value in self._sign_request('POST', url, {}):
- self._conn.putheader(header_name, header_value)
- entries = ['[']
- size = 1
-
- def prepare(**dic):
- entry = comma + '\r\n' + json.dumps(dic)
- entries.append(entry)
- return len(entry)
-
- comma = ''
- size += prepare(
- last_known_generation=last_known_generation,
- last_known_trans_id=last_known_trans_id,
- ensure=ensure_callback is not None)
- comma = ','
- for doc, gen, trans_id in docs_by_generations:
- size += prepare(id=doc.doc_id, rev=doc.rev,
- content=doc.get_encrypted_json(),
- gen=gen, trans_id=trans_id)
- entries.append('\r\n]')
- size += len(entries[-1])
- self._conn.putheader('content-length', str(size))
- self._conn.endheaders()
- for entry in entries:
- self._conn.send(entry)
- entries = None
- data, _ = self._response()
- res = self._parse_sync_stream(data, return_doc_cb, ensure_callback)
- data = None
- return res['new_generation'], res['new_transaction_id']
-
-
-class OpenStackSyncTarget(CommonSyncTarget):
-
- def get_sync_info(self, source_replica_uid):
- raise NotImplementedError(self.get_sync_info)
-
- def record_sync_info(self, source_replica_uid, source_replica_generation,
- source_replica_transaction_id):
- raise NotImplementedError(self.record_sync_info)
+from leap import *
+from openstack import *
diff --git a/src/leap/soledad/leap.py b/src/leap/soledad/leap.py
new file mode 100644
index 00000000..08330618
--- /dev/null
+++ b/src/leap/soledad/leap.py
@@ -0,0 +1,114 @@
+try:
+ import simplejson as json
+except ImportError:
+ import json # noqa
+
+from u1db import Document
+from u1db.remote.http_target import HTTPSyncTarget
+import base64
+
+
+class LeapDocument(Document):
+ """
+ LEAP Documents are standard u1db documents with cabability of returning an
+ encrypted version of the document json string as well as setting document
+ content based on an encrypted version of json string.
+ """
+
+ def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False,
+ encrypted_json=None):
+ super(Document, self).__init__(doc_id, rev, json, has_conflicts)
+ if encrypted_json:
+ self.set_encrypted_json(encrypted_json)
+
+ def get_encrypted_json(self):
+ """
+ Returns document's json serialization encrypted with user's public key.
+ """
+ # TODO: replace for openpgp encryption with users's pub key.
+ return base64.b64encode(self.get_json())
+
+ def set_encrypted_json(self):
+ """
+ Set document's content based on encrypted version of json string.
+ """
+ # TODO:
+ # - replace for openpgp decryption using user's priv key.
+ # - raise error if unsuccessful.
+ return self.set_json(base64.b64decode(self.get_json()))
+
+
+class LeapSyncTarget(HTTPSyncTarget):
+
+ def _parse_sync_stream(self, data, return_doc_cb, ensure_callback=None):
+ parts = data.splitlines() # one at a time
+ if not parts or parts[0] != '[':
+ raise BrokenSyncStream
+ data = parts[1:-1]
+ comma = False
+ if data:
+ line, comma = utils.check_and_strip_comma(data[0])
+ res = json.loads(line)
+ if ensure_callback and 'replica_uid' in res:
+ ensure_callback(res['replica_uid'])
+ for entry in data[1:]:
+ if not comma: # missing in between comma
+ raise BrokenSyncStream
+ line, comma = utils.check_and_strip_comma(entry)
+ entry = json.loads(line)
+ doc = LeapDocument(entry['id'], entry['rev'],
+ encrypted_json=entry['content'])
+ return_doc_cb(doc, entry['gen'], entry['trans_id'])
+ if parts[-1] != ']':
+ try:
+ partdic = json.loads(parts[-1])
+ except ValueError:
+ pass
+ else:
+ if isinstance(partdic, dict):
+ self._error(partdic)
+ raise BrokenSyncStream
+ if not data or comma: # no entries or bad extra comma
+ raise BrokenSyncStream
+ return res
+
+ def sync_exchange(self, docs_by_generations, source_replica_uid,
+ last_known_generation, last_known_trans_id,
+ return_doc_cb, ensure_callback=None):
+ self._ensure_connection()
+ if self._trace_hook: # for tests
+ self._trace_hook('sync_exchange')
+ url = '%s/sync-from/%s' % (self._url.path, source_replica_uid)
+ self._conn.putrequest('POST', url)
+ self._conn.putheader('content-type', 'application/x-u1db-sync-stream')
+ for header_name, header_value in self._sign_request('POST', url, {}):
+ self._conn.putheader(header_name, header_value)
+ entries = ['[']
+ size = 1
+
+ def prepare(**dic):
+ entry = comma + '\r\n' + json.dumps(dic)
+ entries.append(entry)
+ return len(entry)
+
+ comma = ''
+ size += prepare(
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ ensure=ensure_callback is not None)
+ comma = ','
+ for doc, gen, trans_id in docs_by_generations:
+ size += prepare(id=doc.doc_id, rev=doc.rev,
+ content=doc.get_encrypted_json(),
+ gen=gen, trans_id=trans_id)
+ entries.append('\r\n]')
+ size += len(entries[-1])
+ self._conn.putheader('content-length', str(size))
+ self._conn.endheaders()
+ for entry in entries:
+ self._conn.send(entry)
+ entries = None
+ data, _ = self._response()
+ res = self._parse_sync_stream(data, return_doc_cb, ensure_callback)
+ data = None
+ return res['new_generation'], res['new_transaction_id']
diff --git a/src/leap/soledad/openstack.py b/src/leap/soledad/openstack.py
new file mode 100644
index 00000000..514a4c58
--- /dev/null
+++ b/src/leap/soledad/openstack.py
@@ -0,0 +1,141 @@
+from u1db.backends import CommonBackend
+from leap import *
+from u1db.remote.http_target import HTTPSyncTarget
+from swiftclient import client
+
+
+class OpenStackDatabase(CommonBackend):
+ """A U1DB implementation that uses OpenStack as its persistence layer."""
+
+ def __init__(self, auth_url, user, auth_key):
+ """Create a new OpenStack data container."""
+ self._auth_url = auth_url
+ self._user = user
+ self._auth_key = auth_key
+ self.set_document_factory(LeapDocument)
+ self._connection = swiftclient.Connection(self._auth_url, self._user,
+ self._auth_key)
+
+ #-------------------------------------------------------------------------
+ # implemented methods from Database
+ #-------------------------------------------------------------------------
+
+ def set_document_factory(self, factory):
+ self._factory = factory
+
+ def set_document_size_limit(self, limit):
+ raise NotImplementedError(self.set_document_size_limit)
+
+ def whats_changed(self, old_generation=0):
+ raise NotImplementedError(self.whats_changed)
+
+ def get_doc(self, doc_id, include_deleted=False):
+ raise NotImplementedError(self.get_doc)
+
+ def get_all_docs(self, include_deleted=False):
+ """Get all documents from the database."""
+ raise NotImplementedError(self.get_all_docs)
+
+ def put_doc(self, doc):
+ raise NotImplementedError(self.put_doc)
+
+ def delete_doc(self, doc):
+ raise NotImplementedError(self.delete_doc)
+
+ # start of index-related methods: these are not supported by this backend.
+
+ def create_index(self, index_name, *index_expressions):
+ return False
+
+ def delete_index(self, index_name):
+ return False
+
+ def list_indexes(self):
+ return []
+
+ def get_from_index(self, index_name, *key_values):
+ return []
+
+ def get_range_from_index(self, index_name, start_value=None,
+ end_value=None):
+ return []
+
+ def get_index_keys(self, index_name):
+ return []
+
+ # end of index-related methods: these are not supported by this backend.
+
+ def get_doc_conflicts(self, doc_id):
+ return []
+
+ def resolve_doc(self, doc, conflicted_doc_revs):
+ raise NotImplementedError(self.resolve_doc)
+
+ def get_sync_target(self):
+ return OpenStackSyncTarget(self)
+
+ def close(self):
+ raise NotImplementedError(self.close)
+
+ def sync(self, url, creds=None, autocreate=True):
+ raise NotImplementedError(self.close)
+
+ def _get_replica_gen_and_trans_id(self, other_replica_uid):
+ raise NotImplementedError(self._get_replica_gen_and_trans_id)
+
+ def _set_replica_gen_and_trans_id(self, other_replica_uid,
+ other_generation, other_transaction_id):
+ raise NotImplementedError(self._set_replica_gen_and_trans_id)
+
+ #-------------------------------------------------------------------------
+ # implemented methods from CommonBackend
+ #-------------------------------------------------------------------------
+
+ def _get_generation(self):
+ raise NotImplementedError(self._get_generation)
+
+ def _get_generation_info(self):
+ raise NotImplementedError(self._get_generation_info)
+
+ def _get_doc(self, doc_id, check_for_conflicts=False):
+ """Get just the document content, without fancy handling."""
+ raise NotImplementedError(self._get_doc)
+
+ def _has_conflicts(self, doc_id):
+ raise NotImplementedError(self._has_conflicts)
+
+ def _get_transaction_log(self):
+ raise NotImplementedError(self._get_transaction_log)
+
+ def _put_and_update_indexes(self, doc_id, old_doc, new_rev, content):
+ raise NotImplementedError(self._put_and_update_indexes)
+
+
+ def _get_trans_id_for_gen(self, generation):
+ raise NotImplementedError(self._get_trans_id_for_gen)
+
+ #-------------------------------------------------------------------------
+ # OpenStack specific methods
+ #-------------------------------------------------------------------------
+
+ def _is_initialized(self, c):
+ raise NotImplementedError(self._is_initialized)
+
+ def _initialize(self, c):
+ raise NotImplementedError(self._initialize)
+
+ def _get_auth(self):
+ self._url, self._auth_token = self._connection.get_auth(self._auth_url,
+ self._user,
+ self._auth_key)
+ return self._url, self.auth_token
+
+
+class OpenStackSyncTarget(HTTPSyncTarget):
+
+ def get_sync_info(self, source_replica_uid):
+ raise NotImplementedError(self.get_sync_info)
+
+ def record_sync_info(self, source_replica_uid, source_replica_generation,
+ source_replica_transaction_id):
+ raise NotImplementedError(self.record_sync_info)