summaryrefslogtreecommitdiff
path: root/testing
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2016-09-16 19:24:55 -0400
committerdrebs <drebs@leap.se>2016-12-12 09:11:59 -0200
commit510c0ba3a0c0ade334090a1c36dab9ccae0ba1b4 (patch)
treedf0f2b8e285fa882eefed919d58d98b7aad3e03a /testing
parentfcf3b3046dd2005992638ebf993d53897af8ed3a (diff)
[feature] blob encryptor / decryptor
Diffstat (limited to 'testing')
-rw-r--r--testing/tests/client/test_crypto2.py126
1 files changed, 117 insertions, 9 deletions
diff --git a/testing/tests/client/test_crypto2.py b/testing/tests/client/test_crypto2.py
index ae280020..f0f6c4af 100644
--- a/testing/tests/client/test_crypto2.py
+++ b/testing/tests/client/test_crypto2.py
@@ -19,16 +19,28 @@
Tests for the _crypto module
"""
+import base64
+import binascii
+import time
+import struct
import StringIO
-
import leap.soledad.client
from leap.soledad.client import _crypto
-
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
+from twisted.trial import unittest
+
+
+snowden1 = (
+ "You can't come up against "
+ "the world's most powerful intelligence "
+ "agencies and not accept the risk. "
+ "If they want to get you, over time "
+ "they will.")
+
def _aes_encrypt(key, iv, data):
backend = default_backend()
@@ -36,20 +48,21 @@ def _aes_encrypt(key, iv, data):
encryptor = cipher.encryptor()
return encryptor.update(data) + encryptor.finalize()
+def _aes_decrypt(key, iv, data):
+ backend = default_backend()
+ cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend)
+ decryptor = cipher.decryptor()
+ return decryptor.update(data) + decryptor.finalize()
+
def test_chunked_encryption():
key = 'A' * 32
iv = 'A' * 16
- data = (
- "You can't come up against "
- "the world's most powerful intelligence "
- "agencies and not accept the risk. "
- "If they want to get you, over time "
- "they will.")
fd = StringIO.StringIO()
- aes = _crypto.AESWriter(key, fd, iv)
+ aes = _crypto.AESEncryptor(key, iv, fd)
+ data = snowden1
block = 16
for i in range(len(data)/block):
@@ -61,3 +74,98 @@ def test_chunked_encryption():
ciphertext = _aes_encrypt(key, iv, data)
assert ciphertext_chunked == ciphertext
+
+
+def test_decrypt():
+ key = 'A' * 32
+ iv = 'A' * 16
+
+ data = snowden1
+ block = 16
+
+ ciphertext = _aes_encrypt(key, iv, data)
+
+ fd = StringIO.StringIO()
+ aes = _crypto.AESDecryptor(key, iv, fd)
+
+ for i in range(len(ciphertext)/block):
+ chunk = ciphertext[i * block:(i+1)*block]
+ aes.write(chunk)
+ aes.end()
+
+ cleartext_chunked = fd.getvalue()
+ assert cleartext_chunked == data
+
+
+
+class BlobTestCase(unittest.TestCase):
+
+ class doc_info:
+ doc_id = 'D-deadbeef'
+ rev = '397932e0c77f45fcb7c3732930e7e9b2:1'
+
+ def test_blob_encryptor(self):
+
+ inf = StringIO.StringIO()
+ inf.write(snowden1)
+ inf.seek(0)
+ outf = StringIO.StringIO()
+
+ blob = _crypto.BlobEncryptor(
+ self.doc_info, inf, result=outf,
+ secret='A' * 96, iv='B'*16)
+
+ d = blob.encrypt()
+ d.addCallback(self._test_blob_encryptor_cb, outf)
+ return d
+
+ def _test_blob_encryptor_cb(self, _, outf):
+ encrypted = outf.getvalue()
+ data = base64.urlsafe_b64decode(encrypted)
+
+ assert data[0] == '\x80'
+ ts, sch, meth = struct.unpack(
+ 'Qbb', data[1:11])
+ assert sch == 1
+ assert meth == 1
+ iv = data[11:27]
+ assert iv == 'B' * 16
+ doc_id = data[27:37]
+ assert doc_id == 'D-deadbeef'
+
+ rev = data[37:71]
+ assert rev == self.doc_info.rev
+
+ ciphertext = data[71:-64]
+ aes_key = _crypto._get_sym_key_for_doc(
+ self.doc_info.doc_id, 'A'*96)
+ assert ciphertext == _aes_encrypt(aes_key, 'B'*16, snowden1)
+
+ decrypted = _aes_decrypt(aes_key, 'B'*16, ciphertext)
+ assert str(decrypted) == snowden1
+
+ def test_blob_decryptor(self):
+
+ inf = StringIO.StringIO()
+ inf.write(snowden1)
+ inf.seek(0)
+ outf = StringIO.StringIO()
+
+ blob = _crypto.BlobEncryptor(
+ self.doc_info, inf, result=outf,
+ secret='A' * 96, iv='B' * 16)
+
+ def do_decrypt(_, outf):
+ decryptor = _crypto.BlobDecryptor(
+ self.doc_info, outf,
+ secret='A' * 96)
+ d = decryptor.decrypt()
+ return d
+
+ d = blob.encrypt()
+ d.addCallback(do_decrypt, outf)
+ d.addCallback(self._test_blob_decryptor_cb)
+ return d
+
+ def _test_blob_decryptor_cb(self, decrypted):
+ assert decrypted.getvalue() == snowden1