diff options
Diffstat (limited to 'soledad/src/leap/soledad/tests/test_target.py')
-rw-r--r-- | soledad/src/leap/soledad/tests/test_target.py | 784 |
1 files changed, 784 insertions, 0 deletions
diff --git a/soledad/src/leap/soledad/tests/test_target.py b/soledad/src/leap/soledad/tests/test_target.py new file mode 100644 index 00000000..73c9fe68 --- /dev/null +++ b/soledad/src/leap/soledad/tests/test_target.py @@ -0,0 +1,784 @@ +# -*- coding: utf-8 -*- +# test_target.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Test Leap backend bits. +""" + +import u1db +import os +import ssl +import simplejson as json +import cStringIO + + +from u1db.sync import Synchronizer +from u1db.remote import ( + http_client, + http_database, + http_target, +) + +from leap import soledad +from leap.soledad import ( + target, + auth, +) +from leap.soledad.document import SoledadDocument +from leap.soledad_server import ( + SoledadApp, + SoledadAuthMiddleware, +) + + +from leap.soledad.tests import u1db_tests as tests +from leap.soledad.tests import BaseSoledadTest +from leap.soledad.tests.u1db_tests import test_backends +from leap.soledad.tests.u1db_tests import test_http_database +from leap.soledad.tests.u1db_tests import test_http_client +from leap.soledad.tests.u1db_tests import test_document +from leap.soledad.tests.u1db_tests import test_remote_sync_target +from leap.soledad.tests.u1db_tests import test_https +from leap.soledad.tests.u1db_tests import test_sync + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_backends`. +#----------------------------------------------------------------------------- + +def make_leap_document_for_test(test, doc_id, rev, content, + has_conflicts=False): + return SoledadDocument( + doc_id, rev, content, has_conflicts=has_conflicts) + + +def make_soledad_app(state): + return SoledadApp(state) + + +def make_token_soledad_app(state): + app = SoledadApp(state) + + def verify_token(environ, uuid, token): + if uuid == 'user-uuid' and token == 'auth-token': + return True + return False + + # we test for action authorization in leap.soledad.tests.test_server + def verify_action(environ, uuid): + return True + + application = SoledadAuthMiddleware(app) + application.verify_token = verify_token + application.verify_action = verify_action + return application + + +LEAP_SCENARIOS = [ + ('http', { + 'make_database_for_test': test_backends.make_http_database_for_test, + 'copy_database_for_test': test_backends.copy_http_database_for_test, + 'make_document_for_test': make_leap_document_for_test, + 'make_app_with_state': make_soledad_app}), +] + + +def make_token_http_database_for_test(test, replica_uid): + test.startServer() + test.request_state._create_database(replica_uid) + + class _HTTPDatabaseWithToken( + http_database.HTTPDatabase, auth.TokenBasedAuth): + + def set_token_credentials(self, uuid, token): + auth.TokenBasedAuth.set_token_credentials(self, uuid, token) + + def _sign_request(self, method, url_query, params): + return auth.TokenBasedAuth._sign_request( + self, method, url_query, params) + + http_db = _HTTPDatabaseWithToken(test.getURL('test')) + http_db.set_token_credentials('user-uuid', 'auth-token') + return http_db + + +def copy_token_http_database_for_test(test, db): + # DO NOT COPY OR REUSE THIS CODE OUTSIDE TESTS: COPYING U1DB DATABASES IS + # THE WRONG THING TO DO, THE ONLY REASON WE DO SO HERE IS TO TEST THAT WE + # CORRECTLY DETECT IT HAPPENING SO THAT WE CAN RAISE ERRORS RATHER THAN + # CORRUPT USER DATA. USE SYNC INSTEAD, OR WE WILL SEND NINJA TO YOUR + # HOUSE. + http_db = test.request_state._copy_database(db) + http_db.set_token_credentials(http_db, 'user-uuid', 'auth-token') + return http_db + + +class SoledadTests(test_backends.AllDatabaseTests, BaseSoledadTest): + + scenarios = LEAP_SCENARIOS + [ + ('token_http', {'make_database_for_test': + make_token_http_database_for_test, + 'copy_database_for_test': + copy_token_http_database_for_test, + 'make_document_for_test': make_leap_document_for_test, + 'make_app_with_state': make_token_soledad_app, + }) + ] + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_http_client`. +#----------------------------------------------------------------------------- + +class TestSoledadClientBase(test_http_client.TestHTTPClientBase): + """ + This class should be used to test Token auth. + """ + + def getClientWithToken(self, **kwds): + self.startServer() + + class _HTTPClientWithToken( + http_client.HTTPClientBase, auth.TokenBasedAuth): + + def set_token_credentials(self, uuid, token): + auth.TokenBasedAuth.set_token_credentials(self, uuid, token) + + def _sign_request(self, method, url_query, params): + return auth.TokenBasedAuth._sign_request( + self, method, url_query, params) + + return _HTTPClientWithToken(self.getURL('dbase'), **kwds) + + def test_oauth(self): + """ + Suppress oauth test (we test for token auth here). + """ + pass + + def test_oauth_ctr_creds(self): + """ + Suppress oauth test (we test for token auth here). + """ + pass + + def test_oauth_Unauthorized(self): + """ + Suppress oauth test (we test for token auth here). + """ + pass + + def app(self, environ, start_response): + res = test_http_client.TestHTTPClientBase.app( + self, environ, start_response) + if res is not None: + return res + # mime solead application here. + if '/token' in environ['PATH_INFO']: + auth = environ.get(SoledadAuthMiddleware.HTTP_AUTH_KEY) + if not auth: + start_response("401 Unauthorized", + [('Content-Type', 'application/json')]) + return [json.dumps({"error": "unauthorized", + "message": e.message})] + scheme, encoded = auth.split(None, 1) + if scheme.lower() != 'token': + start_response("401 Unauthorized", + [('Content-Type', 'application/json')]) + return [json.dumps({"error": "unauthorized", + "message": e.message})] + uuid, token = encoded.decode('base64').split(':', 1) + if uuid != 'user-uuid' and token != 'auth-token': + return unauth_err("Incorrect address or token.") + start_response("200 OK", [('Content-Type', 'application/json')]) + return [json.dumps([environ['PATH_INFO'], uuid, token])] + + def test_token(self): + """ + Test if token is sent correctly. + """ + cli = self.getClientWithToken() + cli.set_token_credentials('user-uuid', 'auth-token') + res, headers = cli._request('GET', ['doc', 'token']) + self.assertEqual( + ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res)) + + def test_token_ctr_creds(self): + cli = self.getClientWithToken(creds={'token': { + 'uuid': 'user-uuid', + 'token': 'auth-token', + }}) + res, headers = cli._request('GET', ['doc', 'token']) + self.assertEqual( + ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res)) + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_document`. +#----------------------------------------------------------------------------- + +class TestSoledadDocument(test_document.TestDocument, BaseSoledadTest): + + scenarios = ([( + 'leap', {'make_document_for_test': make_leap_document_for_test})]) + + +class TestSoledadPyDocument(test_document.TestPyDocument, BaseSoledadTest): + + scenarios = ([( + 'leap', {'make_document_for_test': make_leap_document_for_test})]) + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_remote_sync_target`. +#----------------------------------------------------------------------------- + +class TestSoledadSyncTargetBasics( + test_remote_sync_target.TestHTTPSyncTargetBasics): + """ + Some tests had to be copied to this class so we can instantiate our own + target. + """ + + def test_parse_url(self): + remote_target = target.SoledadSyncTarget('http://127.0.0.1:12345/') + self.assertEqual('http', remote_target._url.scheme) + self.assertEqual('127.0.0.1', remote_target._url.hostname) + self.assertEqual(12345, remote_target._url.port) + self.assertEqual('/', remote_target._url.path) + + +class TestSoledadParsingSyncStream( + test_remote_sync_target.TestParsingSyncStream, + BaseSoledadTest): + """ + Some tests had to be copied to this class so we can instantiate our own + target. + """ + + def setUp(self): + test_remote_sync_target.TestParsingSyncStream.setUp(self) + BaseSoledadTest.setUp(self) + + def tearDown(self): + test_remote_sync_target.TestParsingSyncStream.tearDown(self) + BaseSoledadTest.tearDown(self) + + def test_extra_comma(self): + """ + Test adapted to use encrypted content. + """ + doc = SoledadDocument('i', rev='r') + doc.content = {} + enc_json = target.encrypt_doc(self._soledad._crypto, doc) + tgt = target.SoledadSyncTarget( + "http://foo/foo", crypto=self._soledad._crypto) + + self.assertRaises(u1db.errors.BrokenSyncStream, + tgt._parse_sync_stream, "[\r\n{},\r\n]", None) + self.assertRaises(u1db.errors.BrokenSyncStream, + tgt._parse_sync_stream, + '[\r\n{},\r\n{"id": "i", "rev": "r", ' + '"content": %s, "gen": 3, "trans_id": "T-sid"}' + ',\r\n]' % json.dumps(enc_json), + lambda doc, gen, trans_id: None) + + def test_wrong_start(self): + tgt = target.SoledadSyncTarget("http://foo/foo") + + self.assertRaises(u1db.errors.BrokenSyncStream, + tgt._parse_sync_stream, "{}\r\n]", None) + + self.assertRaises(u1db.errors.BrokenSyncStream, + tgt._parse_sync_stream, "\r\n{}\r\n]", None) + + self.assertRaises(u1db.errors.BrokenSyncStream, + tgt._parse_sync_stream, "", None) + + def test_wrong_end(self): + tgt = target.SoledadSyncTarget("http://foo/foo") + + self.assertRaises(u1db.errors.BrokenSyncStream, + tgt._parse_sync_stream, "[\r\n{}", None) + + self.assertRaises(u1db.errors.BrokenSyncStream, + tgt._parse_sync_stream, "[\r\n", None) + + def test_missing_comma(self): + tgt = target.SoledadSyncTarget("http://foo/foo") + + self.assertRaises(u1db.errors.BrokenSyncStream, + tgt._parse_sync_stream, + '[\r\n{}\r\n{"id": "i", "rev": "r", ' + '"content": "c", "gen": 3}\r\n]', None) + + def test_no_entries(self): + tgt = target.SoledadSyncTarget("http://foo/foo") + + self.assertRaises(u1db.errors.BrokenSyncStream, + tgt._parse_sync_stream, "[\r\n]", None) + + def test_error_in_stream(self): + tgt = target.SoledadSyncTarget("http://foo/foo") + + self.assertRaises(u1db.errors.Unavailable, + tgt._parse_sync_stream, + '[\r\n{"new_generation": 0},' + '\r\n{"error": "unavailable"}\r\n', None) + + self.assertRaises(u1db.errors.Unavailable, + tgt._parse_sync_stream, + '[\r\n{"error": "unavailable"}\r\n', None) + + self.assertRaises(u1db.errors.BrokenSyncStream, + tgt._parse_sync_stream, + '[\r\n{"error": "?"}\r\n', None) + + +# +# functions for TestRemoteSyncTargets +# + +def leap_sync_target(test, path): + return target.SoledadSyncTarget( + test.getURL(path), crypto=test._soledad._crypto) + + +def token_leap_sync_target(test, path): + st = leap_sync_target(test, path) + st.set_token_credentials('user-uuid', 'auth-token') + return st + + +class TestSoledadSyncTarget( + test_remote_sync_target.TestRemoteSyncTargets, BaseSoledadTest): + + scenarios = [ + ('token_soledad', + {'make_app_with_state': make_token_soledad_app, + 'make_document_for_test': make_leap_document_for_test, + 'sync_target': token_leap_sync_target}), + ] + + def test_sync_exchange_send(self): + """ + Test for sync exchanging send of document. + + This test was adapted to decrypt remote content before assert. + """ + self.startServer() + db = self.request_state._create_database('test') + remote_target = self.getSyncTarget('test') + other_docs = [] + + def receive_doc(doc): + other_docs.append((doc.doc_id, doc.rev, doc.get_json())) + + doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}') + new_gen, trans_id = remote_target.sync_exchange( + [(doc, 10, 'T-sid')], 'replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=receive_doc) + self.assertEqual(1, new_gen) + self.assertGetEncryptedDoc( + db, 'doc-here', 'replica:1', '{"value": "here"}', False) + + def test_sync_exchange_send_failure_and_retry_scenario(self): + """ + Test for sync exchange failure and retry. + + This test was adapted to decrypt remote content before assert. + """ + + self.startServer() + + def blackhole_getstderr(inst): + return cStringIO.StringIO() + + self.patch(self.server.RequestHandlerClass, 'get_stderr', + blackhole_getstderr) + db = self.request_state._create_database('test') + _put_doc_if_newer = db._put_doc_if_newer + trigger_ids = ['doc-here2'] + + def bomb_put_doc_if_newer(doc, save_conflict, + replica_uid=None, replica_gen=None, + replica_trans_id=None): + if doc.doc_id in trigger_ids: + raise Exception + return _put_doc_if_newer(doc, save_conflict=save_conflict, + replica_uid=replica_uid, + replica_gen=replica_gen, + replica_trans_id=replica_trans_id) + self.patch(db, '_put_doc_if_newer', bomb_put_doc_if_newer) + remote_target = self.getSyncTarget('test') + other_changes = [] + + def receive_doc(doc, gen, trans_id): + other_changes.append( + (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id)) + + doc1 = self.make_document('doc-here', 'replica:1', '{"value": "here"}') + doc2 = self.make_document('doc-here2', 'replica:1', + '{"value": "here2"}') + self.assertRaises( + u1db.errors.HTTPError, + remote_target.sync_exchange, + [(doc1, 10, 'T-sid'), (doc2, 11, 'T-sud')], + 'replica', last_known_generation=0, last_known_trans_id=None, + return_doc_cb=receive_doc) + self.assertGetEncryptedDoc( + db, 'doc-here', 'replica:1', '{"value": "here"}', + False) + self.assertEqual( + (10, 'T-sid'), db._get_replica_gen_and_trans_id('replica')) + self.assertEqual([], other_changes) + # retry + trigger_ids = [] + new_gen, trans_id = remote_target.sync_exchange( + [(doc2, 11, 'T-sud')], 'replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=receive_doc) + self.assertGetEncryptedDoc( + db, 'doc-here2', 'replica:1', '{"value": "here2"}', + False) + self.assertEqual( + (11, 'T-sud'), db._get_replica_gen_and_trans_id('replica')) + self.assertEqual(2, new_gen) + # bounced back to us + self.assertEqual( + ('doc-here', 'replica:1', '{"value": "here"}', 1), + other_changes[0][:-1]) + + def test_sync_exchange_send_ensure_callback(self): + """ + Test for sync exchange failure and retry. + + This test was adapted to decrypt remote content before assert. + """ + self.startServer() + remote_target = self.getSyncTarget('test') + other_docs = [] + replica_uid_box = [] + + def receive_doc(doc): + other_docs.append((doc.doc_id, doc.rev, doc.get_json())) + + def ensure_cb(replica_uid): + replica_uid_box.append(replica_uid) + + doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}') + new_gen, trans_id = remote_target.sync_exchange( + [(doc, 10, 'T-sid')], 'replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=receive_doc, + ensure_callback=ensure_cb) + self.assertEqual(1, new_gen) + db = self.request_state.open_database('test') + self.assertEqual(1, len(replica_uid_box)) + self.assertEqual(db._replica_uid, replica_uid_box[0]) + self.assertGetEncryptedDoc( + db, 'doc-here', 'replica:1', '{"value": "here"}', False) + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_https`. +#----------------------------------------------------------------------------- + +def token_leap_https_sync_target(test, host, path): + _, port = test.server.server_address + st = target.SoledadSyncTarget( + 'https://%s:%d/%s' % (host, port, path), + crypto=test._soledad._crypto) + st.set_token_credentials('user-uuid', 'auth-token') + return st + + +class TestSoledadSyncTargetHttpsSupport(test_https.TestHttpSyncTargetHttpsSupport, + BaseSoledadTest): + + scenarios = [ + ('token_soledad_https', + {'server_def': test_https.https_server_def, + 'make_app_with_state': make_token_soledad_app, + 'make_document_for_test': make_leap_document_for_test, + 'sync_target': token_leap_https_sync_target}), + ] + + def setUp(self): + # the parent constructor undoes our SSL monkey patch to ensure tests + # run smoothly with standard u1db. + test_https.TestHttpSyncTargetHttpsSupport.setUp(self) + # so here monkey patch again to test our functionality. + http_client._VerifiedHTTPSConnection = soledad.VerifiedHTTPSConnection + soledad.SOLEDAD_CERT = http_client.CA_CERTS + + def test_working(self): + """ + Test that SSL connections work well. + + This test was adapted to patch Soledad's HTTPS connection custom class + with the intended CA certificates. + """ + self.startServer() + db = self.request_state._create_database('test') + self.patch(soledad, 'SOLEDAD_CERT', self.cacert_pem) + remote_target = self.getSyncTarget('localhost', 'test') + remote_target.record_sync_info('other-id', 2, 'T-id') + self.assertEqual( + (2, 'T-id'), db._get_replica_gen_and_trans_id('other-id')) + + def test_host_mismatch(self): + """ + Test that SSL connections to a hostname different than the one in the + certificate raise CertificateError. + + This test was adapted to patch Soledad's HTTPS connection custom class + with the intended CA certificates. + """ + self.startServer() + self.request_state._create_database('test') + self.patch(soledad, 'SOLEDAD_CERT', self.cacert_pem) + remote_target = self.getSyncTarget('127.0.0.1', 'test') + self.assertRaises( + http_client.CertificateError, remote_target.record_sync_info, + 'other-id', 2, 'T-id') + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_http_database`. +#----------------------------------------------------------------------------- + +class _HTTPDatabase(http_database.HTTPDatabase, auth.TokenBasedAuth): + """ + Wraps our token auth implementation. + """ + + def set_token_credentials(self, uuid, token): + auth.TokenBasedAuth.set_token_credentials(self, uuid, token) + + def _sign_request(self, method, url_query, params): + return auth.TokenBasedAuth._sign_request( + self, method, url_query, params) + + +class TestHTTPDatabaseWithCreds( + test_http_database.TestHTTPDatabaseCtrWithCreds): + + def test_get_sync_target_inherits_token_credentials(self): + # this test was from TestDatabaseSimpleOperations but we put it here + # for convenience. + self.db = _HTTPDatabase('dbase') + self.db.set_token_credentials('user-uuid', 'auth-token') + st = self.db.get_sync_target() + self.assertEqual(self.db._creds, st._creds) + + def test_ctr_with_creds(self): + db1 = _HTTPDatabase('http://dbs/db', creds={'token': { + 'uuid': 'user-uuid', + 'token': 'auth-token', + }}) + self.assertIn('token', db1._creds) + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_sync`. +#----------------------------------------------------------------------------- + +def _make_local_db_and_leap_target(test, path='test'): + test.startServer() + db = test.request_state._create_database(os.path.basename(path)) + st = target.SoledadSyncTarget.connect( + test.getURL(path), crypto=test._soledad._crypto) + return db, st + + +def _make_local_db_and_token_leap_target(test): + db, st = _make_local_db_and_leap_target(test, 'test') + st.set_token_credentials('user-uuid', 'auth-token') + return db, st + + +target_scenarios = [ + ('token_leap', {'create_db_and_target': + _make_local_db_and_token_leap_target, + 'make_app_with_state': make_token_soledad_app}), +] + + +class SoledadDatabaseSyncTargetTests( + test_sync.DatabaseSyncTargetTests, BaseSoledadTest): + + scenarios = ( + tests.multiply_scenarios( + tests.DatabaseBaseTests.scenarios, + target_scenarios)) + + def test_sync_exchange(self): + """ + Test sync exchange. + + This test was adapted to decrypt remote content before assert. + """ + sol = _make_local_db_and_leap_target(self) + docs_by_gen = [ + (self.make_document('doc-id', 'replica:1', tests.simple_doc), 10, + 'T-sid')] + new_gen, trans_id = self.st.sync_exchange( + docs_by_gen, 'replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=self.receive_doc) + self.assertGetEncryptedDoc( + self.db, 'doc-id', 'replica:1', tests.simple_doc, False) + self.assertTransactionLog(['doc-id'], self.db) + last_trans_id = self.getLastTransId(self.db) + self.assertEqual(([], 1, last_trans_id), + (self.other_changes, new_gen, last_trans_id)) + self.assertEqual(10, self.st.get_sync_info('replica')[3]) + + def test_sync_exchange_push_many(self): + """ + Test sync exchange. + + This test was adapted to decrypt remote content before assert. + """ + docs_by_gen = [ + (self.make_document( + 'doc-id', 'replica:1', tests.simple_doc), 10, 'T-1'), + (self.make_document( + 'doc-id2', 'replica:1', tests.nested_doc), 11, 'T-2')] + new_gen, trans_id = self.st.sync_exchange( + docs_by_gen, 'replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=self.receive_doc) + self.assertGetEncryptedDoc( + self.db, 'doc-id', 'replica:1', tests.simple_doc, False) + self.assertGetEncryptedDoc( + self.db, 'doc-id2', 'replica:1', tests.nested_doc, False) + self.assertTransactionLog(['doc-id', 'doc-id2'], self.db) + last_trans_id = self.getLastTransId(self.db) + self.assertEqual(([], 2, last_trans_id), + (self.other_changes, new_gen, trans_id)) + self.assertEqual(11, self.st.get_sync_info('replica')[3]) + + def test_sync_exchange_returns_many_new_docs(self): + """ + Test sync exchange. + + This test was adapted to avoid JSON serialization comparison as local + and remote representations might differ. It looks directly at the + doc's contents instead. + """ + doc = self.db.create_doc_from_json(tests.simple_doc) + doc2 = self.db.create_doc_from_json(tests.nested_doc) + self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db) + new_gen, _ = self.st.sync_exchange( + [], 'other-replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=self.receive_doc) + self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db) + self.assertEqual(2, new_gen) + self.assertEqual( + [(doc.doc_id, doc.rev, 1), + (doc2.doc_id, doc2.rev, 2)], + [c[:-3] + c[-2:-1] for c in self.other_changes]) + self.assertEqual( + json.loads(tests.simple_doc), + json.loads(self.other_changes[0][2])) + self.assertEqual( + json.loads(tests.nested_doc), + json.loads(self.other_changes[1][2])) + if self.whitebox: + self.assertEqual( + self.db._last_exchange_log['return'], + {'last_gen': 2, 'docs': + [(doc.doc_id, doc.rev), (doc2.doc_id, doc2.rev)]}) + + +class TestSoledadDbSync(test_sync.TestDbSync, BaseSoledadTest): + """Test db.sync remote sync shortcut""" + + scenarios = [ + ('py-http', { + 'make_app_with_state': make_soledad_app, + 'make_database_for_test': tests.make_memory_database_for_test, + }), + ('py-token-http', { + 'make_app_with_state': make_token_soledad_app, + 'make_database_for_test': tests.make_memory_database_for_test, + 'token': True + }), + ] + + oauth = False + token = False + + def do_sync(self, target_name): + """ + Perform sync using SoledadSyncTarget and Token auth. + """ + if self.token: + extra = dict(creds={'token': { + 'uuid': 'user-uuid', + 'token': 'auth-token', + }}) + target_url = self.getURL(target_name) + return Synchronizer( + self.db, + target.SoledadSyncTarget( + target_url, + crypto=self._soledad._crypto, + **extra)).sync(autocreate=True) + else: + return test_sync.TestDbSync.do_sync(self, target_name) + + def test_db_sync(self): + """ + Test sync. + + Adapted to check for encrypted content. + """ + doc1 = self.db.create_doc_from_json(tests.simple_doc) + doc2 = self.db2.create_doc_from_json(tests.nested_doc) + local_gen_before_sync = self.do_sync('test2.db') + gen, _, changes = self.db.whats_changed(local_gen_before_sync) + self.assertEqual(1, len(changes)) + self.assertEqual(doc2.doc_id, changes[0][0]) + self.assertEqual(1, gen - local_gen_before_sync) + self.assertGetEncryptedDoc( + self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False) + self.assertGetEncryptedDoc( + self.db, doc2.doc_id, doc2.rev, tests.nested_doc, False) + + def test_db_sync_autocreate(self): + """ + Test sync. + + Adapted to check for encrypted content. + """ + doc1 = self.db.create_doc_from_json(tests.simple_doc) + local_gen_before_sync = self.do_sync('test3.db') + gen, _, changes = self.db.whats_changed(local_gen_before_sync) + self.assertEqual(0, gen - local_gen_before_sync) + db3 = self.request_state.open_database('test3.db') + gen, _, changes = db3.whats_changed() + self.assertEqual(1, len(changes)) + self.assertEqual(doc1.doc_id, changes[0][0]) + self.assertGetEncryptedDoc( + db3, doc1.doc_id, doc1.rev, tests.simple_doc, False) + t_gen, _ = self.db._get_replica_gen_and_trans_id('test3.db') + s_gen, _ = db3._get_replica_gen_and_trans_id('test1') + self.assertEqual(1, t_gen) + self.assertEqual(1, s_gen) + + +load_tests = tests.load_with_scenarios |