diff options
Diffstat (limited to 'src/leap/soledad/tests')
-rw-r--r-- | src/leap/soledad/tests/__init__.py | 15 | ||||
-rw-r--r-- | src/leap/soledad/tests/test_couch.py | 5 | ||||
-rw-r--r-- | src/leap/soledad/tests/test_crypto.py | 11 | ||||
-rw-r--r-- | src/leap/soledad/tests/test_leap_backend.py | 36 | ||||
-rw-r--r-- | src/leap/soledad/tests/test_server.py | 239 | ||||
-rw-r--r-- | src/leap/soledad/tests/test_soledad.py | 15 | ||||
-rw-r--r-- | src/leap/soledad/tests/test_sqlcipher.py | 11 |
7 files changed, 277 insertions, 55 deletions
diff --git a/src/leap/soledad/tests/__init__.py b/src/leap/soledad/tests/__init__.py index 00de687b..c00fb847 100644 --- a/src/leap/soledad/tests/__init__.py +++ b/src/leap/soledad/tests/__init__.py @@ -2,6 +2,7 @@ Tests to make sure Soledad provides U1DB functionality and more. """ +import os import u1db from mock import Mock @@ -28,8 +29,8 @@ class BaseSoledadTest(BaseLeapTest): def setUp(self): # config info - self.db1_file = "%s/db1.u1db" % self.tempdir - self.db2_file = "%s/db2.u1db" % self.tempdir + self.db1_file = os.path.join(self.tempdir, "db1.u1db") + self.db2_file = os.path.join(self.tempdir, "db2.u1db") self.email = 'leap@leap.se' # open test dbs self._db1 = u1db.open(self.db1_file, create=True, @@ -42,12 +43,15 @@ class BaseSoledadTest(BaseLeapTest): def tearDown(self): self._db1.close() self._db2.close() + for f in [self._soledad._local_db_path, self._soledad._secrets_path]: + if os.path.isfile(f): + os.unlink(f) self._soledad.close() def _soledad_instance(self, user='leap@leap.se', passphrase='123', prefix='', secrets_path=Soledad.STORAGE_SECRETS_FILE_NAME, - local_db_path='/soledad.u1db', server_url='', + local_db_path='soledad.u1db', server_url='', cert_file=None, secret_id=None): def _put_doc_side_effect(doc): @@ -65,8 +69,9 @@ class BaseSoledadTest(BaseLeapTest): return Soledad( user, passphrase, - secrets_path=self.tempdir+prefix+secrets_path, - local_db_path=self.tempdir+prefix+local_db_path, + secrets_path=os.path.join(self.tempdir, prefix, secrets_path), + local_db_path=os.path.join( + self.tempdir, prefix, local_db_path), server_url=server_url, # Soledad will fail if not given an url. cert_file=cert_file, secret_id=secret_id) diff --git a/src/leap/soledad/tests/test_couch.py b/src/leap/soledad/tests/test_couch.py index d6b9ad83..b3cbc1bc 100644 --- a/src/leap/soledad/tests/test_couch.py +++ b/src/leap/soledad/tests/test_couch.py @@ -31,10 +31,7 @@ from leap.soledad.backends import couch from leap.soledad.tests import u1db_tests as tests from leap.soledad.tests.u1db_tests import test_backends from leap.soledad.tests.u1db_tests import test_sync -try: - import simplejson as json -except ImportError: - import json # noqa +import simplejson as json from leap.soledad.backends.leap_backend import ( LeapDocument, ) diff --git a/src/leap/soledad/tests/test_crypto.py b/src/leap/soledad/tests/test_crypto.py index a61b931c..5432856e 100644 --- a/src/leap/soledad/tests/test_crypto.py +++ b/src/leap/soledad/tests/test_crypto.py @@ -23,10 +23,7 @@ Tests for cryptographic related stuff. import os import shutil import tempfile -try: - import simplejson as json -except ImportError: - import json # noqa +import simplejson as json import hashlib @@ -44,7 +41,7 @@ from leap.soledad.backends.leap_backend import ( WrongMac, ) from leap.soledad.backends.couch import CouchDatabase -from leap.soledad import KeyAlreadyExists, Soledad +from leap.soledad import Soledad from leap.soledad.crypto import SoledadCrypto from leap.soledad.tests import BaseSoledadTest from leap.soledad.tests.test_couch import CouchDBTestCase @@ -192,7 +189,7 @@ class RecoveryDocumentTestCase(BaseSoledadTest): def test_import_recovery_document(self): rd = self._soledad.export_recovery_document() - s = self._soledad_instance(user='anotheruser@leap.se', prefix='/2') + s = self._soledad_instance(user='anotheruser@leap.se') s.import_recovery_document(rd) s._set_secret_id(self._soledad._secret_id) self.assertEqual(self._soledad._uuid, @@ -238,7 +235,7 @@ class CryptoMethodsTestCase(BaseSoledadTest): def test__has_secret(self): - sol = self._soledad_instance(user='user@leap.se', prefix='/4') + sol = self._soledad_instance(user='user@leap.se') self.assertTrue(sol._has_secret(), "Should have a secret at " "this point") # setting secret id to None should not interfere in the fact we have a diff --git a/src/leap/soledad/tests/test_leap_backend.py b/src/leap/soledad/tests/test_leap_backend.py index 8afae6f6..d04ee412 100644 --- a/src/leap/soledad/tests/test_leap_backend.py +++ b/src/leap/soledad/tests/test_leap_backend.py @@ -23,32 +23,28 @@ Test Leap backend bits. import u1db import os import ssl -try: - import simplejson as json -except ImportError: - import json # noqa +import simplejson as json import cStringIO + from u1db.sync import Synchronizer from u1db.remote import ( http_client, http_database, http_target, ) +from routes.mapper import Mapper from leap import soledad from leap.soledad.backends import leap_backend from leap.soledad.server import ( SoledadApp, - SoledadAuthMiddleware + SoledadAuthMiddleware, ) from leap.soledad import auth from leap.soledad.tests import u1db_tests as tests -from leap.soledad.tests.u1db_tests.test_remote_sync_target import ( - make_oauth_http_app, -) from leap.soledad.tests import BaseSoledadTest from leap.soledad.tests.u1db_tests import test_backends from leap.soledad.tests.u1db_tests import test_http_database @@ -81,8 +77,13 @@ def make_token_soledad_app(state): return True return False + # we test for action authorization in leap.soledad.tests.test_server + def verify_action(environ, uuid): + return True + application = SoledadAuthMiddleware(app) application.verify_token = verify_token + application.verify_action = verify_action return application @@ -128,12 +129,6 @@ def copy_token_http_database_for_test(test, db): class LeapTests(test_backends.AllDatabaseTests, BaseSoledadTest): scenarios = LEAP_SCENARIOS + [ - ('oauth_http', {'make_database_for_test': - test_backends.make_oauth_http_database_for_test, - 'copy_database_for_test': - test_backends.copy_oauth_http_database_for_test, - 'make_document_for_test': make_leap_document_for_test, - 'make_app_with_state': make_oauth_http_app}), ('token_http', {'make_database_for_test': make_token_http_database_for_test, 'copy_database_for_test': @@ -362,13 +357,6 @@ def leap_sync_target(test, path): test.getURL(path), crypto=test._soledad._crypto) -def oauth_leap_sync_target(test, path): - st = leap_sync_target(test, '~/' + path) - st.set_oauth_credentials(tests.consumer1.key, tests.consumer1.secret, - tests.token1.key, tests.token1.secret) - return st - - def token_leap_sync_target(test, path): st = leap_sync_target(test, path) st.set_token_credentials('user-uuid', 'auth-token') @@ -379,12 +367,6 @@ class TestLeapSyncTarget( test_remote_sync_target.TestRemoteSyncTargets, BaseSoledadTest): scenarios = [ - ('http', {'make_app_with_state': make_soledad_app, - 'make_document_for_test': make_leap_document_for_test, - 'sync_target': leap_sync_target}), - ('oauth_http', {'make_app_with_state': make_oauth_http_app, - 'make_document_for_test': make_leap_document_for_test, - 'sync_target': oauth_leap_sync_target}), ('token_soledad', {'make_app_with_state': make_token_soledad_app, 'make_document_for_test': make_leap_document_for_test, diff --git a/src/leap/soledad/tests/test_server.py b/src/leap/soledad/tests/test_server.py new file mode 100644 index 00000000..ec3f636b --- /dev/null +++ b/src/leap/soledad/tests/test_server.py @@ -0,0 +1,239 @@ +# -*- coding: utf-8 -*- +# test_server.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Tests for server-related functionality. +""" + +import os +import shutil +import tempfile +import simplejson as json +import hashlib + + +from leap.soledad.server import URLToAuth +from leap.common.testing.basetest import BaseLeapTest + + +class SoledadServerTestCase(BaseLeapTest): + """ + Tests that guarantee that data will always be encrypted when syncing. + """ + + def setUp(self): + pass + + def tearDown(self): + pass + + def _make_environ(self, path_info, request_method): + return { + 'PATH_INFO': path_info, + 'REQUEST_METHOD': request_method, + } + + def test_verify_action_with_correct_dbnames(self): + """ + Test encrypting and decrypting documents. + + The following table lists the authorized actions among all possible + u1db remote actions: + + URL path | Authorized actions + -------------------------------------------------- + / | GET + /shared-db | GET + /shared-db/docs | - + /shared-db/doc/{id} | GET, PUT, DELETE + /shared-db/sync-from/{source} | - + /user-db | GET, PUT, DELETE + /user-db/docs | - + /user-db/doc/{id} | - + /user-db/sync-from/{source} | GET, PUT, POST + """ + uuid = 'myuuid' + authmap = URLToAuth(uuid) + dbname = authmap._uuid_dbname(uuid) + # test global auth + self.assertTrue( + authmap.is_authorized(self._make_environ('/', 'GET'))) + # test shared-db database resource auth + self.assertTrue( + authmap.is_authorized( + self._make_environ('/shared', 'GET'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/shared', 'PUT'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/shared', 'DELETE'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/shared', 'POST'))) + # test shared-db docs resource auth + self.assertFalse( + authmap.is_authorized( + self._make_environ('/shared/docs', 'GET'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/shared/docs', 'PUT'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/shared/docs', 'DELETE'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/shared/docs', 'POST'))) + # test shared-db doc resource auth + self.assertTrue( + authmap.is_authorized( + self._make_environ('/shared/doc/x', 'GET'))) + self.assertTrue( + authmap.is_authorized( + self._make_environ('/shared/doc/x', 'PUT'))) + self.assertTrue( + authmap.is_authorized( + self._make_environ('/shared/doc/x', 'DELETE'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/shared/doc/x', 'POST'))) + # test shared-db sync resource auth + self.assertFalse( + authmap.is_authorized( + self._make_environ('/shared/sync-from/x', 'GET'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/shared/sync-from/x', 'PUT'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/shared/sync-from/x', 'DELETE'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/shared/sync-from/x', 'POST'))) + # test user-db database resource auth + self.assertTrue( + authmap.is_authorized( + self._make_environ('/%s' % dbname, 'GET'))) + self.assertTrue( + authmap.is_authorized( + self._make_environ('/%s' % dbname, 'PUT'))) + self.assertTrue( + authmap.is_authorized( + self._make_environ('/%s' % dbname, 'DELETE'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s' % dbname, 'POST'))) + # test user-db docs resource auth + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/docs' % dbname, 'GET'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/docs' % dbname, 'PUT'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/docs' % dbname, 'DELETE'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/docs' % dbname, 'POST'))) + # test user-db doc resource auth + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/doc/x' % dbname, 'GET'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/doc/x' % dbname, 'PUT'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/doc/x' % dbname, 'DELETE'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/doc/x' % dbname, 'POST'))) + # test user-db sync resource auth + self.assertTrue( + authmap.is_authorized( + self._make_environ('/%s/sync-from/x' % dbname, 'GET'))) + self.assertTrue( + authmap.is_authorized( + self._make_environ('/%s/sync-from/x' % dbname, 'PUT'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/sync-from/x' % dbname, 'DELETE'))) + self.assertTrue( + authmap.is_authorized( + self._make_environ('/%s/sync-from/x' % dbname, 'POST'))) + + def test_verify_action_with_wrong_dbnames(self): + """ + Test if authorization fails for a wrong dbname. + """ + uuid = 'myuuid' + authmap = URLToAuth(uuid) + dbname = 'somedb' + # test wrong-db database resource auth + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s' % dbname, 'GET'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s' % dbname, 'PUT'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s' % dbname, 'DELETE'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s' % dbname, 'POST'))) + # test wrong-db docs resource auth + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/docs' % dbname, 'GET'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/docs' % dbname, 'PUT'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/docs' % dbname, 'DELETE'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/docs' % dbname, 'POST'))) + # test wrong-db doc resource auth + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/doc/x' % dbname, 'GET'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/doc/x' % dbname, 'PUT'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/doc/x' % dbname, 'DELETE'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/doc/x' % dbname, 'POST'))) + # test wrong-db sync resource auth + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/sync-from/x' % dbname, 'GET'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/sync-from/x' % dbname, 'PUT'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/sync-from/x' % dbname, 'DELETE'))) + self.assertFalse( + authmap.is_authorized( + self._make_environ('/%s/sync-from/x' % dbname, 'POST'))) diff --git a/src/leap/soledad/tests/test_soledad.py b/src/leap/soledad/tests/test_soledad.py index 45cd7eb2..1c0e6d4a 100644 --- a/src/leap/soledad/tests/test_soledad.py +++ b/src/leap/soledad/tests/test_soledad.py @@ -24,10 +24,7 @@ Tests for general Soledad functionality. import os import re import tempfile -try: - import simplejson as json -except ImportError: - import json # noqa +import simplejson as json from leap.common.testing.basetest import BaseLeapTest @@ -41,7 +38,7 @@ from leap.soledad.backends.leap_backend import LeapDocument class AuxMethodsTestCase(BaseSoledadTest): def test__init_dirs(self): - sol = self._soledad_instance(prefix='/_init_dirs') + sol = self._soledad_instance(prefix='_init_dirs') sol._init_dirs() local_db_dir = os.path.dirname(sol.local_db_path) secrets_path = os.path.dirname(sol.secrets_path) @@ -94,8 +91,12 @@ class AuxMethodsTestCase(BaseSoledadTest): local_db_path='value_2', server_url='value_1', cert_file=None) - self.assertEqual(self.tempdir+'value_3', sol.secrets_path) - self.assertEqual(self.tempdir+'value_2', sol.local_db_path) + self.assertEqual( + os.path.join(self.tempdir, 'value_3'), + sol.secrets_path) + self.assertEqual( + os.path.join(self.tempdir, 'value_2'), + sol.local_db_path) self.assertEqual('value_1', sol.server_url) diff --git a/src/leap/soledad/tests/test_sqlcipher.py b/src/leap/soledad/tests/test_sqlcipher.py index 60261111..dfc5577b 100644 --- a/src/leap/soledad/tests/test_sqlcipher.py +++ b/src/leap/soledad/tests/test_sqlcipher.py @@ -24,11 +24,10 @@ Test sqlcipher backend internals. import os import time import unittest -try: - import simplejson as json -except ImportError: - import json # noqa +import simplejson as json import threading + + from pysqlcipher import dbapi2 from StringIO import StringIO @@ -440,6 +439,7 @@ def sync_via_synchronizer_and_leap(test, db_source, db_target, test.skipTest("full trace hook unsupported over http") path = test._http_at[db_target] target = LeapSyncTarget.connect(test.getURL(path), test._soledad._crypto) + target.set_token_credentials('user-uuid', 'auth-token') if trace_hook_shallow: target._set_trace_hook_shallow(trace_hook_shallow) return sync.Synchronizer(db_source, target).sync() @@ -663,6 +663,7 @@ def _make_local_db_and_leap_target(test, path='test'): test.startServer() db = test.request_state._create_database(os.path.basename(path)) st = LeapSyncTarget.connect(test.getURL(path), test._soledad._crypto) + st.set_token_credentials('user-uuid', 'auth-token') return db, st @@ -773,7 +774,7 @@ class SQLCipherEncryptionTest(BaseLeapTest): os.unlink(dbfile) def setUp(self): - self.DB_FILE = self.tempdir + '/test.db' + self.DB_FILE = os.path.join(self.tempdir, 'test.db') self._delete_dbfiles() def tearDown(self): |