diff options
Diffstat (limited to 'src/leap/soledad/backends')
| -rw-r--r-- | src/leap/soledad/backends/couch.py | 75 | ||||
| -rw-r--r-- | src/leap/soledad/backends/leap_backend.py | 11 | ||||
| -rw-r--r-- | src/leap/soledad/backends/objectstore.py | 316 | ||||
| -rw-r--r-- | src/leap/soledad/backends/sqlcipher.py | 68 | 
4 files changed, 388 insertions, 82 deletions
| diff --git a/src/leap/soledad/backends/couch.py b/src/leap/soledad/backends/couch.py index a3909596..78026af8 100644 --- a/src/leap/soledad/backends/couch.py +++ b/src/leap/soledad/backends/couch.py @@ -1,8 +1,10 @@ +import sys +import uuid +from base64 import b64encode, b64decode  from u1db import errors -from u1db.remote.http_target import HTTPSyncTarget -from couchdb.client import Server, Document +from u1db.sync import LocalSyncTarget +from couchdb.client import Server, Document as CouchDocument  from couchdb.http import ResourceNotFound -  from leap.soledad.backends.objectstore import ObjectStore  from leap.soledad.backends.leap_backend import LeapDocument @@ -15,7 +17,7 @@ except ImportError:  class CouchDatabase(ObjectStore):      """A U1DB implementation that uses Couch as its persistence layer.""" -    def __init__(self, url, database, full_commit=True, session=None):  +    def __init__(self, url, database, replica_uid=None, full_commit=True, session=None):           """Create a new Couch data container."""          self._url = url          self._full_commit = full_commit @@ -23,6 +25,7 @@ class CouchDatabase(ObjectStore):          self._server = Server(url=self._url,                                full_commit=self._full_commit,                                session=self._session) +        self._dbname = database          # this will ensure that transaction and sync logs exist and are          # up-to-date.          self.set_document_factory(LeapDocument) @@ -31,22 +34,26 @@ class CouchDatabase(ObjectStore):          except ResourceNotFound:              self._server.create(database)              self._database = self._server[database] -        super(CouchDatabase, self).__init__() +        super(CouchDatabase, self).__init__(replica_uid=replica_uid)      #-------------------------------------------------------------------------      # implemented methods from Database      #-------------------------------------------------------------------------      def _get_doc(self, doc_id, check_for_conflicts=False): -        """Get just the document content, without fancy handling. -         -        Conflicts do not happen on server side, so there's no need to check -        for them. +        """ +        Get just the document content, without fancy handling.          """          cdoc = self._database.get(doc_id)          if cdoc is None:              return None -        doc = self._factory(doc_id=doc_id, rev=cdoc['u1db_rev']) +        has_conflicts = False +        if check_for_conflicts: +            has_conflicts = self._has_conflicts(doc_id) +        doc = self._factory( +            doc_id=doc_id, +            rev=cdoc['u1db_rev'], +            has_conflicts=has_conflicts)          if cdoc['u1db_json'] is not None:              doc.content = json.loads(cdoc['u1db_json'])          else: @@ -58,7 +65,9 @@ class CouchDatabase(ObjectStore):          generation = self._get_generation()          results = []          for doc_id in self._database: -            doc = self._get_doc(doc_id) +            if doc_id == self.U1DB_DATA_DOC_ID: +                continue +            doc = self._get_doc(doc_id, check_for_conflicts=True)              if doc.content is None and not include_deleted:                  continue              results.append(doc) @@ -66,7 +75,7 @@ class CouchDatabase(ObjectStore):      def _put_doc(self, doc):          # prepare couch's Document -        cdoc = Document() +        cdoc = CouchDocument()          cdoc['_id'] = doc.doc_id          # we have to guarantee that couch's _rev is cosistent          old_cdoc = self._database.get(doc.doc_id) @@ -79,35 +88,68 @@ class CouchDatabase(ObjectStore):              cdoc['u1db_json'] = doc.get_json()          else:              cdoc['u1db_json'] = None +        # save doc in db          self._database.save(cdoc)      def get_sync_target(self):          return CouchSyncTarget(self)      def close(self): -        raise NotImplementedError(self.close) +        # TODO: fix this method so the connection is properly closed and +        # test_close (+tearDown, which deletes the db) works without problems. +        self._url = None +        self._full_commit = None +        self._session = None +        #self._server = None +        self._database = None +        return True +              def sync(self, url, creds=None, autocreate=True):          from u1db.sync import Synchronizer -        from u1db.remote.http_target import CouchSyncTarget          return Synchronizer(self, CouchSyncTarget(url, creds=creds)).sync(              autocreate=autocreate) +    def _initialize(self): +        if self._replica_uid is None: +            self._replica_uid = uuid.uuid4().hex +        doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID) +        doc.content = { 'sync_log' : [], +                        'transaction_log' : [], +                        'conflict_log' : b64encode(json.dumps([])), +                        'replica_uid' : self._replica_uid } +        self._put_doc(doc) +      def _get_u1db_data(self):          cdoc = self._database.get(self.U1DB_DATA_DOC_ID)          content = json.loads(cdoc['u1db_json'])          self._sync_log.log = content['sync_log']          self._transaction_log.log = content['transaction_log'] +        self._conflict_log.log = json.loads(b64decode(content['conflict_log']))          self._replica_uid = content['replica_uid']          self._couch_rev = cdoc['_rev'] +    def _set_u1db_data(self): +        doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID) +        doc.content = { 'sync_log'        : self._sync_log.log, +                        'transaction_log' : self._transaction_log.log, +                        # Here, the b64 encode ensures that document content +                        # does not cause strange behaviour in couchdb because +                        # of encoding. +                        'conflict_log'    : b64encode(json.dumps(self._conflict_log.log)), +                        'replica_uid'     : self._replica_uid, +                        '_rev'            : self._couch_rev} +        self._put_doc(doc) +      #-------------------------------------------------------------------------      # Couch specific methods      #------------------------------------------------------------------------- -    # no specific methods so far. +    def delete_database(self): +        del(self._server[self._dbname]) -class CouchSyncTarget(HTTPSyncTarget): + +class CouchSyncTarget(LocalSyncTarget):      def get_sync_info(self, source_replica_uid):          source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id( @@ -125,4 +167,3 @@ class CouchSyncTarget(HTTPSyncTarget):              source_replica_uid, source_replica_generation,              source_replica_transaction_id) - diff --git a/src/leap/soledad/backends/leap_backend.py b/src/leap/soledad/backends/leap_backend.py index a8a65eb4..3e859f7c 100644 --- a/src/leap/soledad/backends/leap_backend.py +++ b/src/leap/soledad/backends/leap_backend.py @@ -4,16 +4,19 @@ except ImportError:      import json  # noqa  from u1db import Document +from u1db.remote import utils  from u1db.remote.http_target import HTTPSyncTarget  from u1db.remote.http_database import HTTPDatabase -import base64  # unused +from u1db.errors import BrokenSyncStream +from leap.soledad.util import GPGWrapper -#from leap.soledad import util  # import GPGWrapper  # unused +import uuid  class NoDefaultKey(Exception):      pass +  class NoSoledadInstance(Exception):      pass @@ -72,6 +75,10 @@ class LeapDatabase(HTTPDatabase):          db._delete()          db.close() +    def _allocate_doc_id(self): +        """Generate a unique identifier for this document.""" +        return 'D-' + uuid.uuid4().hex  # 'D-' stands for document +      def get_sync_target(self):          st = LeapSyncTarget(self._url.geturl())          st._creds = self._creds diff --git a/src/leap/soledad/backends/objectstore.py b/src/leap/soledad/backends/objectstore.py index 3cefdf5d..b6523336 100644 --- a/src/leap/soledad/backends/objectstore.py +++ b/src/leap/soledad/backends/objectstore.py @@ -1,22 +1,20 @@ -import uuid  from u1db.backends import CommonBackend -from u1db import errors, Document - -from leap.soledad import util as soledadutil - +from u1db import errors, Document, vectorclock  class ObjectStore(CommonBackend):      """      A backend for storing u1db data in an object store.      """ -    def __init__(self): +    def __init__(self, replica_uid=None):          # This initialization method should be called after the connection -        # with the database is established, so it can ensure that u1db data is -        # configured and up-to-date. +        # with the database is established in each implementation, so it can +        # ensure that u1db data is configured and up-to-date.          self.set_document_factory(Document) -        self._sync_log = soledadutil.SyncLog() -        self._transaction_log = soledadutil.TransactionLog() +        self._sync_log = SyncLog() +        self._transaction_log = TransactionLog() +        self._conflict_log = ConflictLog(self._factory) +        self._replica_uid = replica_uid          self._ensure_u1db_data()      #------------------------------------------------------------------------- @@ -44,6 +42,12 @@ class ObjectStore(CommonBackend):      def _put_doc(self, doc):          raise NotImplementedError(self._put_doc) +    def _update_gen_and_transaction_log(self, doc_id): +        new_gen = self._get_generation() + 1 +        trans_id = self._allocate_transaction_id() +        self._transaction_log.append((new_gen, doc_id, trans_id)) +        self._set_u1db_data() +      def put_doc(self, doc):          # consistency check          if doc.doc_id is None: @@ -65,12 +69,7 @@ class ObjectStore(CommonBackend):                      raise errors.RevisionConflict()              new_rev = self._allocate_doc_rev(doc.rev)          doc.rev = new_rev -        self._put_doc(doc) -        # update u1db generation and logs -        new_gen = self._get_generation() + 1 -        trans_id = self._allocate_transaction_id() -        self._transaction_log.append((new_gen, doc.doc_id, trans_id)) -        self._set_u1db_data() +        self._put_and_update_indexes(old_doc, doc)          return doc.rev      def delete_doc(self, doc): @@ -86,7 +85,7 @@ class ObjectStore(CommonBackend):          new_rev = self._allocate_doc_rev(doc.rev)          doc.rev = new_rev          doc.make_tombstone() -        self._put_doc(doc) +        self._put_and_update_indexes(old_doc, doc)          return new_rev      # start of index-related methods: these are not supported by this backend. @@ -113,10 +112,25 @@ class ObjectStore(CommonBackend):      # end of index-related methods: these are not supported by this backend.      def get_doc_conflicts(self, doc_id): -        return [] +        self._get_u1db_data() +        conflict_docs = self._conflict_log.get_conflicts(doc_id) +        if not conflict_docs: +            return [] +        this_doc = self._get_doc(doc_id) +        this_doc.has_conflicts = True +        return [this_doc] + list(conflict_docs)      def resolve_doc(self, doc, conflicted_doc_revs): -        raise NotImplementedError(self.resolve_doc) +        cur_doc = self._get_doc(doc.doc_id) +        new_rev = self._ensure_maximal_rev(cur_doc.rev, +                                           conflicted_doc_revs) +        superseded_revs = set(conflicted_doc_revs) +        doc.rev = new_rev +        if cur_doc.rev in superseded_revs: +            self._put_and_update_indexes(cur_doc, doc) +        else: +            self._add_conflict(doc.doc_id, new_rev, doc.get_json()) +        self._delete_conflicts(doc, superseded_revs)      def _get_replica_gen_and_trans_id(self, other_replica_uid):          self._get_u1db_data() @@ -124,12 +138,22 @@ class ObjectStore(CommonBackend):      def _set_replica_gen_and_trans_id(self, other_replica_uid,                                        other_generation, other_transaction_id): -        self._get_u1db_data() +        return self._do_set_replica_gen_and_trans_id( +                 other_replica_uid, +                 other_generation, +                 other_transaction_id) + +    def _do_set_replica_gen_and_trans_id(self, other_replica_uid, +                                         other_generation, other_transaction_id):          self._sync_log.set_replica_gen_and_trans_id(other_replica_uid,                                                      other_generation,                                                      other_transaction_id)          self._set_u1db_data() +    def _get_transaction_log(self): +        self._get_u1db_data() +        return self._transaction_log.get_transaction_log() +      #-------------------------------------------------------------------------      # implemented methods from CommonBackend      #------------------------------------------------------------------------- @@ -143,12 +167,14 @@ class ObjectStore(CommonBackend):          return self._transaction_log.get_generation_info()      def _has_conflicts(self, doc_id): -        # Documents never have conflicts on server. -        return False - -    def _put_and_update_indexes(self, doc_id, old_doc, new_rev, content): -        raise NotImplementedError(self._put_and_update_indexes) +        self._get_u1db_data() +        return self._conflict_log.has_conflicts(doc_id) +    def _put_and_update_indexes(self, old_doc, doc): +        # for now we ignore indexes as this backend is used to store encrypted +        # blobs of data in the server. +        self._put_doc(doc) +        self._update_gen_and_transaction_log(doc.doc_id)      def _get_trans_id_for_gen(self, generation):          self._get_u1db_data() @@ -184,14 +210,9 @@ class ObjectStore(CommonBackend):          """          Create u1db data object in store.          """ -        self._replica_uid = uuid.uuid4().hex -        doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID) -        doc.content = { 'transaction_log' : [], -                        'sync_log' : [], -                        'replica_uid' : self._replica_uid } -        self._put_doc(doc) +        NotImplementedError(self._initialize) -    def _get_u1db_data(self, u1db_data_doc_id): +    def _get_u1db_data(self):          """          Fetch u1db configuration data from backend storage.          """ @@ -201,11 +222,230 @@ class ObjectStore(CommonBackend):          """          Save u1db configuration data on backend storage.          """ -        doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID) -        doc.content = { 'transaction_log' : self._transaction_log.log, -                        'sync_log'        : self._sync_log.log, -                        'replica_uid'     : self._replica_uid, -                        '_rev'            : self._couch_rev} -        self._put_doc(doc) +        NotImplementedError(self._set_u1db_data) + +    def _set_replica_uid(self, replica_uid): +        self._replica_uid = replica_uid +        self._set_u1db_data() + +    def _get_replica_uid(self): +        return self._replica_uid + +    replica_uid = property( +        _get_replica_uid, _set_replica_uid, doc="Replica UID of the database") + + +    #------------------------------------------------------------------------- +    # The methods below were cloned from u1db sqlite backend. They should at +    # least exist and raise a NotImplementedError exception in CommonBackend +    # (should we maybe fill a bug in u1db bts?). +    #------------------------------------------------------------------------- + +    def _add_conflict(self, doc_id, my_doc_rev, my_content): +        self._conflict_log.append((doc_id, my_doc_rev, my_content)) +        self._set_u1db_data() + +    def _delete_conflicts(self, doc, conflict_revs): +        deleting = [(doc.doc_id, c_rev) for c_rev in conflict_revs] +        self._conflict_log.delete_conflicts(deleting) +        self._set_u1db_data() +        doc.has_conflicts = self._has_conflicts(doc.doc_id) + +    def _prune_conflicts(self, doc, doc_vcr): +        if self._has_conflicts(doc.doc_id): +            autoresolved = False +            c_revs_to_prune = [] +            for c_doc in self._conflict_log.get_conflicts(doc.doc_id): +                c_vcr = vectorclock.VectorClockRev(c_doc.rev) +                if doc_vcr.is_newer(c_vcr): +                    c_revs_to_prune.append(c_doc.rev) +                elif doc.same_content_as(c_doc): +                    c_revs_to_prune.append(c_doc.rev) +                    doc_vcr.maximize(c_vcr) +                    autoresolved = True +            if autoresolved: +                doc_vcr.increment(self._replica_uid) +                doc.rev = doc_vcr.as_str() +            self._delete_conflicts(doc, c_revs_to_prune) + +    def _force_doc_sync_conflict(self, doc): +        my_doc = self._get_doc(doc.doc_id) +        self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev)) +        self._add_conflict(doc.doc_id, my_doc.rev, my_doc.get_json()) +        doc.has_conflicts = True +        self._put_and_update_indexes(my_doc, doc) + + +#---------------------------------------------------------------------------- +# U1DB's TransactionLog, SyncLog, ConflictLog, and Index +#---------------------------------------------------------------------------- + +class SimpleList(object): +    def __init__(self): +        self._data = [] + +    def _set_data(self, data): +        self._data = data + +    def _get_data(self): +        return self._data + +    data = property( +        _get_data, _set_data, doc="List contents.") + +    def append(self, msg): +        self._data.append(msg) + +    def reduce(self, func, initializer=None): +        return reduce(func, self._data, initializer) + +    def map(self, func): +        return map(func, self._get_data()) + +    def filter(self, func): +        return filter(func, self._get_data()) + + +class SimpleLog(SimpleList): +    def _set_log(self, log): +        self._data = log + +    def _get_log(self): +        return self._data + +    log = property( +        _get_log, _set_log, doc="Log contents.") + + +class TransactionLog(SimpleLog): +    """ +    An ordered list of (generation, doc_id, transaction_id) tuples. +    """ + +    def _set_log(self, log): +        self._data = log + +    def _get_data(self, reverse=True): +        return sorted(self._data, reverse=reverse) + +    _get_log = _get_data + +    log = property( +        _get_log, _set_log, doc="Log contents.") + +    def get_generation(self): +        """ +        Return the current generation. +        """ +        gens = self.map(lambda x: x[0]) +        if not gens: +            return 0 +        return max(gens) + +    def get_generation_info(self): +        """ +        Return the current generation and transaction id. +        """ +        if not self._get_log(): +            return(0, '') +        info = self.map(lambda x: (x[0], x[2])) +        return reduce(lambda x, y: x if (x[0] > y[0]) else y, info) + +    def get_trans_id_for_gen(self, gen): +        """ +        Get the transaction id corresponding to a particular generation. +        """ +        log = self.reduce(lambda x, y: y if y[0] == gen else x) +        if log is None: +            return None +        return log[2] + +    def whats_changed(self, old_generation): +        """ +        Return a list of documents that have changed since old_generation. +        """ +        results = self.filter(lambda x: x[0] > old_generation) +        seen = set() +        changes = [] +        newest_trans_id = '' +        for generation, doc_id, trans_id in results: +            if doc_id not in seen: +                changes.append((doc_id, generation, trans_id)) +                seen.add(doc_id) +        if changes: +            cur_gen = changes[0][1]  # max generation +            newest_trans_id = changes[0][2] +            changes.reverse() +        else: +            results = self._get_log() +            if not results: +                cur_gen = 0 +                newest_trans_id = '' +            else: +                cur_gen, _, newest_trans_id = results[0] + +        return cur_gen, newest_trans_id, changes + + +    def get_transaction_log(self): +        """ +        Return only a list of (doc_id, transaction_id) +        """ +        return map(lambda x: (x[1], x[2]), sorted(self._get_log(reverse=False))) + + +class SyncLog(SimpleLog): +    """ +    A list of (replica_id, generation, transaction_id) tuples. +    """ + +    def find_by_replica_uid(self, replica_uid): +        if not self._get_log(): +            return () +        return self.reduce(lambda x, y: y if y[0] == replica_uid else x) + +    def get_replica_gen_and_trans_id(self, other_replica_uid): +        """ +        Return the last known generation and transaction id for the other db +        replica. +        """ +        info = self.find_by_replica_uid(other_replica_uid) +        if not info: +            return (0, '') +        return (info[1], info[2]) + +    def set_replica_gen_and_trans_id(self, other_replica_uid, +                                      other_generation, other_transaction_id): +        """ +        Set the last-known generation and transaction id for the other +        database replica. +        """ +        self._set_log(self.filter(lambda x: x[0] != other_replica_uid)) +        self.append((other_replica_uid, other_generation, +                     other_transaction_id)) + +class ConflictLog(SimpleLog): +    """ +    A list of (doc_id, my_doc_rev, my_content) tuples. +    """ + +    def __init__(self, factory): +        super(ConflictLog, self).__init__() +        self._factory = factory +     +    def delete_conflicts(self, conflicts): +        for conflict in conflicts: +            self._set_log(self.filter(lambda x: +                          x[0] != conflict[0] or x[1] != conflict[1])) + +    def get_conflicts(self, doc_id): +        conflicts = self.filter(lambda x: x[0] == doc_id) +        if not conflicts: +            return [] +        return reversed(map(lambda x: self._factory(doc_id, x[1], x[2]), +                            conflicts)) + +    def has_conflicts(self, doc_id): +        return bool(self.filter(lambda x: x[0] == doc_id)) diff --git a/src/leap/soledad/backends/sqlcipher.py b/src/leap/soledad/backends/sqlcipher.py index 6fd6e619..3d03449e 100644 --- a/src/leap/soledad/backends/sqlcipher.py +++ b/src/leap/soledad/backends/sqlcipher.py @@ -16,30 +16,21 @@  """A U1DB implementation that uses SQLCipher as its persistence layer.""" -import errno  import os -try: -    import simplejson as json -except ImportError: -    import json  # noqa -from sqlite3 import dbapi2 -import sys +from sqlite3 import dbapi2, DatabaseError  import time -import uuid -import pkg_resources - -from u1db.backends import CommonBackend, CommonSyncTarget -from u1db.backends.sqlite_backend import SQLitePartialExpandDatabase +from u1db.backends.sqlite_backend import ( +    SQLiteDatabase, +    SQLitePartialExpandDatabase, +)  from u1db import (      Document,      errors, -    query_parser, -    vectorclock, -    ) +) -def open(path, create, document_factory=None, password=None): +def open(path, password, create=True, document_factory=None):      """Open a database at the given location.      Will raise u1db.errors.DatabaseDoesNotExist if create=False and the @@ -52,11 +43,17 @@ def open(path, create, document_factory=None, password=None):          parameters as Document.__init__.      :return: An instance of Database.      """ -    from u1db.backends import sqlite_backend -    return sqlite_backend.SQLCipherDatabase.open_database( +    return SQLCipherDatabase.open_database(          path, password, create=create, document_factory=document_factory) +class DatabaseIsNotEncrypted(Exception): +    """ +    Exception raised when trying to open non-encrypted databases. +    """ +    pass + +  class SQLCipherDatabase(SQLitePartialExpandDatabase):      """A U1DB implementation that uses SQLCipher as its persistence layer.""" @@ -67,14 +64,30 @@ class SQLCipherDatabase(SQLitePartialExpandDatabase):      def set_pragma_key(cls, db_handle, key):         db_handle.cursor().execute("PRAGMA key = '%s'" % key) +      def __init__(self, sqlite_file, password, document_factory=None): -        """Create a new sqlite file.""" +        """Create a new sqlcipher file.""" +        self._check_if_db_is_encrypted(sqlite_file)          self._db_handle = dbapi2.connect(sqlite_file)          SQLCipherDatabase.set_pragma_key(self._db_handle, password)          self._real_replica_uid = None          self._ensure_schema()          self._factory = document_factory or Document + +    def _check_if_db_is_encrypted(self, sqlite_file): +        if not os.path.exists(sqlite_file): +            return +        else: +            try: +                # try to open an encrypted database with the regular u1db backend +                # should raise a DatabaseError exception. +                SQLitePartialExpandDatabase(sqlite_file) +                raise DatabaseIsNotEncrypted() +            except DatabaseError: +                pass + +      @classmethod      def _open_database(cls, sqlite_file, password, document_factory=None):          if not os.path.isfile(sqlite_file): @@ -100,6 +113,7 @@ class SQLCipherDatabase(SQLitePartialExpandDatabase):          return SQLCipherDatabase._sqlite_registry[v](              sqlite_file, password, document_factory=document_factory) +      @classmethod      def open_database(cls, sqlite_file, password, create, backend_cls=None,                        document_factory=None): @@ -115,13 +129,17 @@ class SQLCipherDatabase(SQLitePartialExpandDatabase):              return backend_cls(sqlite_file, password,                                 document_factory=document_factory) -    @staticmethod -    def register_implementation(klass): -        """Register that we implement an SQLCipherDatabase. -        The attribute _index_storage_value will be used as the lookup key. +    def sync(self, url, creds=None, autocreate=True, soledad=None): +        """ +        Synchronize encrypted documents with remote replica exposed at url.          """ -        SQLCipherDatabase._sqlite_registry[klass._index_storage_value] = klass +        from u1db.sync import Synchronizer +        from leap.soledad.backends.leap_backend import LeapSyncTarget +        return Synchronizer(self, LeapSyncTarget(url, creds=creds), +                            soledad=self._soledad).sync( +            autocreate=autocreate) + +SQLiteDatabase.register_implementation(SQLCipherDatabase) -SQLCipherDatabase.register_implementation(SQLCipherDatabase) | 
