summaryrefslogtreecommitdiff
path: root/src/leap/soledad/backends
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2012-12-06 11:07:53 -0200
committerdrebs <drebs@leap.se>2012-12-06 11:07:53 -0200
commit584696e4dbfc13b793208dc4c5c6cdc224db5a12 (patch)
treea52830511908c8a135acc16095d61acba177873b /src/leap/soledad/backends
parent2cf00360bce0193d8fa73194a148c28426172043 (diff)
Remove u1db and swiftclient dirs and refactor.
Diffstat (limited to 'src/leap/soledad/backends')
-rw-r--r--src/leap/soledad/backends/__init__.py0
-rw-r--r--src/leap/soledad/backends/leap.py157
-rw-r--r--src/leap/soledad/backends/openstack.py369
3 files changed, 526 insertions, 0 deletions
diff --git a/src/leap/soledad/backends/__init__.py b/src/leap/soledad/backends/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/leap/soledad/backends/__init__.py
diff --git a/src/leap/soledad/backends/leap.py b/src/leap/soledad/backends/leap.py
new file mode 100644
index 00000000..2c815632
--- /dev/null
+++ b/src/leap/soledad/backends/leap.py
@@ -0,0 +1,157 @@
+try:
+ import simplejson as json
+except ImportError:
+ import json # noqa
+
+from u1db import Document
+from u1db.remote.http_target import HTTPSyncTarget
+from u1db.remote.http_database import HTTPDatabase
+import base64
+
+
+class NoDefaultKey(Exception):
+ pass
+
+
+class LeapDocument(Document):
+ """
+ LEAP Documents are standard u1db documents with cabability of returning an
+ encrypted version of the document json string as well as setting document
+ content based on an encrypted version of json string.
+ """
+
+ def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False,
+ encrypted_json=None, default_key=None, gpg_wrapper=None):
+ super(LeapDocument, self).__init__(doc_id, rev, json, has_conflicts)
+ # we might want to get already initialized wrappers for testing.
+ if gpg_wrapper is None:
+ self._gpg = GPGWrapper()
+ else:
+ self._gpg = gpg_wrapper
+ if encrypted_json:
+ self.set_encrypted_json(encrypted_json)
+ self._default_key = default_key
+
+ def get_encrypted_json(self):
+ """
+ Returns document's json serialization encrypted with user's public key.
+ """
+ if self._default_key is None:
+ raise NoDefaultKey()
+ cyphertext = self._gpg.encrypt(self.get_json(),
+ self._default_key,
+ always_trust = True)
+ # TODO: always trust?
+ return json.dumps({'cyphertext' : str(cyphertext)})
+
+ def set_encrypted_json(self, encrypted_json):
+ """
+ Set document's content based on encrypted version of json string.
+ """
+ cyphertext = json.loads(encrypted_json)['cyphertext']
+ plaintext = str(self._gpg.decrypt(cyphertext))
+ return self.set_json(plaintext)
+
+
+class LeapDatabase(HTTPDatabase):
+ """Implement the HTTP remote database API to a Leap server."""
+
+ @staticmethod
+ def open_database(url, create):
+ db = LeapDatabase(url)
+ db.open(create)
+ return db
+
+ @staticmethod
+ def delete_database(url):
+ db = LeapDatabase(url)
+ db._delete()
+ db.close()
+
+ def get_sync_target(self):
+ st = LeapSyncTarget(self._url.geturl())
+ st._creds = self._creds
+ return st
+
+
+class LeapSyncTarget(HTTPSyncTarget):
+
+ def _parse_sync_stream(self, data, return_doc_cb, ensure_callback=None):
+ """
+ Does the same as parent's method but ensures incoming content will be
+ decrypted.
+ """
+ parts = data.splitlines() # one at a time
+ if not parts or parts[0] != '[':
+ raise BrokenSyncStream
+ data = parts[1:-1]
+ comma = False
+ if data:
+ line, comma = utils.check_and_strip_comma(data[0])
+ res = json.loads(line)
+ if ensure_callback and 'replica_uid' in res:
+ ensure_callback(res['replica_uid'])
+ for entry in data[1:]:
+ if not comma: # missing in between comma
+ raise BrokenSyncStream
+ line, comma = utils.check_and_strip_comma(entry)
+ entry = json.loads(line)
+ doc = LeapDocument(entry['id'], entry['rev'],
+ encrypted_json=entry['content'])
+ return_doc_cb(doc, entry['gen'], entry['trans_id'])
+ if parts[-1] != ']':
+ try:
+ partdic = json.loads(parts[-1])
+ except ValueError:
+ pass
+ else:
+ if isinstance(partdic, dict):
+ self._error(partdic)
+ raise BrokenSyncStream
+ if not data or comma: # no entries or bad extra comma
+ raise BrokenSyncStream
+ return res
+
+ def sync_exchange(self, docs_by_generations, source_replica_uid,
+ last_known_generation, last_known_trans_id,
+ return_doc_cb, ensure_callback=None):
+ """
+ Does the same as parent's method but encrypts content before syncing.
+ """
+ self._ensure_connection()
+ if self._trace_hook: # for tests
+ self._trace_hook('sync_exchange')
+ url = '%s/sync-from/%s' % (self._url.path, source_replica_uid)
+ self._conn.putrequest('POST', url)
+ self._conn.putheader('content-type', 'application/x-u1db-sync-stream')
+ for header_name, header_value in self._sign_request('POST', url, {}):
+ self._conn.putheader(header_name, header_value)
+ entries = ['[']
+ size = 1
+
+ def prepare(**dic):
+ entry = comma + '\r\n' + json.dumps(dic)
+ entries.append(entry)
+ return len(entry)
+
+ comma = ''
+ size += prepare(
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ ensure=ensure_callback is not None)
+ comma = ','
+ for doc, gen, trans_id in docs_by_generations:
+ size += prepare(id=doc.doc_id, rev=doc.rev,
+ content=doc.get_encrypted_json(),
+ gen=gen, trans_id=trans_id)
+ entries.append('\r\n]')
+ size += len(entries[-1])
+ self._conn.putheader('content-length', str(size))
+ self._conn.endheaders()
+ for entry in entries:
+ self._conn.send(entry)
+ entries = None
+ data, _ = self._response()
+ res = self._parse_sync_stream(data, return_doc_cb, ensure_callback)
+ data = None
+ return res['new_generation'], res['new_transaction_id']
diff --git a/src/leap/soledad/backends/openstack.py b/src/leap/soledad/backends/openstack.py
new file mode 100644
index 00000000..ec4609b4
--- /dev/null
+++ b/src/leap/soledad/backends/openstack.py
@@ -0,0 +1,369 @@
+from leap import *
+from u1db import errors
+from u1db.backends import CommonBackend
+from u1db.remote.http_target import HTTPSyncTarget
+from swiftclient import client
+
+
+class OpenStackDatabase(CommonBackend):
+ """A U1DB implementation that uses OpenStack as its persistence layer."""
+
+ def __init__(self, auth_url, user, auth_key, container):
+ """Create a new OpenStack data container."""
+ self._auth_url = auth_url
+ self._user = user
+ self._auth_key = auth_key
+ self._container = container
+ self.set_document_factory(LeapDocument)
+ self._connection = swiftclient.Connection(self._auth_url, self._user,
+ self._auth_key)
+ self._get_auth()
+ self._ensure_u1db_data()
+
+ #-------------------------------------------------------------------------
+ # implemented methods from Database
+ #-------------------------------------------------------------------------
+
+ def set_document_factory(self, factory):
+ self._factory = factory
+
+ def set_document_size_limit(self, limit):
+ raise NotImplementedError(self.set_document_size_limit)
+
+ def whats_changed(self, old_generation=0):
+ self._get_u1db_data()
+ # This method is implemented in TransactionLog because testing is
+ # easier like this for now, but it can be moved to here afterwards.
+ return self._transaction_log.whats_changed(old_generation)
+
+ def _get_doc(self, doc_id, check_for_conflicts=False):
+ """Get just the document content, without fancy handling.
+
+ Conflicts do not happen on server side, so there's no need to check
+ for them.
+ """
+ try:
+ response, contents = self._connection.get_object(self._container, doc_id)
+ rev = response['x-object-meta-rev']
+ return self._factory(doc_id, rev, contents)
+ except swiftclient.ClientException:
+ return None
+
+ def get_doc(self, doc_id, include_deleted=False):
+ doc = self._get_doc(doc_id, check_for_conflicts=True)
+ if doc is None:
+ return None
+ if doc.is_tombstone() and not include_deleted:
+ return None
+ return doc
+
+ def get_all_docs(self, include_deleted=False):
+ """Get all documents from the database."""
+ generation = self._get_generation()
+ results = []
+ _, doc_ids = self._connection.get_container(self._container,
+ full_listing=True)
+ for doc_id in doc_ids:
+ doc = self._get_doc(doc_id)
+ if doc.content is None and not include_deleted:
+ continue
+ results.append(doc)
+ return (generation, results)
+
+ def put_doc(self, doc):
+ if doc.doc_id is None:
+ raise errors.InvalidDocId()
+ self._check_doc_id(doc.doc_id)
+ self._check_doc_size(doc)
+ # TODO: check for conflicts?
+ new_rev = self._allocate_doc_rev(doc.rev)
+ headers = { 'X-Object-Meta-Rev' : new_rev }
+ self._connection.put_object(self._container, doc_id, doc.get_json(),
+ headers=headers)
+ new_gen = self._get_generation() + 1
+ trans_id = self._allocate_transaction_id()
+ self._transaction_log.append((new_gen, doc.doc_id, trans_id))
+ self._set_u1db_data()
+ return new_rev
+
+ def delete_doc(self, doc):
+ old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True)
+ if old_doc is None:
+ raise errors.DocumentDoesNotExist
+ if old_doc.rev != doc.rev:
+ raise errors.RevisionConflict()
+ if old_doc.is_tombstone():
+ raise errors.DocumentAlreadyDeleted
+ if old_doc.has_conflicts:
+ raise errors.ConflictedDoc()
+ new_rev = self._allocate_doc_rev(doc.rev)
+ doc.rev = new_rev
+ doc.make_tombstone()
+ self._put_doc(olddoc)
+ return new_rev
+
+ # start of index-related methods: these are not supported by this backend.
+
+ def create_index(self, index_name, *index_expressions):
+ return False
+
+ def delete_index(self, index_name):
+ return False
+
+ def list_indexes(self):
+ return []
+
+ def get_from_index(self, index_name, *key_values):
+ return []
+
+ def get_range_from_index(self, index_name, start_value=None,
+ end_value=None):
+ return []
+
+ def get_index_keys(self, index_name):
+ return []
+
+ # end of index-related methods: these are not supported by this backend.
+
+ def get_doc_conflicts(self, doc_id):
+ return []
+
+ def resolve_doc(self, doc, conflicted_doc_revs):
+ raise NotImplementedError(self.resolve_doc)
+
+ def get_sync_target(self):
+ return OpenStackSyncTarget(self)
+
+ def close(self):
+ raise NotImplementedError(self.close)
+
+ def sync(self, url, creds=None, autocreate=True):
+ from u1db.sync import Synchronizer
+ from u1db.remote.http_target import OpenStackSyncTarget
+ return Synchronizer(self, OpenStackSyncTarget(url, creds=creds)).sync(
+ autocreate=autocreate)
+
+ def _get_replica_gen_and_trans_id(self, other_replica_uid):
+ self._get_u1db_data()
+ return self._sync_log.get_replica_gen_and_trans_id(other_replica_uid)
+
+ def _set_replica_gen_and_trans_id(self, other_replica_uid,
+ other_generation, other_transaction_id):
+ self._get_u1db_data()
+ self._sync_log.set_replica_gen_and_trans_id(other_replica_uid,
+ other_generation,
+ other_transaction_id)
+ self._set_u1db_data()
+
+ #-------------------------------------------------------------------------
+ # implemented methods from CommonBackend
+ #-------------------------------------------------------------------------
+
+ def _get_generation(self):
+ self._get_u1db_data()
+ return self._transaction_log.get_generation()
+
+ def _get_generation_info(self):
+ self._get_u1db_data()
+ return self._transaction_log.get_generation_info()
+
+ def _has_conflicts(self, doc_id):
+ # Documents never have conflicts on server.
+ return False
+
+ def _put_and_update_indexes(self, doc_id, old_doc, new_rev, content):
+ raise NotImplementedError(self._put_and_update_indexes)
+
+
+ def _get_trans_id_for_gen(self, generation):
+ self._get_u1db_data()
+ trans_id = self._transaction_log.get_trans_id_for_gen(generation)
+ if trans_id is None:
+ raise errors.InvalidGeneration
+ return trans_id
+
+ #-------------------------------------------------------------------------
+ # OpenStack specific methods
+ #-------------------------------------------------------------------------
+
+ def _ensure_u1db_data(self):
+ """
+ Guarantee that u1db data exists in store.
+ """
+ if self._is_initialized():
+ return
+ self._initialize()
+
+ def _is_initialized(self):
+ """
+ Verify if u1db data exists in store.
+ """
+ if not self._get_doc('u1db_data'):
+ return False
+ return True
+
+ def _initialize(self):
+ """
+ Create u1db data object in store.
+ """
+ content = { 'transaction_log' : [],
+ 'sync_log' : [] }
+ doc = self.create_doc('u1db_data', content)
+
+ def _get_auth(self):
+ self._url, self._auth_token = self._connection.get_auth()
+ return self._url, self.auth_token
+
+ def _get_u1db_data(self):
+ data = self.get_doc('u1db_data').content
+ self._transaction_log = data['transaction_log']
+ self._sync_log = data['sync_log']
+
+ def _set_u1db_data(self):
+ doc = self._factory('u1db_data')
+ doc.content = { 'transaction_log' : self._transaction_log,
+ 'sync_log' : self._sync_log }
+ self.put_doc(doc)
+
+
+class OpenStackSyncTarget(HTTPSyncTarget):
+
+ def get_sync_info(self, source_replica_uid):
+ source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id(
+ source_replica_uid)
+ my_gen, my_trans_id = self._db._get_generation_info()
+ return (
+ self._db._replica_uid, my_gen, my_trans_id, source_gen,
+ source_trans_id)
+
+ def record_sync_info(self, source_replica_uid, source_replica_generation,
+ source_replica_transaction_id):
+ if self._trace_hook:
+ self._trace_hook('record_sync_info')
+ self._db._set_replica_gen_and_trans_id(
+ source_replica_uid, source_replica_generation,
+ source_replica_transaction_id)
+
+
+class SimpleLog(object):
+ def __init__(self):
+ self._log = []
+
+ def _set_log(self, log):
+ self._log = log
+
+ def _get_log(self):
+ return self._log
+
+ log = property(
+ _get_log, _set_log, doc="Log contents.")
+
+ def append(self, msg):
+ self._log.append(msg)
+
+ def reduce(self, func, initializer=None):
+ return reduce(func, self.log, initializer)
+
+ def map(self, func):
+ return map(func, self.log)
+
+ def filter(self, func):
+ return filter(func, self.log)
+
+
+class TransactionLog(SimpleLog):
+ """
+ A list of (generation, doc_id, transaction_id) tuples.
+ """
+
+ def _set_log(self, log):
+ self._log = log
+
+ def _get_log(self):
+ return sorted(self._log, reverse=True)
+
+ log = property(
+ _get_log, _set_log, doc="Log contents.")
+
+ def get_generation(self):
+ """
+ Return the current generation.
+ """
+ gens = self.map(lambda x: x[0])
+ if not gens:
+ return 0
+ return max(gens)
+
+ def get_generation_info(self):
+ """
+ Return the current generation and transaction id.
+ """
+ if not self._log:
+ return(0, '')
+ info = self.map(lambda x: (x[0], x[2]))
+ return reduce(lambda x, y: x if (x[0] > y[0]) else y, info)
+
+ def get_trans_id_for_gen(self, gen):
+ """
+ Get the transaction id corresponding to a particular generation.
+ """
+ log = self.reduce(lambda x, y: y if y[0] == gen else x)
+ if log is None:
+ return None
+ return log[2]
+
+ def whats_changed(self, old_generation):
+ results = self.filter(lambda x: x[0] > old_generation)
+ seen = set()
+ changes = []
+ newest_trans_id = ''
+ for generation, doc_id, trans_id in results:
+ if doc_id not in seen:
+ changes.append((doc_id, generation, trans_id))
+ seen.add(doc_id)
+ if changes:
+ cur_gen = changes[0][1] # max generation
+ newest_trans_id = changes[0][2]
+ changes.reverse()
+ else:
+ results = self.log
+ if not results:
+ cur_gen = 0
+ newest_trans_id = ''
+ else:
+ cur_gen, _, newest_trans_id = results[0]
+
+ return cur_gen, newest_trans_id, changes
+
+
+
+class SyncLog(SimpleLog):
+ """
+ A list of (replica_id, generation, transaction_id) tuples.
+ """
+
+ def find_by_replica_uid(self, replica_uid):
+ if not self.log:
+ return ()
+ return self.reduce(lambda x, y: y if y[0] == replica_uid else x)
+
+ def get_replica_gen_and_trans_id(self, other_replica_uid):
+ """
+ Return the last known generation and transaction id for the other db
+ replica.
+ """
+ info = self.find_by_replica_uid(other_replica_uid)
+ if not info:
+ return (0, '')
+ return (info[1], info[2])
+
+ def set_replica_gen_and_trans_id(self, other_replica_uid,
+ other_generation, other_transaction_id):
+ """
+ Set the last-known generation and transaction id for the other
+ database replica.
+ """
+ self.log = self.filter(lambda x: x[0] != other_replica_uid)
+ self.append((other_replica_uid, other_generation,
+ other_transaction_id))
+