summaryrefslogtreecommitdiff
path: root/testing/tests/couch/test_couch_operations_atomicity.py
diff options
context:
space:
mode:
Diffstat (limited to 'testing/tests/couch/test_couch_operations_atomicity.py')
-rw-r--r--testing/tests/couch/test_couch_operations_atomicity.py371
1 files changed, 371 insertions, 0 deletions
diff --git a/testing/tests/couch/test_couch_operations_atomicity.py b/testing/tests/couch/test_couch_operations_atomicity.py
new file mode 100644
index 00000000..aec9c6cf
--- /dev/null
+++ b/testing/tests/couch/test_couch_operations_atomicity.py
@@ -0,0 +1,371 @@
+# -*- coding: utf-8 -*-
+# test_couch_operations_atomicity.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test atomicity of couch operations.
+"""
+import os
+import tempfile
+import threading
+
+from urlparse import urljoin
+from twisted.internet import defer
+from uuid import uuid4
+
+from leap.soledad.client import Soledad
+from leap.soledad.common.couch.state import CouchServerState
+from leap.soledad.common.couch import CouchDatabase
+
+from test_soledad.util import (
+ make_token_soledad_app,
+ make_soledad_document_for_test,
+ soledad_sync_target,
+)
+from test_soledad.util import CouchDBTestCase
+from test_soledad.u1db_tests import TestCaseWithServer
+
+
+REPEAT_TIMES = 20
+
+
+class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
+
+ @staticmethod
+ def make_app_after_state(state):
+ return make_token_soledad_app(state)
+
+ make_document_for_test = make_soledad_document_for_test
+
+ sync_target = soledad_sync_target
+
+ def _soledad_instance(self, user=None, passphrase=u'123',
+ prefix='',
+ secrets_path='secrets.json',
+ local_db_path='soledad.u1db', server_url='',
+ cert_file=None, auth_token=None):
+ """
+ Instantiate Soledad.
+ """
+ user = user or self.user
+
+ # this callback ensures we save a document which is sent to the shared
+ # db.
+ def _put_doc_side_effect(doc):
+ self._doc_put = doc
+
+ soledad = Soledad(
+ user,
+ passphrase,
+ secrets_path=os.path.join(self.tempdir, prefix, secrets_path),
+ local_db_path=os.path.join(
+ self.tempdir, prefix, local_db_path),
+ server_url=server_url,
+ cert_file=cert_file,
+ auth_token=auth_token,
+ shared_db=self.get_default_shared_mock(_put_doc_side_effect))
+ self.addCleanup(soledad.close)
+ return soledad
+
+ def make_app(self):
+ self.request_state = CouchServerState(self.couch_url)
+ return self.make_app_after_state(self.request_state)
+
+ def setUp(self):
+ TestCaseWithServer.setUp(self)
+ CouchDBTestCase.setUp(self)
+ self.user = ('user-%s' % uuid4().hex)
+ self.db = CouchDatabase.open_database(
+ urljoin(self.couch_url, 'user-' + self.user),
+ create=True,
+ replica_uid='replica',
+ ensure_ddocs=True)
+ self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
+ self.startTwistedServer()
+
+ def tearDown(self):
+ self.db.delete_database()
+ self.db.close()
+ CouchDBTestCase.tearDown(self)
+ TestCaseWithServer.tearDown(self)
+
+ #
+ # Sequential tests
+ #
+
+ def test_correct_transaction_log_after_sequential_puts(self):
+ """
+ Assert that the transaction_log increases accordingly with sequential
+ puts.
+ """
+ doc = self.db.create_doc({'ops': 0})
+ docs = [doc.doc_id]
+ for i in range(0, REPEAT_TIMES):
+ self.assertEqual(
+ i + 1, len(self.db._get_transaction_log()))
+ doc.content['ops'] += 1
+ self.db.put_doc(doc)
+ docs.append(doc.doc_id)
+
+ # assert length of transaction_log
+ transaction_log = self.db._get_transaction_log()
+ self.assertEqual(
+ REPEAT_TIMES + 1, len(transaction_log))
+
+ # assert that all entries in the log belong to the same doc
+ self.assertEqual(REPEAT_TIMES + 1, len(docs))
+ for doc_id in docs:
+ self.assertEqual(
+ REPEAT_TIMES + 1,
+ len(filter(lambda t: t[0] == doc_id, transaction_log)))
+
+ def test_correct_transaction_log_after_sequential_deletes(self):
+ """
+ Assert that the transaction_log increases accordingly with sequential
+ puts and deletes.
+ """
+ docs = []
+ for i in range(0, REPEAT_TIMES):
+ doc = self.db.create_doc({'ops': 0})
+ self.assertEqual(
+ 2 * i + 1, len(self.db._get_transaction_log()))
+ docs.append(doc.doc_id)
+ self.db.delete_doc(doc)
+ self.assertEqual(
+ 2 * i + 2, len(self.db._get_transaction_log()))
+
+ # assert length of transaction_log
+ transaction_log = self.db._get_transaction_log()
+ self.assertEqual(
+ 2 * REPEAT_TIMES, len(transaction_log))
+
+ # assert that each doc appears twice in the transaction_log
+ self.assertEqual(REPEAT_TIMES, len(docs))
+ for doc_id in docs:
+ self.assertEqual(
+ 2,
+ len(filter(lambda t: t[0] == doc_id, transaction_log)))
+
+ @defer.inlineCallbacks
+ def test_correct_sync_log_after_sequential_syncs(self):
+ """
+ Assert that the sync_log increases accordingly with sequential syncs.
+ """
+ sol = self._soledad_instance(
+ auth_token='auth-token',
+ server_url=self.getURL())
+ source_replica_uid = sol._dbpool.replica_uid
+
+ def _create_docs():
+ deferreds = []
+ for i in xrange(0, REPEAT_TIMES):
+ deferreds.append(sol.create_doc({}))
+ return defer.gatherResults(deferreds)
+
+ def _assert_transaction_and_sync_logs(results, sync_idx):
+ # assert sizes of transaction and sync logs
+ self.assertEqual(
+ sync_idx * REPEAT_TIMES,
+ len(self.db._get_transaction_log()))
+ gen, _ = self.db._get_replica_gen_and_trans_id(source_replica_uid)
+ self.assertEqual(sync_idx * REPEAT_TIMES, gen)
+
+ def _assert_sync(results, sync_idx):
+ gen, docs = results
+ self.assertEqual((sync_idx + 1) * REPEAT_TIMES, gen)
+ self.assertEqual((sync_idx + 1) * REPEAT_TIMES, len(docs))
+ # assert sizes of transaction and sync logs
+ self.assertEqual((sync_idx + 1) * REPEAT_TIMES,
+ len(self.db._get_transaction_log()))
+ target_known_gen, target_known_trans_id = \
+ self.db._get_replica_gen_and_trans_id(source_replica_uid)
+ # assert it has the correct gen and trans_id
+ conn_key = sol._dbpool._u1dbconnections.keys().pop()
+ conn = sol._dbpool._u1dbconnections[conn_key]
+ sol_gen, sol_trans_id = conn._get_generation_info()
+ self.assertEqual(sol_gen, target_known_gen)
+ self.assertEqual(sol_trans_id, target_known_trans_id)
+
+ # sync first time and assert success
+ results = yield _create_docs()
+ _assert_transaction_and_sync_logs(results, 0)
+ yield sol.sync()
+ results = yield sol.get_all_docs()
+ _assert_sync(results, 0)
+
+ # create more docs, sync second time and assert success
+ results = yield _create_docs()
+ _assert_transaction_and_sync_logs(results, 1)
+ yield sol.sync()
+ results = yield sol.get_all_docs()
+ _assert_sync(results, 1)
+
+ #
+ # Concurrency tests
+ #
+
+ class _WorkerThread(threading.Thread):
+
+ def __init__(self, params, run_method):
+ threading.Thread.__init__(self)
+ self._params = params
+ self._run_method = run_method
+
+ def run(self):
+ self._run_method(self)
+
+ def test_correct_transaction_log_after_concurrent_puts(self):
+ """
+ Assert that the transaction_log increases accordingly with concurrent
+ puts.
+ """
+ pool = threading.BoundedSemaphore(value=1)
+ threads = []
+ docs = []
+
+ def _run_method(self):
+ doc = self._params['db'].create_doc({})
+ pool.acquire()
+ self._params['docs'].append(doc.doc_id)
+ pool.release()
+
+ for i in range(0, REPEAT_TIMES):
+ thread = self._WorkerThread(
+ {'docs': docs, 'db': self.db},
+ _run_method)
+ thread.start()
+ threads.append(thread)
+
+ for thread in threads:
+ thread.join()
+
+ # assert length of transaction_log
+ transaction_log = self.db._get_transaction_log()
+ self.assertEqual(
+ REPEAT_TIMES, len(transaction_log))
+
+ # assert all documents are in the log
+ self.assertEqual(REPEAT_TIMES, len(docs))
+ for doc_id in docs:
+ self.assertEqual(
+ 1,
+ len(filter(lambda t: t[0] == doc_id, transaction_log)))
+
+ def test_correct_transaction_log_after_concurrent_deletes(self):
+ """
+ Assert that the transaction_log increases accordingly with concurrent
+ puts and deletes.
+ """
+ threads = []
+ docs = []
+ pool = threading.BoundedSemaphore(value=1)
+
+ # create/delete method that will be run concurrently
+ def _run_method(self):
+ doc = self._params['db'].create_doc({})
+ pool.acquire()
+ docs.append(doc.doc_id)
+ pool.release()
+ self._params['db'].delete_doc(doc)
+
+ # launch concurrent threads
+ for i in range(0, REPEAT_TIMES):
+ thread = self._WorkerThread({'db': self.db}, _run_method)
+ thread.start()
+ threads.append(thread)
+
+ # wait for threads to finish
+ for thread in threads:
+ thread.join()
+
+ # assert transaction log
+ transaction_log = self.db._get_transaction_log()
+ self.assertEqual(
+ 2 * REPEAT_TIMES, len(transaction_log))
+ # assert that each doc appears twice in the transaction_log
+ self.assertEqual(REPEAT_TIMES, len(docs))
+ for doc_id in docs:
+ self.assertEqual(
+ 2,
+ len(filter(lambda t: t[0] == doc_id, transaction_log)))
+
+ def test_correct_sync_log_after_concurrent_puts_and_sync(self):
+ """
+ Assert that the sync_log is correct after concurrent syncs.
+ """
+ docs = []
+
+ sol = self._soledad_instance(
+ auth_token='auth-token',
+ server_url=self.getURL())
+
+ def _save_doc_ids(results):
+ for doc in results:
+ docs.append(doc.doc_id)
+
+ # create documents in parallel
+ deferreds = []
+ for i in range(0, REPEAT_TIMES):
+ d = sol.create_doc({})
+ deferreds.append(d)
+
+ # wait for documents creation and sync
+ d = defer.gatherResults(deferreds)
+ d.addCallback(_save_doc_ids)
+ d.addCallback(lambda _: sol.sync())
+
+ def _assert_logs(results):
+ transaction_log = self.db._get_transaction_log()
+ self.assertEqual(REPEAT_TIMES, len(transaction_log))
+ # assert all documents are in the remote log
+ self.assertEqual(REPEAT_TIMES, len(docs))
+ for doc_id in docs:
+ self.assertEqual(
+ 1,
+ len(filter(lambda t: t[0] == doc_id, transaction_log)))
+
+ d.addCallback(_assert_logs)
+ d.addCallback(lambda _: sol.close())
+
+ return d
+
+ @defer.inlineCallbacks
+ def test_concurrent_syncs_do_not_fail(self):
+ """
+ Assert that concurrent attempts to sync end up being executed
+ sequentially and do not fail.
+ """
+ docs = []
+
+ sol = self._soledad_instance(
+ auth_token='auth-token',
+ server_url=self.getURL())
+
+ deferreds = []
+ for i in xrange(0, REPEAT_TIMES):
+ d = sol.create_doc({})
+ d.addCallback(lambda doc: docs.append(doc.doc_id))
+ d.addCallback(lambda _: sol.sync())
+ deferreds.append(d)
+ yield defer.gatherResults(deferreds, consumeErrors=True)
+
+ transaction_log = self.db._get_transaction_log()
+ self.assertEqual(REPEAT_TIMES, len(transaction_log))
+ # assert all documents are in the remote log
+ self.assertEqual(REPEAT_TIMES, len(docs))
+ for doc_id in docs:
+ self.assertEqual(
+ 1,
+ len(filter(lambda t: t[0] == doc_id, transaction_log)))