# -*- coding: utf-8 -*-
# test_leap_backend.py
# Copyright (C) 2013 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


"""
Test Leap backend bits.
"""

import u1db
import os
import ssl
try:
    import simplejson as json
except ImportError:
    import json  # noqa
import cStringIO

from u1db.sync import Synchronizer
from u1db.remote import (
    http_client,
    http_database,
    http_target,
)

from leap import soledad
from leap.soledad.backends import leap_backend
from leap.soledad.server import (
    SoledadApp,
    SoledadAuthMiddleware
)
from leap.soledad import auth


from leap.soledad.tests import u1db_tests as tests
from leap.soledad.tests.u1db_tests.test_remote_sync_target import (
    make_oauth_http_app,
)
from leap.soledad.tests import BaseSoledadTest
from leap.soledad.tests.u1db_tests import test_backends
from leap.soledad.tests.u1db_tests import test_http_database
from leap.soledad.tests.u1db_tests import test_http_client
from leap.soledad.tests.u1db_tests import test_document
from leap.soledad.tests.u1db_tests import test_remote_sync_target
from leap.soledad.tests.u1db_tests import test_https
from leap.soledad.tests.u1db_tests import test_sync


#-----------------------------------------------------------------------------
# The following tests come from `u1db.tests.test_backends`.
#-----------------------------------------------------------------------------

def make_leap_document_for_test(test, doc_id, rev, content,
                                has_conflicts=False):
    """
    Build a LeapDocument for the test scenarios.

    The `test` argument is part of the scenario factory signature and is
    not used here.
    """
    document = leap_backend.LeapDocument(
        doc_id, rev, content, has_conflicts=has_conflicts)
    return document


def make_soledad_app(state):
    """
    Return a SoledadApp built on top of `state`, with no auth wrapper.
    """
    application = SoledadApp(state)
    return application


def make_token_soledad_app(state):
    """
    Return a SoledadApp for `state` wrapped in SoledadAuthMiddleware.

    The middleware's `verify_token` is replaced by a stub that accepts only
    the fixed test credentials ('user-uuid', 'auth-token').
    """

    def verify_token(environ, uuid, token):
        # only the hard-coded test pair is considered valid.
        return uuid == 'user-uuid' and token == 'auth-token'

    application = SoledadAuthMiddleware(SoledadApp(state))
    application.verify_token = verify_token
    return application


# Base scenario list: an 'http' scenario served by `make_soledad_app` and
# producing documents through `make_leap_document_for_test`.
LEAP_SCENARIOS = [
    ('http', dict(
        make_database_for_test=test_backends.make_http_database_for_test,
        copy_database_for_test=test_backends.copy_http_database_for_test,
        make_document_for_test=make_leap_document_for_test,
        make_app_with_state=make_soledad_app,
    )),
]


def make_token_http_database_for_test(test, replica_uid):
    """
    Start the test server and return an HTTP database using token auth.

    A database named `replica_uid` is created in the server state; the
    returned client points at the 'test' URL and carries the fixed test
    credentials ('user-uuid', 'auth-token').
    """
    test.startServer()
    test.request_state._create_database(replica_uid)

    class _HTTPDatabaseWithToken(
            http_database.HTTPDatabase, auth.TokenBasedAuth):
        # both methods delegate explicitly to TokenBasedAuth.

        def set_token_credentials(self, uuid, token):
            auth.TokenBasedAuth.set_token_credentials(self, uuid, token)

        def _sign_request(self, method, url_query, params):
            sign = auth.TokenBasedAuth._sign_request
            return sign(self, method, url_query, params)

    url = test.getURL('test')
    database = _HTTPDatabaseWithToken(url)
    database.set_token_credentials('user-uuid', 'auth-token')
    return database


def copy_token_http_database_for_test(test, db):
    """
    Copy `db` through the test server state and set token credentials on
    the copy.

    :param test: the running test case; its `request_state` performs the
        copy.
    :param db: the database to be copied.
    :return: the copied database, with the fixed test credentials
        ('user-uuid', 'auth-token') set.
    """
    # DO NOT COPY OR REUSE THIS CODE OUTSIDE TESTS: COPYING U1DB DATABASES IS
    # THE WRONG THING TO DO, THE ONLY REASON WE DO SO HERE IS TO TEST THAT WE
    # CORRECTLY DETECT IT HAPPENING SO THAT WE CAN RAISE ERRORS RATHER THAN
    # CORRUPT USER DATA. USE SYNC INSTEAD, OR WE WILL SEND NINJA TO YOUR
    # HOUSE.
    http_db = test.request_state._copy_database(db)
    # the method is bound, so the instance must not be passed again (doing
    # so raised TypeError because of the extra positional argument).
    http_db.set_token_credentials('user-uuid', 'auth-token')
    return http_db


class LeapTests(test_backends.AllDatabaseTests, BaseSoledadTest):
    """
    Run the `AllDatabaseTests` suite over the http, oauth and token
    scenarios, all of them producing LeapDocument instances.
    """

    scenarios = LEAP_SCENARIOS + [
        ('oauth_http', dict(
            make_database_for_test=(
                test_backends.make_oauth_http_database_for_test),
            copy_database_for_test=(
                test_backends.copy_oauth_http_database_for_test),
            make_document_for_test=make_leap_document_for_test,
            make_app_with_state=make_oauth_http_app,
        )),
        ('token_http', dict(
            make_database_for_test=make_token_http_database_for_test,
            copy_database_for_test=copy_token_http_database_for_test,
            make_document_for_test=make_leap_document_for_test,
            make_app_with_state=make_token_soledad_app,
        )),
    ]


#-----------------------------------------------------------------------------
# The following tests come from `u1db.tests.test_http_client`.
#-----------------------------------------------------------------------------

class TestLeapClientBase(test_http_client.TestHTTPClientBase):
    """
    This class should be used to test Token auth.
    """

    def getClientWithToken(self, **kwds):
        """
        Start the test server and return an HTTP client that signs its
        requests with token auth.

        :param kwds: keyword arguments forwarded to the client constructor.
        """
        self.startServer()

        class _HTTPClientWithToken(
                http_client.HTTPClientBase, auth.TokenBasedAuth):

            def set_token_credentials(self, uuid, token):
                auth.TokenBasedAuth.set_token_credentials(self, uuid, token)

            def _sign_request(self, method, url_query, params):
                return auth.TokenBasedAuth._sign_request(
                    self, method, url_query, params)

        return _HTTPClientWithToken(self.getURL('dbase'), **kwds)

    def test_oauth(self):
        """
        Suppress oauth test (we test for token auth here).
        """
        pass

    def test_oauth_ctr_creds(self):
        """
        Suppress oauth test (we test for token auth here).
        """
        pass

    def test_oauth_Unauthorized(self):
        """
        Suppress oauth test (we test for token auth here).
        """
        pass

    def app(self, environ, start_response):
        """
        WSGI application served by the test server.

        The parent application is tried first and its response, if any, is
        returned unchanged. Otherwise, requests whose path contains
        '/token' go through a token check: a missing or non-token
        authorization header, or credentials other than
        ('user-uuid', 'auth-token'), produce a 401 JSON error; valid
        credentials produce a 200 JSON list with the path, uuid and token.

        :return: an iterable with the response body, or None when neither
            the parent application nor the token branch handles the
            request.
        """
        res = test_http_client.TestHTTPClientBase.app(
            self, environ, start_response)
        if res is not None:
            return res
        # mimic the soledad application here.
        if '/token' in environ['PATH_INFO']:

            def unauth_err(message):
                # single place to build the 401 response; the original code
                # referenced undefined names on every error path.
                start_response("401 Unauthorized",
                               [('Content-Type', 'application/json')])
                return [json.dumps({"error": "unauthorized",
                                    "message": message})]

            # named `auth_header` so the `auth` module is not shadowed.
            auth_header = environ.get(SoledadAuthMiddleware.HTTP_AUTH_KEY)
            if not auth_header:
                return unauth_err("Missing Token Authentication.")
            scheme, encoded = auth_header.split(None, 1)
            if scheme.lower() != 'token':
                return unauth_err("Missing Token Authentication.")
            uuid, token = encoded.decode('base64').split(':', 1)
            # both fields must match: reject when either one is wrong.
            if uuid != 'user-uuid' or token != 'auth-token':
                return unauth_err("Incorrect address or token.")
            start_response("200 OK", [('Content-Type', 'application/json')])
            return [json.dumps([environ['PATH_INFO'], uuid, token])]

    def test_token(self):
        """
        Test if token is sent correctly.
        """
        cli = self.getClientWithToken()
        cli.set_token_credentials('user-uuid', 'auth-token')
        res, headers = cli._request('GET', ['doc', 'token'])
        self.assertEqual(
            ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res))

    def test_token_ctr_creds(self):
        """
        Test if token credentials given to the constructor are sent
        correctly.
        """
        cli = self.getClientWithToken(creds={'token': {
            'uuid': 'user-uuid',
            'token': 'auth-token',
        }})
        res, headers = cli._request('GET', ['doc', 'token'])
        self.assertEqual(
            ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res))


#-----------------------------------------------------------------------------
# The following tests come from `u1db.tests.test_document`.
#-----------------------------------------------------------------------------

class TestLeapDocument(test_document.TestDocument, BaseSoledadTest):
    """
    Run the `TestDocument` suite using LeapDocument as document factory.
    """

    scenarios = [
        ('leap', {'make_document_for_test': make_leap_document_for_test}),
    ]


class TestLeapPyDocument(test_document.TestPyDocument, BaseSoledadTest):
    """
    Run the `TestPyDocument` suite using LeapDocument as document factory.
    """

    scenarios = [
        ('leap', {'make_document_for_test': make_leap_document_for_test}),
    ]


#-----------------------------------------------------------------------------
# The following tests come from `u1db.tests.test_remote_sync_target`.
#-----------------------------------------------------------------------------

class TestLeapSyncTargetBasics(
        test_remote_sync_target.TestHTTPSyncTargetBasics):
    """
    Some tests had to be copied to this class so we can instantiate our own
    target.
    """

    def test_parse_url(self):
        """
        Check scheme, hostname, port and path of the parsed target URL.
        """
        target = leap_backend.LeapSyncTarget('http://127.0.0.1:12345/')
        expectations = (
            ('http', 'scheme'),
            ('127.0.0.1', 'hostname'),
            (12345, 'port'),
            ('/', 'path'),
        )
        for expected, attribute in expectations:
            self.assertEqual(expected, getattr(target._url, attribute))


class TestLeapParsingSyncStream(
        test_remote_sync_target.TestParsingSyncStream,
        BaseSoledadTest):
    """
    Some tests had to be copied to this class so we can instantiate our own
    target.
    """

    def setUp(self):
        # run both parents' fixtures explicitly, u1db's first.
        for base in (test_remote_sync_target.TestParsingSyncStream,
                     BaseSoledadTest):
            base.setUp(self)

    def tearDown(self):
        # same order as in setUp.
        for base in (test_remote_sync_target.TestParsingSyncStream,
                     BaseSoledadTest):
            base.tearDown(self)

    def test_extra_comma(self):
        """
        Test adapted to use encrypted content.
        """
        crypto = self._soledad._crypto
        doc = leap_backend.LeapDocument('i', rev='r')
        doc.content = {}
        enc_json = leap_backend.encrypt_doc(crypto, doc)
        target = leap_backend.LeapSyncTarget("http://foo/foo", crypto=crypto)
        broken = u1db.errors.BrokenSyncStream

        self.assertRaises(
            broken, target._parse_sync_stream, "[\r\n{},\r\n]", None)

        stream = ('[\r\n{},\r\n{"id": "i", "rev": "r", '
                  '"content": %s, "gen": 3, "trans_id": "T-sid"}'
                  ',\r\n]' % json.dumps(enc_json))
        self.assertRaises(
            broken, target._parse_sync_stream, stream,
            lambda doc, gen, trans_id: None)

    def test_wrong_start(self):
        """
        Streams that do not start with the opening bracket are rejected.
        """
        target = leap_backend.LeapSyncTarget("http://foo/foo")
        for stream in ("{}\r\n]", "\r\n{}\r\n]", ""):
            self.assertRaises(u1db.errors.BrokenSyncStream,
                              target._parse_sync_stream, stream, None)

    def test_wrong_end(self):
        """
        Streams lacking the closing bracket are rejected.
        """
        target = leap_backend.LeapSyncTarget("http://foo/foo")
        for stream in ("[\r\n{}", "[\r\n"):
            self.assertRaises(u1db.errors.BrokenSyncStream,
                              target._parse_sync_stream, stream, None)

    def test_missing_comma(self):
        """
        Entries not separated by a comma are rejected.
        """
        target = leap_backend.LeapSyncTarget("http://foo/foo")
        stream = ('[\r\n{}\r\n{"id": "i", "rev": "r", '
                  '"content": "c", "gen": 3}\r\n]')
        self.assertRaises(u1db.errors.BrokenSyncStream,
                          target._parse_sync_stream, stream, None)

    def test_no_entries(self):
        """
        A stream with no entries at all is rejected.
        """
        target = leap_backend.LeapSyncTarget("http://foo/foo")
        self.assertRaises(u1db.errors.BrokenSyncStream,
                          target._parse_sync_stream, "[\r\n]", None)

    def test_error_in_stream(self):
        """
        Error entries in the stream raise the matching exception.
        """
        target = leap_backend.LeapSyncTarget("http://foo/foo")
        cases = [
            (u1db.errors.Unavailable,
             '[\r\n{"new_generation": 0},'
             '\r\n{"error": "unavailable"}\r\n'),
            (u1db.errors.Unavailable,
             '[\r\n{"error": "unavailable"}\r\n'),
            (u1db.errors.BrokenSyncStream,
             '[\r\n{"error": "?"}\r\n'),
        ]
        for expected_error, stream in cases:
            self.assertRaises(expected_error,
                              target._parse_sync_stream, stream, None)


#
# functions for TestRemoteSyncTargets
#

def leap_sync_target(test, path):
    """
    Return a LeapSyncTarget for `path` on the test server, using the
    crypto object of the test's Soledad instance.
    """
    url = test.getURL(path)
    target = leap_backend.LeapSyncTarget(url, crypto=test._soledad._crypto)
    return target


def oauth_leap_sync_target(test, path):
    """
    Return a LeapSyncTarget for '~/' + `path` carrying the oauth test
    credentials (consumer1 / token1).
    """
    consumer = tests.consumer1
    token = tests.token1
    target = leap_sync_target(test, '~/' + path)
    target.set_oauth_credentials(
        consumer.key, consumer.secret, token.key, token.secret)
    return target


def token_leap_sync_target(test, path):
    """
    Return a LeapSyncTarget for `path` carrying the fixed token test
    credentials.
    """
    target = leap_sync_target(test, path)
    target.set_token_credentials('user-uuid', 'auth-token')
    return target


class TestLeapSyncTarget(
        test_remote_sync_target.TestRemoteSyncTargets, BaseSoledadTest):
    """
    Remote sync target tests run with LeapSyncTarget over plain http, oauth
    and token scenarios.

    The tests overridden here check what was stored on the server with
    `assertGetEncryptedDoc` (defined outside this file; presumably it
    decrypts the stored document before comparing).
    """

    scenarios = [
        ('http', {'make_app_with_state': make_soledad_app,
                  'make_document_for_test': make_leap_document_for_test,
                  'sync_target': leap_sync_target}),
        ('oauth_http', {'make_app_with_state': make_oauth_http_app,
                        'make_document_for_test': make_leap_document_for_test,
                        'sync_target': oauth_leap_sync_target}),
        ('token_soledad',
            {'make_app_with_state': make_token_soledad_app,
             'make_document_for_test': make_leap_document_for_test,
             'sync_target': token_leap_sync_target}),
    ]

    def test_sync_exchange_send(self):
        """
        Test for sync exchanging send of document.

        This test was adapted to decrypt remote content before assert.
        """
        self.startServer()
        db = self.request_state._create_database('test')
        remote_target = self.getSyncTarget('test')
        other_docs = []

        # NOTE(review): this callback takes a single argument while the one
        # in the failure/retry test takes (doc, gen, trans_id); nothing is
        # asserted about `other_docs` here.
        def receive_doc(doc):
            other_docs.append((doc.doc_id, doc.rev, doc.get_json()))

        doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
        new_gen, trans_id = remote_target.sync_exchange(
            [(doc, 10, 'T-sid')], 'replica', last_known_generation=0,
            last_known_trans_id=None, return_doc_cb=receive_doc)
        self.assertEqual(1, new_gen)
        self.assertGetEncryptedDoc(
            db, 'doc-here', 'replica:1', '{"value": "here"}', False)

    def test_sync_exchange_send_failure_and_retry_scenario(self):
        """
        Test for sync exchange failure and retry.

        This test was adapted to decrypt remote content before assert.
        """

        self.startServer()

        # discard whatever the request handler writes to its stderr.
        def blackhole_getstderr(inst):
            return cStringIO.StringIO()

        self.patch(self.server.RequestHandlerClass, 'get_stderr',
                   blackhole_getstderr)
        db = self.request_state._create_database('test')
        # keep the real method so the bomb below can delegate to it.
        _put_doc_if_newer = db._put_doc_if_newer
        trigger_ids = ['doc-here2']

        # fails for any doc id listed in `trigger_ids`; the name is looked up
        # at call time, so rebinding it below disarms the bomb.
        def bomb_put_doc_if_newer(doc, save_conflict,
                                  replica_uid=None, replica_gen=None,
                                  replica_trans_id=None):
            if doc.doc_id in trigger_ids:
                raise Exception
            return _put_doc_if_newer(doc, save_conflict=save_conflict,
                                     replica_uid=replica_uid,
                                     replica_gen=replica_gen,
                                     replica_trans_id=replica_trans_id)
        self.patch(db, '_put_doc_if_newer', bomb_put_doc_if_newer)
        remote_target = self.getSyncTarget('test')
        other_changes = []

        def receive_doc(doc, gen, trans_id):
            other_changes.append(
                (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id))

        doc1 = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
        doc2 = self.make_document('doc-here2', 'replica:1',
                                  '{"value": "here2"}')
        # first attempt: doc1 is stored, doc2 triggers the bomb.
        self.assertRaises(
            u1db.errors.HTTPError,
            remote_target.sync_exchange,
            [(doc1, 10, 'T-sid'), (doc2, 11, 'T-sud')],
            'replica', last_known_generation=0, last_known_trans_id=None,
            return_doc_cb=receive_doc)
        self.assertGetEncryptedDoc(
            db, 'doc-here', 'replica:1', '{"value": "here"}',
            False)
        # only doc1's generation and transaction id were recorded.
        self.assertEqual(
            (10, 'T-sid'), db._get_replica_gen_and_trans_id('replica'))
        self.assertEqual([], other_changes)
        # retry
        trigger_ids = []
        new_gen, trans_id = remote_target.sync_exchange(
            [(doc2, 11, 'T-sud')], 'replica', last_known_generation=0,
            last_known_trans_id=None, return_doc_cb=receive_doc)
        self.assertGetEncryptedDoc(
            db, 'doc-here2', 'replica:1', '{"value": "here2"}',
            False)
        self.assertEqual(
            (11, 'T-sud'), db._get_replica_gen_and_trans_id('replica'))
        self.assertEqual(2, new_gen)
        # bounced back to us
        self.assertEqual(
            ('doc-here', 'replica:1', '{"value": "here"}', 1),
            other_changes[0][:-1])

    def test_sync_exchange_send_ensure_callback(self):
        """
        Test that sync exchange calls the ensure callback with the replica
        uid of the target database.

        This test was adapted to decrypt remote content before assert.
        """
        self.startServer()
        # no database is created beforehand; it is opened after the exchange.
        remote_target = self.getSyncTarget('test')
        other_docs = []
        replica_uid_box = []

        def receive_doc(doc):
            other_docs.append((doc.doc_id, doc.rev, doc.get_json()))

        # records every replica uid handed to the ensure callback.
        def ensure_cb(replica_uid):
            replica_uid_box.append(replica_uid)

        doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
        new_gen, trans_id = remote_target.sync_exchange(
            [(doc, 10, 'T-sid')], 'replica', last_known_generation=0,
            last_known_trans_id=None, return_doc_cb=receive_doc,
            ensure_callback=ensure_cb)
        self.assertEqual(1, new_gen)
        db = self.request_state.open_database('test')
        self.assertEqual(1, len(replica_uid_box))
        self.assertEqual(db._replica_uid, replica_uid_box[0])
        self.assertGetEncryptedDoc(
            db, 'doc-here', 'replica:1', '{"value": "here"}', False)


#-----------------------------------------------------------------------------
# The following tests come from `u1db.tests.test_https`.
#-----------------------------------------------------------------------------

def token_leap_https_sync_target(test, host, path):
    """
    Return a LeapSyncTarget for https://`host`:<server port>/`path` carrying
    the fixed token test credentials.
    """
    address = test.server.server_address
    _, port = address
    url = 'https://%s:%d/%s' % (host, port, path)
    target = leap_backend.LeapSyncTarget(url, crypto=test._soledad._crypto)
    target.set_token_credentials('user-uuid', 'auth-token')
    return target


class TestLeapSyncTargetHttpsSupport(test_https.TestHttpSyncTargetHttpsSupport,
                                     BaseSoledadTest):
    """
    HTTPS sync target tests run with LeapSyncTarget and token auth.
    """

    scenarios = [
        ('token_soledad_https', dict(
            server_def=test_https.https_server_def,
            make_app_with_state=make_token_soledad_app,
            make_document_for_test=make_leap_document_for_test,
            sync_target=token_leap_https_sync_target,
        )),
    ]

    def setUp(self):
        # the parent constructor undoes our SSL monkey patch to ensure tests
        # run smoothly with standard u1db.
        test_https.TestHttpSyncTargetHttpsSupport.setUp(self)
        # so here monkey patch again to test our functionality.
        http_client._VerifiedHTTPSConnection = soledad.VerifiedHTTPSConnection
        soledad.SOLEDAD_CERT = http_client.CA_CERTS

    def test_working(self):
        """
        Test that SSL connections work well.

        This test was adapted to patch Soledad's HTTPS connection custom class
        with the intended CA certificates.
        """
        self.startServer()
        database = self.request_state._create_database('test')
        self.patch(soledad, 'SOLEDAD_CERT', self.cacert_pem)
        target = self.getSyncTarget('localhost', 'test')
        target.record_sync_info('other-id', 2, 'T-id')
        recorded = database._get_replica_gen_and_trans_id('other-id')
        self.assertEqual((2, 'T-id'), recorded)

    def test_host_mismatch(self):
        """
        Test that SSL connections to a hostname different than the one in the
        certificate raise CertificateError.

        This test was adapted to patch Soledad's HTTPS connection custom class
        with the intended CA certificates.
        """
        self.startServer()
        self.request_state._create_database('test')
        self.patch(soledad, 'SOLEDAD_CERT', self.cacert_pem)
        target = self.getSyncTarget('127.0.0.1', 'test')
        self.assertRaises(
            http_client.CertificateError,
            target.record_sync_info, 'other-id', 2, 'T-id')


#-----------------------------------------------------------------------------
# The following tests come from `u1db.tests.test_http_database`.
#-----------------------------------------------------------------------------

class _HTTPDatabase(http_database.HTTPDatabase, auth.TokenBasedAuth):
    """
    HTTPDatabase whose credentials and request signing are delegated to
    our token auth implementation.
    """

    def set_token_credentials(self, uuid, token):
        """
        Store the token credentials through TokenBasedAuth.
        """
        auth.TokenBasedAuth.set_token_credentials(self, uuid, token)

    def _sign_request(self, method, url_query, params):
        """
        Sign the request through TokenBasedAuth and return its result.
        """
        sign = auth.TokenBasedAuth._sign_request
        return sign(self, method, url_query, params)


class TestHTTPDatabaseWithCreds(
        test_http_database.TestHTTPDatabaseCtrWithCreds):
    """
    Credential handling tests for the token-enabled HTTP database.
    """

    def test_get_sync_target_inherits_token_credentials(self):
        # this test was from TestDatabaseSimpleOperations but we put it here
        # for convenience.
        database = _HTTPDatabase('dbase')
        self.db = database
        database.set_token_credentials('user-uuid', 'auth-token')
        target = database.get_sync_target()
        self.assertEqual(database._creds, target._creds)

    def test_ctr_with_creds(self):
        """
        Credentials given to the constructor end up in `_creds`.
        """
        creds = {'token': {
            'uuid': 'user-uuid',
            'token': 'auth-token',
        }}
        database = _HTTPDatabase('http://dbs/db', creds=creds)
        self.assertIn('token', database._creds)


#-----------------------------------------------------------------------------
# The following tests come from `u1db.tests.test_sync`.
#-----------------------------------------------------------------------------

def _make_local_db_and_leap_target(test, path='test'):
    """
    Start the test server, create a database named after the basename of
    `path` and return it along with a LeapSyncTarget connected to `path`.
    """
    test.startServer()
    db_name = os.path.basename(path)
    database = test.request_state._create_database(db_name)
    target = leap_backend.LeapSyncTarget.connect(
        test.getURL(path), crypto=test._soledad._crypto)
    return database, target


def _make_local_db_and_token_leap_target(test):
    """
    Same as `_make_local_db_and_leap_target` for the 'test' path, with the
    fixed token test credentials set on the target.
    """
    database, target = _make_local_db_and_leap_target(test, 'test')
    target.set_token_credentials('user-uuid', 'auth-token')
    return database, target


# Sync target scenario: token-authenticated Leap target served by the
# token-enabled Soledad application.
target_scenarios = [
    ('token_leap', dict(
        create_db_and_target=_make_local_db_and_token_leap_target,
        make_app_with_state=make_token_soledad_app,
    )),
]


class LeapDatabaseSyncTargetTests(
        test_sync.DatabaseSyncTargetTests, BaseSoledadTest):
    """
    Database sync target tests run against the token-authenticated Leap
    target, combined with every `DatabaseBaseTests` scenario.

    The tests use `self.db`, `self.st`, `self.other_changes` and
    `self.receive_doc`, none of which is set in this file (presumably they
    come from the parent fixture).
    """

    scenarios = (
        tests.multiply_scenarios(
            tests.DatabaseBaseTests.scenarios,
            target_scenarios))

    def test_sync_exchange(self):
        """
        Test sync exchange.

        This test was adapted to decrypt remote content before assert.
        """
        # NOTE: the former extra call to `_make_local_db_and_leap_target`
        # was dropped: its result was never used and it started the server
        # and created the database a second time.
        docs_by_gen = [
            (self.make_document('doc-id', 'replica:1', tests.simple_doc), 10,
             'T-sid')]
        new_gen, trans_id = self.st.sync_exchange(
            docs_by_gen, 'replica', last_known_generation=0,
            last_known_trans_id=None, return_doc_cb=self.receive_doc)
        self.assertGetEncryptedDoc(
            self.db, 'doc-id', 'replica:1', tests.simple_doc, False)
        self.assertTransactionLog(['doc-id'], self.db)
        last_trans_id = self.getLastTransId(self.db)
        # compare against the transaction id returned by the exchange, as
        # `test_sync_exchange_push_many` does; the old code compared
        # `last_trans_id` with itself.
        self.assertEqual(([], 1, last_trans_id),
                         (self.other_changes, new_gen, trans_id))
        self.assertEqual(10, self.st.get_sync_info('replica')[3])

    def test_sync_exchange_push_many(self):
        """
        Test sync exchange.

        This test was adapted to decrypt remote content before assert.
        """
        docs_by_gen = [
            (self.make_document(
                'doc-id', 'replica:1', tests.simple_doc), 10, 'T-1'),
            (self.make_document(
                'doc-id2', 'replica:1', tests.nested_doc), 11, 'T-2')]
        new_gen, trans_id = self.st.sync_exchange(
            docs_by_gen, 'replica', last_known_generation=0,
            last_known_trans_id=None, return_doc_cb=self.receive_doc)
        self.assertGetEncryptedDoc(
            self.db, 'doc-id', 'replica:1', tests.simple_doc, False)
        self.assertGetEncryptedDoc(
            self.db, 'doc-id2', 'replica:1', tests.nested_doc, False)
        self.assertTransactionLog(['doc-id', 'doc-id2'], self.db)
        last_trans_id = self.getLastTransId(self.db)
        self.assertEqual(([], 2, last_trans_id),
                         (self.other_changes, new_gen, trans_id))
        self.assertEqual(11, self.st.get_sync_info('replica')[3])

    def test_sync_exchange_returns_many_new_docs(self):
        """
        Test sync exchange.

        This test was adapted to avoid JSON serialization comparison as local
        and remote representations might differ. It looks directly at the
        doc's contents instead.
        """
        doc = self.db.create_doc_from_json(tests.simple_doc)
        doc2 = self.db.create_doc_from_json(tests.nested_doc)
        self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)
        new_gen, _ = self.st.sync_exchange(
            [], 'other-replica', last_known_generation=0,
            last_known_trans_id=None, return_doc_cb=self.receive_doc)
        self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)
        self.assertEqual(2, new_gen)
        # drop the third-from-last and the last items of each change so the
        # serialized content is left out of this comparison.
        self.assertEqual(
            [(doc.doc_id, doc.rev, 1),
             (doc2.doc_id, doc2.rev, 2)],
            [c[:-3]+c[-2:-1] for c in self.other_changes])
        # contents are compared after parsing instead.
        self.assertEqual(
            json.loads(tests.simple_doc),
            json.loads(self.other_changes[0][2]))
        self.assertEqual(
            json.loads(tests.nested_doc),
            json.loads(self.other_changes[1][2]))
        if self.whitebox:
            self.assertEqual(
                self.db._last_exchange_log['return'],
                {'last_gen': 2, 'docs':
                 [(doc.doc_id, doc.rev), (doc2.doc_id, doc2.rev)]})


class TestLeapDbSync(test_sync.TestDbSync, BaseSoledadTest):
    """Test db.sync remote sync shortcut"""

    scenarios = [
        ('py-http', dict(
            make_app_with_state=make_soledad_app,
            make_database_for_test=tests.make_memory_database_for_test,
        )),
        ('py-token-http', dict(
            make_app_with_state=make_token_soledad_app,
            make_database_for_test=tests.make_memory_database_for_test,
            token=True,
        )),
    ]

    # defaults, overridden per scenario.
    oauth = False
    token = False

    def do_sync(self, target_name):
        """
        Perform sync using LeapSyncTarget and Token auth.
        """
        if not self.token:
            # no token auth in this scenario: use the parent implementation.
            return test_sync.TestDbSync.do_sync(self, target_name)
        creds = {'token': {
            'uuid': 'user-uuid',
            'token': 'auth-token',
        }}
        target = leap_backend.LeapSyncTarget(
            self.getURL(target_name),
            crypto=self._soledad._crypto,
            creds=creds)
        return Synchronizer(self.db, target).sync(autocreate=True)

    def test_db_sync(self):
        """
        Test sync.

        Adapted to check for encrypted content.
        """
        local_doc = self.db.create_doc_from_json(tests.simple_doc)
        remote_doc = self.db2.create_doc_from_json(tests.nested_doc)
        gen_before = self.do_sync('test2.db')
        gen_after, _, changes = self.db.whats_changed(gen_before)
        self.assertEqual(1, len(changes))
        self.assertEqual(remote_doc.doc_id, changes[0][0])
        self.assertEqual(1, gen_after - gen_before)
        self.assertGetEncryptedDoc(
            self.db2, local_doc.doc_id, local_doc.rev, tests.simple_doc,
            False)
        self.assertGetEncryptedDoc(
            self.db, remote_doc.doc_id, remote_doc.rev, tests.nested_doc,
            False)

    def test_db_sync_autocreate(self):
        """
        Test sync.

        Adapted to check for encrypted content.
        """
        local_doc = self.db.create_doc_from_json(tests.simple_doc)
        gen_before = self.do_sync('test3.db')
        gen_after, _, changes = self.db.whats_changed(gen_before)
        self.assertEqual(0, gen_after - gen_before)
        # the target database did not exist before the sync.
        created_db = self.request_state.open_database('test3.db')
        _, _, changes = created_db.whats_changed()
        self.assertEqual(1, len(changes))
        self.assertEqual(local_doc.doc_id, changes[0][0])
        self.assertGetEncryptedDoc(
            created_db, local_doc.doc_id, local_doc.rev, tests.simple_doc,
            False)
        target_gen, _ = self.db._get_replica_gen_and_trans_id('test3.db')
        source_gen, _ = created_db._get_replica_gen_and_trans_id('test1')
        self.assertEqual(1, target_gen)
        self.assertEqual(1, source_gen)


# unittest `load_tests` hook for this module, bound to the loader from the
# u1db test helpers (presumably it expands each class's `scenarios`).
load_tests = tests.load_with_scenarios