summaryrefslogtreecommitdiff
path: root/src/leap/soledad/tests
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/soledad/tests')
-rw-r--r--src/leap/soledad/tests/__init__.py16
-rw-r--r--src/leap/soledad/tests/test_crypto.py75
-rw-r--r--src/leap/soledad/tests/test_leap_backend.py64
-rw-r--r--src/leap/soledad/tests/test_soledad.py3
-rw-r--r--src/leap/soledad/tests/test_sqlcipher.py10
-rw-r--r--src/leap/soledad/tests/u1db_tests/test_https.py3
6 files changed, 111 insertions, 60 deletions
diff --git a/src/leap/soledad/tests/__init__.py b/src/leap/soledad/tests/__init__.py
index 6787aa9d..07038910 100644
--- a/src/leap/soledad/tests/__init__.py
+++ b/src/leap/soledad/tests/__init__.py
@@ -10,7 +10,7 @@ from leap.soledad import Soledad
from leap.soledad.crypto import SoledadCrypto
from leap.soledad.backends.leap_backend import (
LeapDocument,
- decrypt_doc_json,
+ decrypt_doc,
ENC_SCHEME_KEY,
)
from leap.common.testing.basetest import BaseLeapTest
@@ -44,7 +44,8 @@ class BaseSoledadTest(BaseLeapTest):
self._db2.close()
self._soledad.close()
- def _soledad_instance(self, user='leap@leap.se', passphrase='123', prefix='',
+ def _soledad_instance(self, user='leap@leap.se', passphrase='123',
+ prefix='',
secrets_path=Soledad.STORAGE_SECRETS_FILE_NAME,
local_db_path='/soledad.u1db', server_url='',
cert_file=None):
@@ -69,15 +70,16 @@ class BaseSoledadTest(BaseLeapTest):
server_url=server_url, # Soledad will fail if not given an url.
cert_file=cert_file)
- def assertGetEncryptedDoc(self, db, doc_id, doc_rev, content, has_conflicts):
- """Assert that the document in the database looks correct."""
+ def assertGetEncryptedDoc(
+ self, db, doc_id, doc_rev, content, has_conflicts):
+ """
+ Assert that the document in the database looks correct.
+ """
exp_doc = self.make_document(doc_id, doc_rev, content,
has_conflicts=has_conflicts)
doc = db.get_doc(doc_id)
if ENC_SCHEME_KEY in doc.content:
- doc.set_json(
- decrypt_doc_json(
- self._soledad._crypto, doc.doc_id, doc.get_json()))
+ doc.set_json(decrypt_doc(self._soledad._crypto, doc))
self.assertEqual(exp_doc.doc_id, doc.doc_id)
self.assertEqual(exp_doc.rev, doc.rev)
self.assertEqual(exp_doc.has_conflicts, doc.has_conflicts)
diff --git a/src/leap/soledad/tests/test_crypto.py b/src/leap/soledad/tests/test_crypto.py
index 61c5f5b0..9a219bd0 100644
--- a/src/leap/soledad/tests/test_crypto.py
+++ b/src/leap/soledad/tests/test_crypto.py
@@ -31,13 +31,16 @@ except ImportError:
from leap.soledad.backends.leap_backend import (
LeapDocument,
- encrypt_doc_json,
- decrypt_doc_json,
+ encrypt_doc,
+ decrypt_doc,
EncryptionSchemes,
LeapSyncTarget,
ENC_JSON_KEY,
ENC_SCHEME_KEY,
+ MAC_METHOD_KEY,
MAC_KEY,
+ UnknownMacMethod,
+ WrongMac,
)
from leap.soledad.backends.couch import CouchDatabase
from leap.soledad import KeyAlreadyExists, Soledad
@@ -66,16 +69,21 @@ class EncryptedSyncTestCase(BaseSoledadTest):
"""
Test encrypting and decrypting documents.
"""
+ simpledoc = {'key': 'val'}
doc1 = LeapDocument(doc_id='id')
- doc1.content = {'key': 'val'}
- enc_json = encrypt_doc_json(
- self._soledad._crypto, doc1.doc_id, doc1.get_json())
- plain_json = decrypt_doc_json(
- self._soledad._crypto, doc1.doc_id, enc_json)
- doc2 = LeapDocument(doc_id=doc1.doc_id, json=plain_json)
- res1 = doc1.get_json()
- res2 = doc2.get_json()
- self.assertEqual(res1, res2, 'incorrect document encryption')
+ doc1.content = simpledoc
+ # encrypt doc
+ doc1.set_json(encrypt_doc(self._soledad._crypto, doc1))
+ # assert content is different and includes keys
+ self.assertNotEqual(
+ simpledoc, doc1.content,
+ 'incorrect document encryption')
+ self.assertTrue(ENC_JSON_KEY in doc1.content)
+ self.assertTrue(ENC_SCHEME_KEY in doc1.content)
+ # decrypt doc
+ doc1.set_json(decrypt_doc(self._soledad._crypto, doc1))
+ self.assertEqual(
+ simpledoc, doc1.content, 'incorrect document encryption')
def test_encrypt_sym(self):
"""
@@ -84,9 +92,7 @@ class EncryptedSyncTestCase(BaseSoledadTest):
doc1 = LeapDocument()
doc1.content = {'key': 'val'}
enc_json = json.loads(
- encrypt_doc_json(
- self._soledad._crypto,
- doc1.doc_id, doc1.get_json()))[ENC_JSON_KEY]
+ encrypt_doc(self._soledad._crypto, doc1))[ENC_JSON_KEY]
self.assertEqual(
True,
self._soledad._crypto.is_encrypted_sym(enc_json),
@@ -161,7 +167,7 @@ class EncryptedSyncTestCase(BaseSoledadTest):
# # create and encrypt a doc to insert directly in couchdb
# doc = LeapDocument('doc-id')
# doc.set_json(
-# encrypt_doc_json(
+# encrypt_doc(
# self._soledad._crypto, 'doc-id', json.dumps(simple_doc)))
# db.put_doc(doc)
# # setup credentials for access to soledad server
@@ -241,3 +247,42 @@ class CryptoMethodsTestCase(BaseSoledadTest):
sol = self._soledad_instance(user='user@leap.se', prefix='/3')
self.assertTrue(sol._has_secret(), "Should have a secret at "
"this point")
+
+
+class MacAuthTestCase(BaseSoledadTest):
+
+ def test_decrypt_with_wrong_mac_raises(self):
+ """
+ Trying to decrypt a document with wrong MAC should raise.
+ """
+ simpledoc = {'key': 'val'}
+ doc = LeapDocument(doc_id='id')
+ doc.content = simpledoc
+ # encrypt doc
+ doc.set_json(encrypt_doc(self._soledad._crypto, doc))
+ self.assertTrue(MAC_KEY in doc.content)
+ self.assertTrue(MAC_METHOD_KEY in doc.content)
+ # mess with MAC
+ doc.content[MAC_KEY] = 'wrongmac'
+ # try to decrypt doc
+ self.assertRaises(
+ WrongMac,
+ decrypt_doc, self._soledad._crypto, doc)
+
+ def test_decrypt_with_unknown_mac_method_raises(self):
+ """
+ Trying to decrypt a document with unknown MAC method should raise.
+ """
+ simpledoc = {'key': 'val'}
+ doc = LeapDocument(doc_id='id')
+ doc.content = simpledoc
+ # encrypt doc
+ doc.set_json(encrypt_doc(self._soledad._crypto, doc))
+ self.assertTrue(MAC_KEY in doc.content)
+ self.assertTrue(MAC_METHOD_KEY in doc.content)
+ # mess with MAC method
+ doc.content[MAC_METHOD_KEY] = 'mymac'
+ # try to decrypt doc
+ self.assertRaises(
+ UnknownMacMethod,
+ decrypt_doc, self._soledad._crypto, doc)
diff --git a/src/leap/soledad/tests/test_leap_backend.py b/src/leap/soledad/tests/test_leap_backend.py
index dbebadb5..8afae6f6 100644
--- a/src/leap/soledad/tests/test_leap_backend.py
+++ b/src/leap/soledad/tests/test_leap_backend.py
@@ -106,7 +106,8 @@ def make_token_http_database_for_test(test, replica_uid):
auth.TokenBasedAuth.set_token_credentials(self, uuid, token)
def _sign_request(self, method, url_query, params):
- return auth.TokenBasedAuth._sign_request(self, method, url_query, params)
+ return auth.TokenBasedAuth._sign_request(
+ self, method, url_query, params)
http_db = _HTTPDatabaseWithToken(test.getURL('test'))
http_db.set_token_credentials('user-uuid', 'auth-token')
@@ -162,7 +163,8 @@ class TestLeapClientBase(test_http_client.TestHTTPClientBase):
auth.TokenBasedAuth.set_token_credentials(self, uuid, token)
def _sign_request(self, method, url_query, params):
- return auth.TokenBasedAuth._sign_request(self, method, url_query, params)
+ return auth.TokenBasedAuth._sign_request(
+ self, method, url_query, params)
return _HTTPClientWithToken(self.getURL('dbase'), **kwds)
@@ -185,7 +187,8 @@ class TestLeapClientBase(test_http_client.TestHTTPClientBase):
pass
def app(self, environ, start_response):
- res = test_http_client.TestHTTPClientBase.app(self, environ, start_response)
+ res = test_http_client.TestHTTPClientBase.app(
+ self, environ, start_response)
if res is not None:
return res
# mime solead application here.
@@ -195,13 +198,13 @@ class TestLeapClientBase(test_http_client.TestHTTPClientBase):
start_response("401 Unauthorized",
[('Content-Type', 'application/json')])
return [json.dumps({"error": "unauthorized",
- "message": e.message})]
+ "message": e.message})]
scheme, encoded = auth.split(None, 1)
if scheme.lower() != 'token':
start_response("401 Unauthorized",
[('Content-Type', 'application/json')])
return [json.dumps({"error": "unauthorized",
- "message": e.message})]
+ "message": e.message})]
uuid, token = encoded.decode('base64').split(':', 1)
if uuid != 'user-uuid' and token != 'auth-token':
return unauth_err("Incorrect address or token.")
@@ -228,7 +231,6 @@ class TestLeapClientBase(test_http_client.TestHTTPClientBase):
['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res))
-
#-----------------------------------------------------------------------------
# The following tests come from `u1db.tests.test_document`.
#-----------------------------------------------------------------------------
@@ -284,10 +286,9 @@ class TestLeapParsingSyncStream(
"""
Test adapted to use encrypted content.
"""
- doc = leap_backend.LeapDocument('i')
+ doc = leap_backend.LeapDocument('i', rev='r')
doc.content = {}
- enc_json = leap_backend.encrypt_doc_json(
- self._soledad._crypto, doc.doc_id, doc.get_json())
+ enc_json = leap_backend.encrypt_doc(self._soledad._crypto, doc)
tgt = leap_backend.LeapSyncTarget(
"http://foo/foo", crypto=self._soledad._crypto)
@@ -367,6 +368,7 @@ def oauth_leap_sync_target(test, path):
tests.token1.key, tests.token1.secret)
return st
+
def token_leap_sync_target(test, path):
st = leap_sync_target(test, path)
st.set_token_credentials('user-uuid', 'auth-token')
@@ -374,7 +376,7 @@ def token_leap_sync_target(test, path):
class TestLeapSyncTarget(
- test_remote_sync_target.TestRemoteSyncTargets, BaseSoledadTest):
+ test_remote_sync_target.TestRemoteSyncTargets, BaseSoledadTest):
scenarios = [
('http', {'make_app_with_state': make_soledad_app,
@@ -383,9 +385,10 @@ class TestLeapSyncTarget(
('oauth_http', {'make_app_with_state': make_oauth_http_app,
'make_document_for_test': make_leap_document_for_test,
'sync_target': oauth_leap_sync_target}),
- ('token_soledad', {'make_app_with_state': make_token_soledad_app,
- 'make_document_for_test': make_leap_document_for_test,
- 'sync_target': token_leap_sync_target}),
+ ('token_soledad',
+ {'make_app_with_state': make_token_soledad_app,
+ 'make_document_for_test': make_leap_document_for_test,
+ 'sync_target': token_leap_sync_target}),
]
def test_sync_exchange_send(self):
@@ -523,10 +526,11 @@ class TestLeapSyncTargetHttpsSupport(test_https.TestHttpSyncTargetHttpsSupport,
BaseSoledadTest):
scenarios = [
- ('token_soledad_https', {'server_def': test_https.https_server_def,
- 'make_app_with_state': make_token_soledad_app,
- 'make_document_for_test': make_leap_document_for_test,
- 'sync_target': token_leap_https_sync_target}),
+ ('token_soledad_https',
+ {'server_def': test_https.https_server_def,
+ 'make_app_with_state': make_token_soledad_app,
+ 'make_document_for_test': make_leap_document_for_test,
+ 'sync_target': token_leap_https_sync_target}),
]
def setUp(self):
@@ -568,6 +572,7 @@ class TestLeapSyncTargetHttpsSupport(test_https.TestHttpSyncTargetHttpsSupport,
http_client.CertificateError, remote_target.record_sync_info,
'other-id', 2, 'T-id')
+
#-----------------------------------------------------------------------------
# The following tests come from `u1db.tests.test_http_database`.
#-----------------------------------------------------------------------------
@@ -585,7 +590,8 @@ class _HTTPDatabase(http_database.HTTPDatabase, auth.TokenBasedAuth):
self, method, url_query, params)
-class TestHTTPDatabaseWithCreds(test_http_database.TestHTTPDatabaseCtrWithCreds):
+class TestHTTPDatabaseWithCreds(
+ test_http_database.TestHTTPDatabaseCtrWithCreds):
def test_get_sync_target_inherits_token_credentials(self):
# this test was from TestDatabaseSimpleOperations but we put it here
@@ -595,7 +601,6 @@ class TestHTTPDatabaseWithCreds(test_http_database.TestHTTPDatabaseCtrWithCreds)
st = self.db.get_sync_target()
self.assertEqual(self.db._creds, st._creds)
-
def test_ctr_with_creds(self):
db1 = _HTTPDatabase('http://dbs/db', creds={'token': {
'uuid': 'user-uuid',
@@ -658,7 +663,6 @@ class LeapDatabaseSyncTargetTests(
(self.other_changes, new_gen, last_trans_id))
self.assertEqual(10, self.st.get_sync_info('replica')[3])
-
def test_sync_exchange_push_many(self):
"""
Test sync exchange.
@@ -666,9 +670,10 @@ class LeapDatabaseSyncTargetTests(
This test was adapted to decrypt remote content before assert.
"""
docs_by_gen = [
- (self.make_document('doc-id', 'replica:1', tests.simple_doc), 10, 'T-1'),
- (self.make_document('doc-id2', 'replica:1', tests.nested_doc), 11,
- 'T-2')]
+ (self.make_document(
+ 'doc-id', 'replica:1', tests.simple_doc), 10, 'T-1'),
+ (self.make_document(
+ 'doc-id2', 'replica:1', tests.nested_doc), 11, 'T-2')]
new_gen, trans_id = self.st.sync_exchange(
docs_by_gen, 'replica', last_known_generation=0,
last_known_trans_id=None, return_doc_cb=self.receive_doc)
@@ -682,7 +687,6 @@ class LeapDatabaseSyncTargetTests(
(self.other_changes, new_gen, trans_id))
self.assertEqual(11, self.st.get_sync_info('replica')[3])
-
def test_sync_exchange_returns_many_new_docs(self):
"""
Test sync exchange.
@@ -766,10 +770,10 @@ class TestLeapDbSync(test_sync.TestDbSync, BaseSoledadTest):
self.assertEqual(1, len(changes))
self.assertEqual(doc2.doc_id, changes[0][0])
self.assertEqual(1, gen - local_gen_before_sync)
- self.assertGetEncryptedDoc(self.db2, doc1.doc_id, doc1.rev, tests.simple_doc,
- False)
- self.assertGetEncryptedDoc(self.db, doc2.doc_id, doc2.rev, tests.nested_doc,
- False)
+ self.assertGetEncryptedDoc(
+ self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False)
+ self.assertGetEncryptedDoc(
+ self.db, doc2.doc_id, doc2.rev, tests.nested_doc, False)
def test_db_sync_autocreate(self):
"""
@@ -785,8 +789,8 @@ class TestLeapDbSync(test_sync.TestDbSync, BaseSoledadTest):
gen, _, changes = db3.whats_changed()
self.assertEqual(1, len(changes))
self.assertEqual(doc1.doc_id, changes[0][0])
- self.assertGetEncryptedDoc(db3, doc1.doc_id, doc1.rev, tests.simple_doc,
- False)
+ self.assertGetEncryptedDoc(
+ db3, doc1.doc_id, doc1.rev, tests.simple_doc, False)
t_gen, _ = self.db._get_replica_gen_and_trans_id('test3.db')
s_gen, _ = db3._get_replica_gen_and_trans_id('test1')
self.assertEqual(1, t_gen)
diff --git a/src/leap/soledad/tests/test_soledad.py b/src/leap/soledad/tests/test_soledad.py
index 49358ab6..6a4261c0 100644
--- a/src/leap/soledad/tests/test_soledad.py
+++ b/src/leap/soledad/tests/test_soledad.py
@@ -69,7 +69,8 @@ class AuxMethodsTestCase(BaseSoledadTest):
secrets_path=None, local_db_path=None,
server_url='', cert_file=None) # otherwise Soledad will fail.
self.assertEquals(
- os.path.join(sol.DEFAULT_PREFIX, Soledad.STORAGE_SECRETS_FILE_NAME),
+ os.path.join(
+ sol.DEFAULT_PREFIX, Soledad.STORAGE_SECRETS_FILE_NAME),
sol.secrets_path)
self.assertEquals(
os.path.join(sol.DEFAULT_PREFIX, 'soledad.u1db'),
diff --git a/src/leap/soledad/tests/test_sqlcipher.py b/src/leap/soledad/tests/test_sqlcipher.py
index c4282c0f..60261111 100644
--- a/src/leap/soledad/tests/test_sqlcipher.py
+++ b/src/leap/soledad/tests/test_sqlcipher.py
@@ -52,10 +52,9 @@ from leap.soledad.backends.sqlcipher import open as u1db_open
from leap.soledad.backends.leap_backend import (
LeapDocument,
EncryptionSchemes,
- decrypt_doc_json,
+ decrypt_doc,
ENC_JSON_KEY,
ENC_SCHEME_KEY,
- MAC_KEY,
)
@@ -634,9 +633,7 @@ class SQLCipherDatabaseSyncTests(
self.sync(self.db2, db3)
doc3 = db3.get_doc('the-doc')
if ENC_SCHEME_KEY in doc3.content:
- doc3.set_json(
- decrypt_doc_json(
- self._soledad._crypto, doc3.doc_id, doc3.get_json()))
+ doc3.set_json(decrypt_doc(self._soledad._crypto, doc3))
self.assertEqual(doc4.get_json(), doc3.get_json())
self.assertFalse(doc3.has_conflicts)
@@ -715,7 +712,8 @@ class SQLCipherSyncTargetTests(
sever-side.
"""
docs_by_gen = [
- (self.make_document('doc-id', 'replica:1', tests.simple_doc), 10, 'T-1'),
+ (self.make_document(
+ 'doc-id', 'replica:1', tests.simple_doc), 10, 'T-1'),
(self.make_document('doc-id2', 'replica:1', tests.nested_doc), 11,
'T-2')]
new_gen, trans_id = self.st.sync_exchange(
diff --git a/src/leap/soledad/tests/u1db_tests/test_https.py b/src/leap/soledad/tests/u1db_tests/test_https.py
index b4b14722..62180f8c 100644
--- a/src/leap/soledad/tests/u1db_tests/test_https.py
+++ b/src/leap/soledad/tests/u1db_tests/test_https.py
@@ -74,7 +74,8 @@ class TestHttpSyncTargetHttpsSupport(tests.TestCaseWithServer):
# class with one that will do HTTPS independent of the platform. In
# order to maintain the compatibility with u1db default tests, we undo
# that replacement here.
- http_client._VerifiedHTTPSConnection = soledad.old__VerifiedHTTPSConnection
+ http_client._VerifiedHTTPSConnection = \
+ soledad.old__VerifiedHTTPSConnection
super(TestHttpSyncTargetHttpsSupport, self).setUp()
def getSyncTarget(self, host, path=None):