diff options
Diffstat (limited to 'common/src')
5 files changed, 42 insertions, 245 deletions
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 8c152353..83f542ab 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -1071,14 +1071,20 @@ class CouchDatabase(CommonBackend):          """          if other_replica_uid in self.cache:              return self.cache[other_replica_uid] -        # query a couch view -        result = self._database.view('syncs/log') -        if len(result[other_replica_uid].rows) == 0: -            return (0, '') -        return ( -            result[other_replica_uid].rows[0]['value']['known_generation'], -            result[other_replica_uid].rows[0]['value']['known_transaction_id'] -        ) + +        doc_id = 'u1db_sync_%s' % other_replica_uid +        try: +            doc = self._database[doc_id] +        except ResourceNotFound: +            doc = { +                '_id': doc_id, +                'generation': 0, +                'transaction_id': '', +            } +            self._database.save(doc) +        result = doc['generation'], doc['transaction_id'] +        self.cache[other_replica_uid] = result +        return result      def _set_replica_gen_and_trans_id(self, other_replica_uid,                                        other_generation, other_transaction_id, @@ -1138,43 +1144,16 @@ class CouchDatabase(CommonBackend):          :type doc_idx: int          :param sync_id: The id of the current sync session.          :type sync_id: str - -        :raise MissingDesignDocError: Raised when tried to access a missing -                                      design document. -        :raise MissingDesignDocListFunctionError: Raised when trying to access -                                                  a missing list function on a -                                                  design document. -        :raise MissingDesignDocNamedViewError: Raised when trying to access a -                                               missing named view on a design -                                               document. -        :raise MissingDesignDocDeletedError: Raised when trying to access a -                                             deleted design document. -        :raise MissingDesignDocUnknownError: Raised when failed to access a -                                             design document for an yet -                                             unknown reason.          """          self.cache[other_replica_uid] = (other_generation, other_transaction_id) -        # query a couch update function -        ddoc_path = ['_design', 'syncs', '_update', 'put', 'u1db_sync_log'] -        res = self._database.resource(*ddoc_path) +        doc_id = 'u1db_sync_%s' % other_replica_uid          try: -            with CouchDatabase.update_handler_lock[self._get_replica_uid()]: -                body = { -                    'other_replica_uid': other_replica_uid, -                    'other_generation': other_generation, -                    'other_transaction_id': other_transaction_id, -                } -                if number_of_docs is not None: -                    body['number_of_docs'] = number_of_docs -                if doc_idx is not None: -                    body['doc_idx'] = doc_idx -                if sync_id is not None: -                    body['sync_id'] = sync_id -                res.put_json( -                    body=body, -                    headers={'content-type': 'application/json'}) -        except ResourceNotFound as e: -            raise_missing_design_doc_error(e, ddoc_path) +            doc = self._database[doc_id] +        except ResourceNotFound: +            doc = {'_id': doc_id} +        doc['generation'] = other_generation +        doc['transaction_id'] = other_transaction_id +        self._database.save(doc)      def _force_doc_sync_conflict(self, doc):          """ diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js deleted file mode 100644 index b0ae2de6..00000000 --- a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js +++ /dev/null @@ -1,151 +0,0 @@ -/** - * The u1db_sync_log document stores both the actual sync log and a list of - * pending updates to the log, in case we receive incoming documents out of - * the correct order (i.e. if there are parallel PUTs during the sync - * process). - * - * The structure of the document is the following: - * - *     { - *         'syncs': [ - *             ['<replica_uid>', <gen>, '<trans_id>'], - *             ...  - *         ], - *         'pending': { - *             'other_replica_uid': { - *                 'sync_id': '<sync_id>', - *                 'log': [[<gen>, '<trans_id>'], ...] - *             }, - *             ... - *         } - *     } - * - * The update function below does the following: - * - *   0. If we do not receive a sync_id, we just update the 'syncs' list with - *      the incoming info about the source replica state. - * - *   1. Otherwise, if the incoming sync_id differs from current stored - *      sync_id, then we assume that the previous sync session for that source - *      replica was interrupted and discard all pending data. - * - *   2. Then we append incoming info as pending data for that source replica - *      and current sync_id, and sort the pending data by generation. - * - *   3. Then we go through pending data and find the most recent generation - *      that we can use to update the actual sync log. - * - *   4. Finally, we insert the most up to date information into the sync log. - */ -function(doc, req){ - -    // create the document if it doesn't exist -    if (!doc) { -        doc = {} -        doc['_id'] = 'u1db_sync_log'; -        doc['syncs'] = []; -    } - -    // get and validate incoming info -    var body = JSON.parse(req.body); -    var other_replica_uid = body['other_replica_uid']; -    var other_generation = parseInt(body['other_generation']); -    var other_transaction_id = body['other_transaction_id'] -    var sync_id = body['sync_id']; -    var number_of_docs = body['number_of_docs']; -    var doc_idx = body['doc_idx']; - -    // parse integers -    if (number_of_docs != null) -        number_of_docs = parseInt(number_of_docs); -    if (doc_idx != null) -        doc_idx = parseInt(doc_idx); - -    if (other_replica_uid == null -            || other_generation == null -            || other_transaction_id == null) -        return [null, 'invalid data']; - -    // create slot for pending logs -    if (doc['pending'] == null) -        doc['pending'] = {}; - -    // these are the values that will be actually inserted -    var current_gen = other_generation; -    var current_trans_id = other_transaction_id; - -    /*------------- Wait for sequential values before storing -------------*/ - -    // we just try to obtain pending log if we received a sync_id -    if (sync_id != null) { - -        // create slot for current source and sync_id pending log -        if (doc['pending'][other_replica_uid] == null -                || doc['pending'][other_replica_uid]['sync_id'] != sync_id) { -            doc['pending'][other_replica_uid] = { -                'sync_id': sync_id, -                'log': [], -                'last_doc_idx': 0, -            } -        } - -        // append incoming data to pending log -        doc['pending'][other_replica_uid]['log'].push([ -            other_generation, -            other_transaction_id, -            doc_idx, -        ]) - -        // sort pending log according to generation -        doc['pending'][other_replica_uid]['log'].sort(function(a, b) { -            return a[0] - b[0]; -        }); - -        // get most up-to-date information from pending log -        var last_doc_idx = doc['pending'][other_replica_uid]['last_doc_idx']; -        var pending_idx = doc['pending'][other_replica_uid]['log'][0][2]; - -        current_gen = null; -        current_trans_id = null; - -        while (last_doc_idx + 1 == pending_idx) { -            pending = doc['pending'][other_replica_uid]['log'].shift() -            current_gen = pending[0]; -            current_trans_id = pending[1]; -            last_doc_idx = pending[2] -            if (doc['pending'][other_replica_uid]['log'].length == 0) -                break; -            pending_idx = doc['pending'][other_replica_uid]['log'][0][2]; -        } - -        // leave the sync log untouched if we still did not receive enough docs -        if (current_gen == null) -            return [doc, 'ok']; - -        // update last index of received doc -        doc['pending'][other_replica_uid]['last_doc_idx'] = last_doc_idx; - -        // eventually remove all pending data from that replica -        if (last_doc_idx == number_of_docs) -            delete doc['pending'][other_replica_uid] -    } - -    /*--------------- Store source replica info on sync log ---------------*/ - -    // remove outdated info -    doc['syncs'] = doc['syncs'].filter( -        function (entry) { -            return entry[0] != other_replica_uid; -        } -    ); - -    // store in log -    doc['syncs'].push([ -        other_replica_uid, -        current_gen, -        current_trans_id  -    ]); - -    return [doc, 'ok']; -} - diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js deleted file mode 100644 index a63c7cf4..00000000 --- a/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js +++ /dev/null @@ -1,12 +0,0 @@ -function(doc) { -    if (doc._id == 'u1db_sync_log') { -        if (doc.syncs) -            doc.syncs.forEach(function (entry) { -                emit(entry[0], -                    { -                        'known_generation': entry[1], -                        'known_transaction_id': entry[2] -                    }); -            }); -    } -} diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index a08ffd16..c8d13667 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -1359,10 +1359,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):          self.assertRaises(              errors.MissingDesignDocError,              self.db.whats_changed) -        # _do_set_replica_gen_and_trans_id() -        self.assertRaises( -            errors.MissingDesignDocError, -            self.db._do_set_replica_gen_and_trans_id, 1, 2, 3)      def test_missing_design_doc_functions_raises(self):          """ @@ -1489,10 +1485,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):          self.assertRaises(              errors.MissingDesignDocDeletedError,              self.db.whats_changed) -        # _do_set_replica_gen_and_trans_id() -        self.assertRaises( -            errors.MissingDesignDocDeletedError, -            self.db._do_set_replica_gen_and_trans_id, 1, 2, 3)      def test_ensure_ddoc_independently(self):          """ diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py index 25f709ca..3e8e8cce 100644 --- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py +++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py @@ -163,6 +163,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):                  2,                  len(filter(lambda t: t[0] == doc_id, transaction_log))) +    @defer.inlineCallbacks      def test_correct_sync_log_after_sequential_syncs(self):          """          Assert that the sync_log increases accordingly with sequential syncs. @@ -170,21 +171,21 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):          sol = self._soledad_instance(              auth_token='auth-token',              server_url=self.getURL()) +        source_replica_uid = sol._dbpool.replica_uid -        def _create_docs(results): +        def _create_docs():              deferreds = []              for i in xrange(0, REPEAT_TIMES):                  deferreds.append(sol.create_doc({})) -            return defer.DeferredList(deferreds) +            return defer.gatherResults(deferreds)          def _assert_transaction_and_sync_logs(results, sync_idx):              # assert sizes of transaction and sync logs              self.assertEqual(                  sync_idx * REPEAT_TIMES,                  len(self.db._get_transaction_log())) -            self.assertEqual( -                1 if sync_idx > 0 else 0, -                len(self.db._database.view('syncs/log').rows)) +            gen, _ = self.db._get_replica_gen_and_trans_id(source_replica_uid) +            self.assertEqual(sync_idx * REPEAT_TIMES, gen)          def _assert_sync(results, sync_idx):              gen, docs = results @@ -193,40 +194,28 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):              # assert sizes of transaction and sync logs              self.assertEqual((sync_idx + 1) * REPEAT_TIMES,                               len(self.db._get_transaction_log())) -            sync_log_rows = self.db._database.view('syncs/log').rows -            sync_log = sync_log_rows[0].value -            replica_uid = sync_log_rows[0].key -            known_gen = sync_log['known_generation'] -            known_trans_id = sync_log['known_transaction_id'] -            # assert sync_log has exactly 1 row -            self.assertEqual(1, len(sync_log_rows)) -            # assert it has the correct replica_uid, gen and trans_id -            self.assertEqual(sol._dbpool.replica_uid, replica_uid) +            target_known_gen, target_known_trans_id = \ +                self.db._get_replica_gen_and_trans_id(source_replica_uid) +            # assert it has the correct gen and trans_id              conn_key = sol._dbpool._u1dbconnections.keys().pop()              conn = sol._dbpool._u1dbconnections[conn_key]              sol_gen, sol_trans_id = conn._get_generation_info() -            self.assertEqual(sol_gen, known_gen) -            self.assertEqual(sol_trans_id, known_trans_id) - -        # create some documents -        d = _create_docs(None) +            self.assertEqual(sol_gen, target_known_gen) +            self.assertEqual(sol_trans_id, target_known_trans_id)          # sync first time and assert success -        d.addCallback(_assert_transaction_and_sync_logs, 0) -        d.addCallback(lambda _: sol.sync()) -        d.addCallback(lambda _: sol.get_all_docs()) -        d.addCallback(_assert_sync, 0) +        results = yield _create_docs() +        _assert_transaction_and_sync_logs(results, 0) +        yield sol.sync() +        results = yield sol.get_all_docs() +        _assert_sync(results, 0)          # create more docs, sync second time and assert success -        d.addCallback(_create_docs) -        d.addCallback(_assert_transaction_and_sync_logs, 1) -        d.addCallback(lambda _: sol.sync()) -        d.addCallback(lambda _: sol.get_all_docs()) -        d.addCallback(_assert_sync, 1) - -        d.addCallback(lambda _: sol.close()) - -        return d +        results = yield _create_docs() +        _assert_transaction_and_sync_logs(results, 1) +        yield sol.sync() +        results = yield sol.get_all_docs() +        _assert_sync(results, 1)      #      # Concurrency tests  | 
