# -*- CODING: UTF-8 -*- # util.py # Copyright (C) 2013 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ Utilities used by multiple test suites. """ import os import tempfile import shutil import random import string import couchdb from uuid import uuid4 from mock import Mock from urlparse import urljoin from StringIO import StringIO from pysqlcipher import dbapi2 from twisted.trial import unittest from leap.common.testing.basetest import BaseLeapTest from leap.soledad.common import l2db from leap.soledad.common.l2db import sync from leap.soledad.common.l2db.remote import http_database from leap.soledad.common import soledad_assert from leap.soledad.common.document import SoledadDocument from leap.soledad.common.couch import CouchDatabase from leap.soledad.common.couch.state import CouchServerState from leap.soledad.common.crypto import ENC_SCHEME_KEY from leap.soledad.client import Soledad from leap.soledad.client import http_target from leap.soledad.client import auth from leap.soledad.client.crypto import decrypt_doc_dict from leap.soledad.client.sqlcipher import SQLCipherDatabase from leap.soledad.client.sqlcipher import SQLCipherOptions from leap.soledad.server import SoledadApp from leap.soledad.server.auth import SoledadTokenAuthMiddleware PASSWORD = '123456' ADDRESS = 'leap@leap.se' def make_local_db_and_target(test): db = test.create_database('test') st = db.get_sync_target() return db, st def make_document_for_test(test, doc_id, rev, content, has_conflicts=False): return SoledadDocument(doc_id, rev, content, has_conflicts=has_conflicts) def make_sqlcipher_database_for_test(test, replica_uid): db = SQLCipherDatabase( SQLCipherOptions(':memory:', PASSWORD)) db._set_replica_uid(replica_uid) return db def copy_sqlcipher_database_for_test(test, db): # DO NOT COPY OR REUSE THIS CODE OUTSIDE TESTS: COPYING U1DB DATABASES IS # THE WRONG THING TO DO, THE ONLY REASON WE DO SO HERE IS TO TEST THAT WE # CORRECTLY DETECT IT HAPPENING SO THAT WE CAN RAISE ERRORS RATHER THAN # CORRUPT USER DATA. USE SYNC INSTEAD, OR WE WILL SEND NINJA TO YOUR # HOUSE. new_db = make_sqlcipher_database_for_test(test, None) tmpfile = StringIO() for line in db._db_handle.iterdump(): if 'sqlite_sequence' not in line: # work around bug in iterdump tmpfile.write('%s\n' % line) tmpfile.seek(0) new_db._db_handle = dbapi2.connect(':memory:') new_db._db_handle.cursor().executescript(tmpfile.read()) new_db._db_handle.commit() new_db._set_replica_uid(db._replica_uid) new_db._factory = db._factory return new_db SQLCIPHER_SCENARIOS = [ ('sqlcipher', {'make_database_for_test': make_sqlcipher_database_for_test, 'copy_database_for_test': copy_sqlcipher_database_for_test, 'make_document_for_test': make_document_for_test, }), ] def make_soledad_app(state): return SoledadApp(state) def make_token_soledad_app(state): app = SoledadApp(state) def _verify_authentication_data(uuid, auth_data): if uuid.startswith('user-') and auth_data == 'auth-token': return True return False # we test for action authorization in leap.soledad.common.tests.test_server def _verify_authorization(uuid, environ): return True application = SoledadTokenAuthMiddleware(app) application._verify_authentication_data = _verify_authentication_data application._verify_authorization = _verify_authorization return application def make_soledad_document_for_test(test, doc_id, rev, content, has_conflicts=False): return SoledadDocument( doc_id, rev, content, has_conflicts=has_conflicts) def make_token_http_database_for_test(test, replica_uid): test.startServer() test.request_state._create_database(replica_uid) class _HTTPDatabaseWithToken( http_database.HTTPDatabase, auth.TokenBasedAuth): def set_token_credentials(self, uuid, token): auth.TokenBasedAuth.set_token_credentials(self, uuid, token) def _sign_request(self, method, url_query, params): return auth.TokenBasedAuth._sign_request( self, method, url_query, params) http_db = _HTTPDatabaseWithToken(test.getURL('test')) http_db.set_token_credentials('user-uuid', 'auth-token') return http_db def copy_token_http_database_for_test(test, db): # DO NOT COPY OR REUSE THIS CODE OUTSIDE TESTS: COPYING U1DB DATABASES IS # THE WRONG THING TO DO, THE ONLY REASON WE DO SO HERE IS TO TEST THAT WE # CORRECTLY DETECT IT HAPPENING SO THAT WE CAN RAISE ERRORS RATHER THAN # CORRUPT USER DATA. USE SYNC INSTEAD, OR WE WILL SEND NINJA TO YOUR # HOUSE. http_db = test.request_state._copy_database(db) http_db.set_token_credentials(http_db, 'user-uuid', 'auth-token') return http_db def sync_via_synchronizer(test, db_source, db_target, trace_hook=None, trace_hook_shallow=None): target = db_target.get_sync_target() trace_hook = trace_hook or trace_hook_shallow if trace_hook: target._set_trace_hook(trace_hook) return sync.Synchronizer(db_source, target).sync() class MockedSharedDBTest(object): def get_default_shared_mock(self, put_doc_side_effect=None, get_doc_return_value=None): """ Get a default class for mocking the shared DB """ class defaultMockSharedDB(object): get_doc = Mock(return_value=get_doc_return_value) put_doc = Mock(side_effect=put_doc_side_effect) open = Mock(return_value=None) close = Mock(return_value=None) syncable = True def __call__(self): return self return defaultMockSharedDB def soledad_sync_target( test, path, source_replica_uid=uuid4().hex, sync_db=None, sync_enc_pool=None): creds = {'token': { 'uuid': 'user-uuid', 'token': 'auth-token', }} return http_target.SoledadHTTPSyncTarget( test.getURL(path), source_replica_uid, creds, test._soledad._crypto, None, # cert_file sync_db=sync_db, sync_enc_pool=sync_enc_pool) # redefine the base leap test class so it inherits from twisted trial's # TestCase. This is needed so trial knows that it has to manage a reactor and # wait for deferreds returned by tests to be fired. BaseLeapTest = type( 'BaseLeapTest', (unittest.TestCase,), dict(BaseLeapTest.__dict__)) class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest): """ Instantiates Soledad for usage in tests. """ defer_sync_encryption = False def setUp(self): # The following snippet comes from BaseLeapTest.setUpClass, but we # repeat it here because twisted.trial does not work with # setUpClass/tearDownClass. self.old_path = os.environ['PATH'] self.old_home = os.environ['HOME'] self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") self.home = self.tempdir bin_tdir = os.path.join( self.tempdir, 'bin') os.environ["PATH"] = bin_tdir os.environ["HOME"] = self.tempdir # config info self.db1_file = os.path.join(self.tempdir, "db1.u1db") self.db2_file = os.path.join(self.tempdir, "db2.u1db") self.email = ADDRESS # open test dbs self._db1 = l2db.open(self.db1_file, create=True, document_factory=SoledadDocument) self._db2 = l2db.open(self.db2_file, create=True, document_factory=SoledadDocument) # get a random prefix for each test, so we do not mess with # concurrency during initialization and shutting down of # each local db. self.rand_prefix = ''.join( map(lambda x: random.choice(string.ascii_letters), range(6))) # initialize soledad by hand so we can control keys # XXX check if this soledad is actually used self._soledad = self._soledad_instance( prefix=self.rand_prefix, user=self.email) def tearDown(self): self._db1.close() self._db2.close() self._soledad.close() # restore paths os.environ["PATH"] = self.old_path os.environ["HOME"] = self.old_home def _delete_temporary_dirs(): # XXX should not access "private" attrs for f in [self._soledad.local_db_path, self._soledad.secrets.secrets_path]: if os.path.isfile(f): os.unlink(f) # The following snippet comes from BaseLeapTest.setUpClass, but we # repeat it here because twisted.trial does not work with # setUpClass/tearDownClass. soledad_assert( self.tempdir.startswith('/tmp/leap_tests-'), "beware! tried to remove a dir which does not " "live in temporal folder!") shutil.rmtree(self.tempdir) from twisted.internet import reactor reactor.addSystemEventTrigger( "after", "shutdown", _delete_temporary_dirs) def _soledad_instance(self, user=ADDRESS, passphrase=u'123', prefix='', secrets_path='secrets.json', local_db_path='soledad.u1db', server_url='https://127.0.0.1/', cert_file=None, shared_db_class=None, auth_token='auth-token'): def _put_doc_side_effect(doc): self._doc_put = doc if shared_db_class is not None: MockSharedDB = shared_db_class else: MockSharedDB = self.get_default_shared_mock( _put_doc_side_effect) soledad = Soledad( user, passphrase, secrets_path=os.path.join( self.tempdir, prefix, secrets_path), local_db_path=os.path.join( self.tempdir, prefix, local_db_path), server_url=server_url, # Soledad will fail if not given an url. cert_file=cert_file, defer_encryption=self.defer_sync_encryption, shared_db=MockSharedDB(), auth_token=auth_token) self.addCleanup(soledad.close) return soledad def assertGetEncryptedDoc( self, db, doc_id, doc_rev, content, has_conflicts): """ Assert that the document in the database looks correct. """ exp_doc = self.make_document(doc_id, doc_rev, content, has_conflicts=has_conflicts) doc = db.get_doc(doc_id) if ENC_SCHEME_KEY in doc.content: # XXX check for SYM_KEY too key = self._soledad._crypto.doc_passphrase(doc.doc_id) secret = self._soledad._crypto.secret decrypted = decrypt_doc_dict( doc.content, doc.doc_id, doc.rev, key, secret) doc.set_json(decrypted) self.assertEqual(exp_doc.doc_id, doc.doc_id) self.assertEqual(exp_doc.rev, doc.rev) self.assertEqual(exp_doc.has_conflicts, doc.has_conflicts) self.assertEqual(exp_doc.content, doc.content) class CouchDBTestCase(unittest.TestCase, MockedSharedDBTest): """ TestCase base class for tests against a real CouchDB server. """ def setUp(self): """ Make sure we have a CouchDB instance for a test. """ self.couch_port = 5984 self.couch_url = 'http://localhost:%d' % self.couch_port self.couch_server = couchdb.Server(self.couch_url) def delete_db(self, name): try: self.couch_server.delete(name) except: # ignore if already missing pass class CouchServerStateForTests(CouchServerState): """ This is a slightly modified CouchDB server state that allows for creating a database. Ordinarily, the CouchDB server state does not allow some operations, because for security purposes the Soledad Server should not even have enough permissions to perform them. For tests, we allow database creation, otherwise we'd have to create those databases in setUp/tearDown methods, which is less pleasant than allowing the db to be automatically created. """ def __init__(self, *args, **kwargs): self.dbs = [] super(CouchServerStateForTests, self).__init__(*args, **kwargs) def _create_database(self, replica_uid=None, dbname=None): """ Create db and append to a list, allowing test to close it later """ dbname = dbname or ('test-%s' % uuid4().hex) db = CouchDatabase.open_database( urljoin(self.couch_url, dbname), True, replica_uid=replica_uid or 'test', ensure_ddocs=True) self.dbs.append(db) return db def ensure_database(self, dbname): db = self._create_database(dbname=dbname) return db, db.replica_uid class SoledadWithCouchServerMixin( BaseSoledadTest, CouchDBTestCase): def setUp(self): CouchDBTestCase.setUp(self) BaseSoledadTest.setUp(self) main_test_class = getattr(self, 'main_test_class', None) if main_test_class is not None: main_test_class.setUp(self) def tearDown(self): main_test_class = getattr(self, 'main_test_class', None) if main_test_class is not None: main_test_class.tearDown(self) # delete the test database BaseSoledadTest.tearDown(self) CouchDBTestCase.tearDown(self) def make_app(self): self.request_state = CouchServerStateForTests(self.couch_url) self.addCleanup(self.delete_dbs) return self.make_app_with_state(self.request_state) def delete_dbs(self): for db in self.request_state.dbs: self.delete_db(db._dbname)