diff options
Diffstat (limited to 'common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py')
-rw-r--r-- | common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py | 206 |
1 files changed, 101 insertions, 105 deletions
diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py index 6465eb80..83cee469 100644 --- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py +++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py @@ -15,26 +15,25 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. """ -Test atomocity for couch operations. +Test atomicity of couch operations. """ import os -import mock import tempfile import threading - from urlparse import urljoin - +from twisted.internet import defer from leap.soledad.client import Soledad from leap.soledad.common.couch import CouchDatabase, CouchServerState -from leap.soledad.common.tests.test_couch import CouchDBTestCase -from leap.soledad.common.tests.u1db_tests import TestCaseWithServer -from leap.soledad.common.tests.test_sync_target import ( + +from leap.soledad.common.tests.util import ( make_token_soledad_app, - make_leap_document_for_test, - token_leap_sync_target, + make_soledad_document_for_test, + token_soledad_sync_target, ) +from leap.soledad.common.tests.test_couch import CouchDBTestCase +from leap.soledad.common.tests.u1db_tests import TestCaseWithServer from leap.soledad.common.tests.test_server import _couch_ensure_database @@ -52,15 +51,15 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): def make_app_after_state(state): return make_token_soledad_app(state) - make_document_for_test = make_leap_document_for_test + make_document_for_test = make_soledad_document_for_test - sync_target = token_leap_sync_target + sync_target = token_soledad_sync_target def _soledad_instance(self, user='user-uuid', passphrase=u'123', prefix='', - secrets_path=Soledad.STORAGE_SECRETS_FILE_NAME, + secrets_path='secrets.json', local_db_path='soledad.u1db', server_url='', - cert_file=None, auth_token=None, secret_id=None): + cert_file=None, auth_token=None): """ Instantiate Soledad. """ @@ -70,19 +69,6 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): def _put_doc_side_effect(doc): self._doc_put = doc - # we need a mocked shared db or else Soledad will try to access the - # network to find if there are uploaded secrets. - class MockSharedDB(object): - - get_doc = mock.Mock(return_value=None) - put_doc = mock.Mock(side_effect=_put_doc_side_effect) - lock = mock.Mock(return_value=('atoken', 300)) - unlock = mock.Mock() - - def __call__(self): - return self - - Soledad._shared_db = MockSharedDB() return Soledad( user, passphrase, @@ -92,7 +78,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): server_url=server_url, cert_file=cert_file, auth_token=auth_token, - secret_id=secret_id) + shared_db=self.get_default_shared_mock(_put_doc_side_effect)) def make_app(self): self.request_state = CouchServerState(self._couch_url, 'shared', @@ -126,7 +112,6 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): puts. """ doc = self.db.create_doc({'ops': 0}) - ops = 1 docs = [doc.doc_id] for i in range(0, REPEAT_TIMES): self.assertEqual( @@ -183,24 +168,27 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): auth_token='auth-token', server_url=self.getURL()) - def _create_docs_and_sync(sol, syncs): - # create a lot of documents - for i in range(0, REPEAT_TIMES): - sol.create_doc({}) + def _create_docs(results): + deferreds = [] + for i in xrange(0, REPEAT_TIMES): + deferreds.append(sol.create_doc({})) + return defer.DeferredList(deferreds) + + def _assert_transaction_and_sync_logs(results, sync_idx): # assert sizes of transaction and sync logs self.assertEqual( - syncs*REPEAT_TIMES, + sync_idx*REPEAT_TIMES, len(self.db._get_transaction_log())) self.assertEqual( - 1 if syncs > 0 else 0, + 1 if sync_idx > 0 else 0, len(self.db._database.view('syncs/log').rows)) - # sync to the remote db - sol.sync() - gen, docs = self.db.get_all_docs() - self.assertEqual((syncs+1)*REPEAT_TIMES, gen) - self.assertEqual((syncs+1)*REPEAT_TIMES, len(docs)) + + def _assert_sync(results, sync_idx): + gen, docs = results + self.assertEqual((sync_idx+1)*REPEAT_TIMES, gen) + self.assertEqual((sync_idx+1)*REPEAT_TIMES, len(docs)) # assert sizes of transaction and sync logs - self.assertEqual((syncs+1)*REPEAT_TIMES, + self.assertEqual((sync_idx+1)*REPEAT_TIMES, len(self.db._get_transaction_log())) sync_log_rows = self.db._database.view('syncs/log').rows sync_log = sync_log_rows[0].value @@ -210,14 +198,32 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): # assert sync_log has exactly 1 row self.assertEqual(1, len(sync_log_rows)) # assert it has the correct replica_uid, gen and trans_id - self.assertEqual(sol._db._replica_uid, replica_uid) - sol_gen, sol_trans_id = sol._db._get_generation_info() + self.assertEqual(sol._dbpool.replica_uid, replica_uid) + conn_key = sol._dbpool._u1dbconnections.keys().pop() + conn = sol._dbpool._u1dbconnections[conn_key] + sol_gen, sol_trans_id = conn._get_generation_info() self.assertEqual(sol_gen, known_gen) self.assertEqual(sol_trans_id, known_trans_id) + + # create some documents + d = _create_docs(None) - _create_docs_and_sync(sol, 0) - _create_docs_and_sync(sol, 1) - sol.close() + # sync first time and assert success + d.addCallback(_assert_transaction_and_sync_logs, 0) + d.addCallback(lambda _: sol.sync()) + d.addCallback(lambda _: sol.get_all_docs()) + d.addCallback(_assert_sync, 0) + + # create more docs, sync second time and assert success + d.addCallback(_create_docs) + d.addCallback(_assert_transaction_and_sync_logs, 1) + d.addCallback(lambda _: sol.sync()) + d.addCallback(lambda _: sol.get_all_docs()) + d.addCallback(_assert_sync, 1) + + d.addCallback(lambda _: sol.close()) + + return d # # Concurrency tests @@ -313,86 +319,76 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): """ Assert that the sync_log is correct after concurrent syncs. """ - threads = [] docs = [] - pool = threading.BoundedSemaphore(value=1) + self.startServer() + sol = self._soledad_instance( auth_token='auth-token', server_url=self.getURL()) - def _run_method(self): - # create a lot of documents - doc = self._params['sol'].create_doc({}) - pool.acquire() - docs.append(doc.doc_id) - pool.release() + def _save_doc_ids(results): + for doc in results: + docs.append(doc.doc_id) - # launch threads to create documents in parallel + # create documents in parallel + deferreds = [] for i in range(0, REPEAT_TIMES): - thread = self._WorkerThread( - {'sol': sol, 'syncs': i}, - _run_method) - thread.start() - threads.append(thread) + d = sol.create_doc({}) + deferreds.append(d) - # wait for threads to finish - for thread in threads: - thread.join() + # wait for documents creation and sync + d = defer.gatherResults(deferreds) + d.addCallback(_save_doc_ids) + d.addCallback(lambda _: sol.sync()) - # do the sync! - sol.sync() + def _assert_logs(results): + transaction_log = self.db._get_transaction_log() + self.assertEqual(REPEAT_TIMES, len(transaction_log)) + # assert all documents are in the remote log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 1, + len(filter(lambda t: t[0] == doc_id, transaction_log))) - transaction_log = self.db._get_transaction_log() - self.assertEqual(REPEAT_TIMES, len(transaction_log)) - # assert all documents are in the remote log - self.assertEqual(REPEAT_TIMES, len(docs)) - for doc_id in docs: - self.assertEqual( - 1, - len(filter(lambda t: t[0] == doc_id, transaction_log))) - sol.close() + d.addCallback(_assert_logs) + d.addCallback(lambda _: sol.close()) + + return d def test_concurrent_syncs_do_not_fail(self): """ Assert that concurrent attempts to sync end up being executed sequentially and do not fail. """ - threads = [] docs = [] - pool = threading.BoundedSemaphore(value=1) + self.startServer() + sol = self._soledad_instance( auth_token='auth-token', server_url=self.getURL()) - def _run_method(self): - # create a lot of documents - doc = self._params['sol'].create_doc({}) - # do the sync! - sol.sync() - pool.acquire() - docs.append(doc.doc_id) - pool.release() - - # launch threads to create documents in parallel - for i in range(0, REPEAT_TIMES): - thread = self._WorkerThread( - {'sol': sol, 'syncs': i}, - _run_method) - thread.start() - threads.append(thread) - - # wait for threads to finish - for thread in threads: - thread.join() - - transaction_log = self.db._get_transaction_log() - self.assertEqual(REPEAT_TIMES, len(transaction_log)) - # assert all documents are in the remote log - self.assertEqual(REPEAT_TIMES, len(docs)) - for doc_id in docs: - self.assertEqual( - 1, - len(filter(lambda t: t[0] == doc_id, transaction_log))) - sol.close() + deferreds = [] + for i in xrange(0, REPEAT_TIMES): + d = sol.create_doc({}) + d.addCallback(lambda doc: docs.append(doc.doc_id)) + d.addCallback(lambda _: sol.sync()) + deferreds.append(d) + + def _assert_logs(results): + transaction_log = self.db._get_transaction_log() + self.assertEqual(REPEAT_TIMES, len(transaction_log)) + # assert all documents are in the remote log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 1, + len(filter(lambda t: t[0] == doc_id, transaction_log))) + + d = defer.gatherResults(deferreds) + d.addCallback(_assert_logs) + d.addCallback(lambda _: sol.close()) + + return d |