diff options
author | kali <kali@leap.se> | 2013-01-23 00:31:14 +0900 |
---|---|---|
committer | kali <kali@leap.se> | 2013-01-23 00:31:14 +0900 |
commit | 06a1ef47e1719c3987006043fd1aae10e6da5c86 (patch) | |
tree | 87df182d2f8bea962b427e2f31211e8be0f31a6b /src/leap/soledad/backends | |
parent | 54802bf9c53fc32cfcceb23045c5aeb313c19829 (diff) | |
parent | 3359370d9ae06ffc0d60930e7f997d2331dc9037 (diff) |
Merge branch 'develop' of ssh://leap.se/leap_client into develop
Diffstat (limited to 'src/leap/soledad/backends')
-rw-r--r-- | src/leap/soledad/backends/couch.py | 75 | ||||
-rw-r--r-- | src/leap/soledad/backends/leap_backend.py | 11 | ||||
-rw-r--r-- | src/leap/soledad/backends/objectstore.py | 316 | ||||
-rw-r--r-- | src/leap/soledad/backends/sqlcipher.py | 68 |
4 files changed, 388 insertions, 82 deletions
diff --git a/src/leap/soledad/backends/couch.py b/src/leap/soledad/backends/couch.py index a3909596..78026af8 100644 --- a/src/leap/soledad/backends/couch.py +++ b/src/leap/soledad/backends/couch.py @@ -1,8 +1,10 @@ +import sys +import uuid +from base64 import b64encode, b64decode from u1db import errors -from u1db.remote.http_target import HTTPSyncTarget -from couchdb.client import Server, Document +from u1db.sync import LocalSyncTarget +from couchdb.client import Server, Document as CouchDocument from couchdb.http import ResourceNotFound - from leap.soledad.backends.objectstore import ObjectStore from leap.soledad.backends.leap_backend import LeapDocument @@ -15,7 +17,7 @@ except ImportError: class CouchDatabase(ObjectStore): """A U1DB implementation that uses Couch as its persistence layer.""" - def __init__(self, url, database, full_commit=True, session=None): + def __init__(self, url, database, replica_uid=None, full_commit=True, session=None): """Create a new Couch data container.""" self._url = url self._full_commit = full_commit @@ -23,6 +25,7 @@ class CouchDatabase(ObjectStore): self._server = Server(url=self._url, full_commit=self._full_commit, session=self._session) + self._dbname = database # this will ensure that transaction and sync logs exist and are # up-to-date. self.set_document_factory(LeapDocument) @@ -31,22 +34,26 @@ class CouchDatabase(ObjectStore): except ResourceNotFound: self._server.create(database) self._database = self._server[database] - super(CouchDatabase, self).__init__() + super(CouchDatabase, self).__init__(replica_uid=replica_uid) #------------------------------------------------------------------------- # implemented methods from Database #------------------------------------------------------------------------- def _get_doc(self, doc_id, check_for_conflicts=False): - """Get just the document content, without fancy handling. - - Conflicts do not happen on server side, so there's no need to check - for them. + """ + Get just the document content, without fancy handling. """ cdoc = self._database.get(doc_id) if cdoc is None: return None - doc = self._factory(doc_id=doc_id, rev=cdoc['u1db_rev']) + has_conflicts = False + if check_for_conflicts: + has_conflicts = self._has_conflicts(doc_id) + doc = self._factory( + doc_id=doc_id, + rev=cdoc['u1db_rev'], + has_conflicts=has_conflicts) if cdoc['u1db_json'] is not None: doc.content = json.loads(cdoc['u1db_json']) else: @@ -58,7 +65,9 @@ class CouchDatabase(ObjectStore): generation = self._get_generation() results = [] for doc_id in self._database: - doc = self._get_doc(doc_id) + if doc_id == self.U1DB_DATA_DOC_ID: + continue + doc = self._get_doc(doc_id, check_for_conflicts=True) if doc.content is None and not include_deleted: continue results.append(doc) @@ -66,7 +75,7 @@ class CouchDatabase(ObjectStore): def _put_doc(self, doc): # prepare couch's Document - cdoc = Document() + cdoc = CouchDocument() cdoc['_id'] = doc.doc_id # we have to guarantee that couch's _rev is cosistent old_cdoc = self._database.get(doc.doc_id) @@ -79,35 +88,68 @@ class CouchDatabase(ObjectStore): cdoc['u1db_json'] = doc.get_json() else: cdoc['u1db_json'] = None + # save doc in db self._database.save(cdoc) def get_sync_target(self): return CouchSyncTarget(self) def close(self): - raise NotImplementedError(self.close) + # TODO: fix this method so the connection is properly closed and + # test_close (+tearDown, which deletes the db) works without problems. + self._url = None + self._full_commit = None + self._session = None + #self._server = None + self._database = None + return True + def sync(self, url, creds=None, autocreate=True): from u1db.sync import Synchronizer - from u1db.remote.http_target import CouchSyncTarget return Synchronizer(self, CouchSyncTarget(url, creds=creds)).sync( autocreate=autocreate) + def _initialize(self): + if self._replica_uid is None: + self._replica_uid = uuid.uuid4().hex + doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID) + doc.content = { 'sync_log' : [], + 'transaction_log' : [], + 'conflict_log' : b64encode(json.dumps([])), + 'replica_uid' : self._replica_uid } + self._put_doc(doc) + def _get_u1db_data(self): cdoc = self._database.get(self.U1DB_DATA_DOC_ID) content = json.loads(cdoc['u1db_json']) self._sync_log.log = content['sync_log'] self._transaction_log.log = content['transaction_log'] + self._conflict_log.log = json.loads(b64decode(content['conflict_log'])) self._replica_uid = content['replica_uid'] self._couch_rev = cdoc['_rev'] + def _set_u1db_data(self): + doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID) + doc.content = { 'sync_log' : self._sync_log.log, + 'transaction_log' : self._transaction_log.log, + # Here, the b64 encode ensures that document content + # does not cause strange behaviour in couchdb because + # of encoding. + 'conflict_log' : b64encode(json.dumps(self._conflict_log.log)), + 'replica_uid' : self._replica_uid, + '_rev' : self._couch_rev} + self._put_doc(doc) + #------------------------------------------------------------------------- # Couch specific methods #------------------------------------------------------------------------- - # no specific methods so far. + def delete_database(self): + del(self._server[self._dbname]) -class CouchSyncTarget(HTTPSyncTarget): + +class CouchSyncTarget(LocalSyncTarget): def get_sync_info(self, source_replica_uid): source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id( @@ -125,4 +167,3 @@ class CouchSyncTarget(HTTPSyncTarget): source_replica_uid, source_replica_generation, source_replica_transaction_id) - diff --git a/src/leap/soledad/backends/leap_backend.py b/src/leap/soledad/backends/leap_backend.py index a8a65eb4..3e859f7c 100644 --- a/src/leap/soledad/backends/leap_backend.py +++ b/src/leap/soledad/backends/leap_backend.py @@ -4,16 +4,19 @@ except ImportError: import json # noqa from u1db import Document +from u1db.remote import utils from u1db.remote.http_target import HTTPSyncTarget from u1db.remote.http_database import HTTPDatabase -import base64 # unused +from u1db.errors import BrokenSyncStream +from leap.soledad.util import GPGWrapper -#from leap.soledad import util # import GPGWrapper # unused +import uuid class NoDefaultKey(Exception): pass + class NoSoledadInstance(Exception): pass @@ -72,6 +75,10 @@ class LeapDatabase(HTTPDatabase): db._delete() db.close() + def _allocate_doc_id(self): + """Generate a unique identifier for this document.""" + return 'D-' + uuid.uuid4().hex # 'D-' stands for document + def get_sync_target(self): st = LeapSyncTarget(self._url.geturl()) st._creds = self._creds diff --git a/src/leap/soledad/backends/objectstore.py b/src/leap/soledad/backends/objectstore.py index 3cefdf5d..b6523336 100644 --- a/src/leap/soledad/backends/objectstore.py +++ b/src/leap/soledad/backends/objectstore.py @@ -1,22 +1,20 @@ -import uuid from u1db.backends import CommonBackend -from u1db import errors, Document - -from leap.soledad import util as soledadutil - +from u1db import errors, Document, vectorclock class ObjectStore(CommonBackend): """ A backend for storing u1db data in an object store. """ - def __init__(self): + def __init__(self, replica_uid=None): # This initialization method should be called after the connection - # with the database is established, so it can ensure that u1db data is - # configured and up-to-date. + # with the database is established in each implementation, so it can + # ensure that u1db data is configured and up-to-date. self.set_document_factory(Document) - self._sync_log = soledadutil.SyncLog() - self._transaction_log = soledadutil.TransactionLog() + self._sync_log = SyncLog() + self._transaction_log = TransactionLog() + self._conflict_log = ConflictLog(self._factory) + self._replica_uid = replica_uid self._ensure_u1db_data() #------------------------------------------------------------------------- @@ -44,6 +42,12 @@ class ObjectStore(CommonBackend): def _put_doc(self, doc): raise NotImplementedError(self._put_doc) + def _update_gen_and_transaction_log(self, doc_id): + new_gen = self._get_generation() + 1 + trans_id = self._allocate_transaction_id() + self._transaction_log.append((new_gen, doc_id, trans_id)) + self._set_u1db_data() + def put_doc(self, doc): # consistency check if doc.doc_id is None: @@ -65,12 +69,7 @@ class ObjectStore(CommonBackend): raise errors.RevisionConflict() new_rev = self._allocate_doc_rev(doc.rev) doc.rev = new_rev - self._put_doc(doc) - # update u1db generation and logs - new_gen = self._get_generation() + 1 - trans_id = self._allocate_transaction_id() - self._transaction_log.append((new_gen, doc.doc_id, trans_id)) - self._set_u1db_data() + self._put_and_update_indexes(old_doc, doc) return doc.rev def delete_doc(self, doc): @@ -86,7 +85,7 @@ class ObjectStore(CommonBackend): new_rev = self._allocate_doc_rev(doc.rev) doc.rev = new_rev doc.make_tombstone() - self._put_doc(doc) + self._put_and_update_indexes(old_doc, doc) return new_rev # start of index-related methods: these are not supported by this backend. @@ -113,10 +112,25 @@ class ObjectStore(CommonBackend): # end of index-related methods: these are not supported by this backend. def get_doc_conflicts(self, doc_id): - return [] + self._get_u1db_data() + conflict_docs = self._conflict_log.get_conflicts(doc_id) + if not conflict_docs: + return [] + this_doc = self._get_doc(doc_id) + this_doc.has_conflicts = True + return [this_doc] + list(conflict_docs) def resolve_doc(self, doc, conflicted_doc_revs): - raise NotImplementedError(self.resolve_doc) + cur_doc = self._get_doc(doc.doc_id) + new_rev = self._ensure_maximal_rev(cur_doc.rev, + conflicted_doc_revs) + superseded_revs = set(conflicted_doc_revs) + doc.rev = new_rev + if cur_doc.rev in superseded_revs: + self._put_and_update_indexes(cur_doc, doc) + else: + self._add_conflict(doc.doc_id, new_rev, doc.get_json()) + self._delete_conflicts(doc, superseded_revs) def _get_replica_gen_and_trans_id(self, other_replica_uid): self._get_u1db_data() @@ -124,12 +138,22 @@ class ObjectStore(CommonBackend): def _set_replica_gen_and_trans_id(self, other_replica_uid, other_generation, other_transaction_id): - self._get_u1db_data() + return self._do_set_replica_gen_and_trans_id( + other_replica_uid, + other_generation, + other_transaction_id) + + def _do_set_replica_gen_and_trans_id(self, other_replica_uid, + other_generation, other_transaction_id): self._sync_log.set_replica_gen_and_trans_id(other_replica_uid, other_generation, other_transaction_id) self._set_u1db_data() + def _get_transaction_log(self): + self._get_u1db_data() + return self._transaction_log.get_transaction_log() + #------------------------------------------------------------------------- # implemented methods from CommonBackend #------------------------------------------------------------------------- @@ -143,12 +167,14 @@ class ObjectStore(CommonBackend): return self._transaction_log.get_generation_info() def _has_conflicts(self, doc_id): - # Documents never have conflicts on server. - return False - - def _put_and_update_indexes(self, doc_id, old_doc, new_rev, content): - raise NotImplementedError(self._put_and_update_indexes) + self._get_u1db_data() + return self._conflict_log.has_conflicts(doc_id) + def _put_and_update_indexes(self, old_doc, doc): + # for now we ignore indexes as this backend is used to store encrypted + # blobs of data in the server. + self._put_doc(doc) + self._update_gen_and_transaction_log(doc.doc_id) def _get_trans_id_for_gen(self, generation): self._get_u1db_data() @@ -184,14 +210,9 @@ class ObjectStore(CommonBackend): """ Create u1db data object in store. """ - self._replica_uid = uuid.uuid4().hex - doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID) - doc.content = { 'transaction_log' : [], - 'sync_log' : [], - 'replica_uid' : self._replica_uid } - self._put_doc(doc) + NotImplementedError(self._initialize) - def _get_u1db_data(self, u1db_data_doc_id): + def _get_u1db_data(self): """ Fetch u1db configuration data from backend storage. """ @@ -201,11 +222,230 @@ class ObjectStore(CommonBackend): """ Save u1db configuration data on backend storage. """ - doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID) - doc.content = { 'transaction_log' : self._transaction_log.log, - 'sync_log' : self._sync_log.log, - 'replica_uid' : self._replica_uid, - '_rev' : self._couch_rev} - self._put_doc(doc) + NotImplementedError(self._set_u1db_data) + + def _set_replica_uid(self, replica_uid): + self._replica_uid = replica_uid + self._set_u1db_data() + + def _get_replica_uid(self): + return self._replica_uid + + replica_uid = property( + _get_replica_uid, _set_replica_uid, doc="Replica UID of the database") + + + #------------------------------------------------------------------------- + # The methods below were cloned from u1db sqlite backend. They should at + # least exist and raise a NotImplementedError exception in CommonBackend + # (should we maybe fill a bug in u1db bts?). + #------------------------------------------------------------------------- + + def _add_conflict(self, doc_id, my_doc_rev, my_content): + self._conflict_log.append((doc_id, my_doc_rev, my_content)) + self._set_u1db_data() + + def _delete_conflicts(self, doc, conflict_revs): + deleting = [(doc.doc_id, c_rev) for c_rev in conflict_revs] + self._conflict_log.delete_conflicts(deleting) + self._set_u1db_data() + doc.has_conflicts = self._has_conflicts(doc.doc_id) + + def _prune_conflicts(self, doc, doc_vcr): + if self._has_conflicts(doc.doc_id): + autoresolved = False + c_revs_to_prune = [] + for c_doc in self._conflict_log.get_conflicts(doc.doc_id): + c_vcr = vectorclock.VectorClockRev(c_doc.rev) + if doc_vcr.is_newer(c_vcr): + c_revs_to_prune.append(c_doc.rev) + elif doc.same_content_as(c_doc): + c_revs_to_prune.append(c_doc.rev) + doc_vcr.maximize(c_vcr) + autoresolved = True + if autoresolved: + doc_vcr.increment(self._replica_uid) + doc.rev = doc_vcr.as_str() + self._delete_conflicts(doc, c_revs_to_prune) + + def _force_doc_sync_conflict(self, doc): + my_doc = self._get_doc(doc.doc_id) + self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev)) + self._add_conflict(doc.doc_id, my_doc.rev, my_doc.get_json()) + doc.has_conflicts = True + self._put_and_update_indexes(my_doc, doc) + + +#---------------------------------------------------------------------------- +# U1DB's TransactionLog, SyncLog, ConflictLog, and Index +#---------------------------------------------------------------------------- + +class SimpleList(object): + def __init__(self): + self._data = [] + + def _set_data(self, data): + self._data = data + + def _get_data(self): + return self._data + + data = property( + _get_data, _set_data, doc="List contents.") + + def append(self, msg): + self._data.append(msg) + + def reduce(self, func, initializer=None): + return reduce(func, self._data, initializer) + + def map(self, func): + return map(func, self._get_data()) + + def filter(self, func): + return filter(func, self._get_data()) + + +class SimpleLog(SimpleList): + def _set_log(self, log): + self._data = log + + def _get_log(self): + return self._data + + log = property( + _get_log, _set_log, doc="Log contents.") + + +class TransactionLog(SimpleLog): + """ + An ordered list of (generation, doc_id, transaction_id) tuples. + """ + + def _set_log(self, log): + self._data = log + + def _get_data(self, reverse=True): + return sorted(self._data, reverse=reverse) + + _get_log = _get_data + + log = property( + _get_log, _set_log, doc="Log contents.") + + def get_generation(self): + """ + Return the current generation. + """ + gens = self.map(lambda x: x[0]) + if not gens: + return 0 + return max(gens) + + def get_generation_info(self): + """ + Return the current generation and transaction id. + """ + if not self._get_log(): + return(0, '') + info = self.map(lambda x: (x[0], x[2])) + return reduce(lambda x, y: x if (x[0] > y[0]) else y, info) + + def get_trans_id_for_gen(self, gen): + """ + Get the transaction id corresponding to a particular generation. + """ + log = self.reduce(lambda x, y: y if y[0] == gen else x) + if log is None: + return None + return log[2] + + def whats_changed(self, old_generation): + """ + Return a list of documents that have changed since old_generation. + """ + results = self.filter(lambda x: x[0] > old_generation) + seen = set() + changes = [] + newest_trans_id = '' + for generation, doc_id, trans_id in results: + if doc_id not in seen: + changes.append((doc_id, generation, trans_id)) + seen.add(doc_id) + if changes: + cur_gen = changes[0][1] # max generation + newest_trans_id = changes[0][2] + changes.reverse() + else: + results = self._get_log() + if not results: + cur_gen = 0 + newest_trans_id = '' + else: + cur_gen, _, newest_trans_id = results[0] + + return cur_gen, newest_trans_id, changes + + + def get_transaction_log(self): + """ + Return only a list of (doc_id, transaction_id) + """ + return map(lambda x: (x[1], x[2]), sorted(self._get_log(reverse=False))) + + +class SyncLog(SimpleLog): + """ + A list of (replica_id, generation, transaction_id) tuples. + """ + + def find_by_replica_uid(self, replica_uid): + if not self._get_log(): + return () + return self.reduce(lambda x, y: y if y[0] == replica_uid else x) + + def get_replica_gen_and_trans_id(self, other_replica_uid): + """ + Return the last known generation and transaction id for the other db + replica. + """ + info = self.find_by_replica_uid(other_replica_uid) + if not info: + return (0, '') + return (info[1], info[2]) + + def set_replica_gen_and_trans_id(self, other_replica_uid, + other_generation, other_transaction_id): + """ + Set the last-known generation and transaction id for the other + database replica. + """ + self._set_log(self.filter(lambda x: x[0] != other_replica_uid)) + self.append((other_replica_uid, other_generation, + other_transaction_id)) + +class ConflictLog(SimpleLog): + """ + A list of (doc_id, my_doc_rev, my_content) tuples. + """ + + def __init__(self, factory): + super(ConflictLog, self).__init__() + self._factory = factory + + def delete_conflicts(self, conflicts): + for conflict in conflicts: + self._set_log(self.filter(lambda x: + x[0] != conflict[0] or x[1] != conflict[1])) + + def get_conflicts(self, doc_id): + conflicts = self.filter(lambda x: x[0] == doc_id) + if not conflicts: + return [] + return reversed(map(lambda x: self._factory(doc_id, x[1], x[2]), + conflicts)) + + def has_conflicts(self, doc_id): + return bool(self.filter(lambda x: x[0] == doc_id)) diff --git a/src/leap/soledad/backends/sqlcipher.py b/src/leap/soledad/backends/sqlcipher.py index 6fd6e619..3d03449e 100644 --- a/src/leap/soledad/backends/sqlcipher.py +++ b/src/leap/soledad/backends/sqlcipher.py @@ -16,30 +16,21 @@ """A U1DB implementation that uses SQLCipher as its persistence layer.""" -import errno import os -try: - import simplejson as json -except ImportError: - import json # noqa -from sqlite3 import dbapi2 -import sys +from sqlite3 import dbapi2, DatabaseError import time -import uuid -import pkg_resources - -from u1db.backends import CommonBackend, CommonSyncTarget -from u1db.backends.sqlite_backend import SQLitePartialExpandDatabase +from u1db.backends.sqlite_backend import ( + SQLiteDatabase, + SQLitePartialExpandDatabase, +) from u1db import ( Document, errors, - query_parser, - vectorclock, - ) +) -def open(path, create, document_factory=None, password=None): +def open(path, password, create=True, document_factory=None): """Open a database at the given location. Will raise u1db.errors.DatabaseDoesNotExist if create=False and the @@ -52,11 +43,17 @@ def open(path, create, document_factory=None, password=None): parameters as Document.__init__. :return: An instance of Database. """ - from u1db.backends import sqlite_backend - return sqlite_backend.SQLCipherDatabase.open_database( + return SQLCipherDatabase.open_database( path, password, create=create, document_factory=document_factory) +class DatabaseIsNotEncrypted(Exception): + """ + Exception raised when trying to open non-encrypted databases. + """ + pass + + class SQLCipherDatabase(SQLitePartialExpandDatabase): """A U1DB implementation that uses SQLCipher as its persistence layer.""" @@ -67,14 +64,30 @@ class SQLCipherDatabase(SQLitePartialExpandDatabase): def set_pragma_key(cls, db_handle, key): db_handle.cursor().execute("PRAGMA key = '%s'" % key) + def __init__(self, sqlite_file, password, document_factory=None): - """Create a new sqlite file.""" + """Create a new sqlcipher file.""" + self._check_if_db_is_encrypted(sqlite_file) self._db_handle = dbapi2.connect(sqlite_file) SQLCipherDatabase.set_pragma_key(self._db_handle, password) self._real_replica_uid = None self._ensure_schema() self._factory = document_factory or Document + + def _check_if_db_is_encrypted(self, sqlite_file): + if not os.path.exists(sqlite_file): + return + else: + try: + # try to open an encrypted database with the regular u1db backend + # should raise a DatabaseError exception. + SQLitePartialExpandDatabase(sqlite_file) + raise DatabaseIsNotEncrypted() + except DatabaseError: + pass + + @classmethod def _open_database(cls, sqlite_file, password, document_factory=None): if not os.path.isfile(sqlite_file): @@ -100,6 +113,7 @@ class SQLCipherDatabase(SQLitePartialExpandDatabase): return SQLCipherDatabase._sqlite_registry[v]( sqlite_file, password, document_factory=document_factory) + @classmethod def open_database(cls, sqlite_file, password, create, backend_cls=None, document_factory=None): @@ -115,13 +129,17 @@ class SQLCipherDatabase(SQLitePartialExpandDatabase): return backend_cls(sqlite_file, password, document_factory=document_factory) - @staticmethod - def register_implementation(klass): - """Register that we implement an SQLCipherDatabase. - The attribute _index_storage_value will be used as the lookup key. + def sync(self, url, creds=None, autocreate=True, soledad=None): + """ + Synchronize encrypted documents with remote replica exposed at url. """ - SQLCipherDatabase._sqlite_registry[klass._index_storage_value] = klass + from u1db.sync import Synchronizer + from leap.soledad.backends.leap_backend import LeapSyncTarget + return Synchronizer(self, LeapSyncTarget(url, creds=creds), + soledad=self._soledad).sync( + autocreate=autocreate) + +SQLiteDatabase.register_implementation(SQLCipherDatabase) -SQLCipherDatabase.register_implementation(SQLCipherDatabase) |