summaryrefslogtreecommitdiff
path: root/openstack.py
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2012-12-04 20:39:04 -0200
committerdrebs <drebs@leap.se>2012-12-04 20:39:04 -0200
commit287f01ad6bc0a93762e389eb6326921a729123a2 (patch)
treee2f4d4f115d6bb4fe795a0352ea3302419c42f93 /openstack.py
parentaffe129bcc5e6905b73ca74e5e6b0cfb00862690 (diff)
Add transaction and sync logs as openstack documents.
Diffstat (limited to 'openstack.py')
-rw-r--r--openstack.py114
1 files changed, 105 insertions, 9 deletions
diff --git a/openstack.py b/openstack.py
index 25f1a404..22a2d067 100644
--- a/openstack.py
+++ b/openstack.py
@@ -1,5 +1,6 @@
-from u1db.backends import CommonBackend
from leap import *
+from u1db import errors
+from u1db.backends import CommonBackend
from u1db.remote.http_target import HTTPSyncTarget
from swiftclient import client
@@ -96,21 +97,26 @@ class OpenStackDatabase(CommonBackend):
raise NotImplementedError(self.close)
def _get_replica_gen_and_trans_id(self, other_replica_uid):
- raise NotImplementedError(self._get_replica_gen_and_trans_id)
+ self._update_u1db_data()
+ return self._sync_log.get_replica_gen_and_trans_id(other_replica_uid)
def _set_replica_gen_and_trans_id(self, other_replica_uid,
other_generation, other_transaction_id):
- raise NotImplementedError(self._set_replica_gen_and_trans_id)
+ self._update_u1db_data()
+ return self._sync_log.set_replica_gen_and_trans_id(other_replica_uid,
+ other_generation, other_transaction_id)
#-------------------------------------------------------------------------
# implemented methods from CommonBackend
#-------------------------------------------------------------------------
def _get_generation(self):
- raise NotImplementedError(self._get_generation)
+ self._update_u1db_data()
+ return self._transaction_log.get_generation()
def _get_generation_info(self):
- raise NotImplementedError(self._get_generation_info)
+ self._update_u1db_data()
+ return self._transaction_log.get_generation_info()
def _get_doc(self, doc_id, check_for_conflicts=False):
"""Get just the document content, without fancy handling."""
@@ -119,15 +125,16 @@ class OpenStackDatabase(CommonBackend):
def _has_conflicts(self, doc_id):
raise NotImplementedError(self._has_conflicts)
- def _get_transaction_log(self):
- raise NotImplementedError(self._get_transaction_log)
-
def _put_and_update_indexes(self, doc_id, old_doc, new_rev, content):
raise NotImplementedError(self._put_and_update_indexes)
def _get_trans_id_for_gen(self, generation):
- raise NotImplementedError(self._get_trans_id_for_gen)
+ self._update_u1db_data()
+ trans_id = self._transaction_log.get_trans_id_for_gen(generation)
+ if trans_id is None:
+ raise errors.InvalidGeneration
+ return trans_id
#-------------------------------------------------------------------------
# OpenStack specific methods
@@ -143,6 +150,11 @@ class OpenStackDatabase(CommonBackend):
self._url, self._auth_token = self._connection.get_auth()
return self._url, self.auth_token
+ def _update_u1db_data(self):
+ data = self.get_doc('u1db_data').content
+ self._transaction_log = data['transaction_log']
+ self._sync_log = data['sync_log']
+
class OpenStackSyncTarget(HTTPSyncTarget):
@@ -152,3 +164,87 @@ class OpenStackSyncTarget(HTTPSyncTarget):
def record_sync_info(self, source_replica_uid, source_replica_generation,
source_replica_transaction_id):
raise NotImplementedError(self.record_sync_info)
+
+
+class SimpleLog(object):
+ def __init__(self, log=None):
+ self._log = []
+ if log:
+ self._log = log
+
+ def append(self, msg):
+ self._log.append(msg)
+
+ def reduce(self, func, initializer=None):
+ return reduce(func, self._log, initializer)
+
+ def map(self, func):
+ return map(func, self._log)
+
+
+class TransactionLog(SimpleLog):
+ """
+ A list of (generation, doc_id, transaction_id) tuples.
+ """
+
+ def get_generation(self):
+ """
+ Return the current generation.
+ """
+ gens = self.map(lambda x: x[0])
+ if not gens:
+ return 0
+ return max(gens)
+
+ def get_generation_info(self):
+ """
+ Return the current generation and transaction id.
+ """
+ if not self._log:
+ return(0, '')
+ info = self.map(lambda x: (x[0], x[2]))
+ return reduce(lambda x, y: x if (x[0] > y[0]) else y, info)
+
+ def get_trans_id_for_gen(self, gen):
+ """
+ Get the transaction id corresponding to a particular generation.
+ """
+ log = self.reduce(lambda x, y: y if y[0] == gen else x)
+ if log is None:
+ return None
+ return log[2]
+
+class SyncLog(SimpleLog):
+ """
+ A list of (replica_id, generation, transaction_id) tuples.
+ """
+
+ def find_by_replica_uid(self, replica_uid):
+ if not self._log:
+ return ()
+ return self.reduce(lambda x, y: y if y[0] == replica_uid else x)
+
+ def get_replica_gen_and_trans_id(self, other_replica_uid):
+ """
+ Return the last known generation and transaction id for the other db
+ replica.
+ """
+ info = self.find_by_replica_uid(other_replica_uid)
+ if not info:
+ return (0, '')
+ return (info[1], info[2])
+
+ def set_replica_gen_and_trans_id(self, other_replica_uid,
+ other_generation, other_transaction_id):
+ """
+ Set the last-known generation and transaction id for the other
+ database replica.
+ """
+ old_log = self._log
+ self._log = []
+ for log in old_log:
+ if log[0] != other_replica_uid:
+ self.append(log)
+ self.append((other_replica_uid, other_generation,
+ other_transaction_id))
+