summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/leap/soledad/client/_crypto.py74
-rw-r--r--src/leap/soledad/client/_preamble.py88
-rw-r--r--testing/tests/client/test_crypto.py17
3 files changed, 122 insertions, 57 deletions
diff --git a/src/leap/soledad/client/_crypto.py b/src/leap/soledad/client/_crypto.py
index 8cedf52e..d0fb6e7f 100644
--- a/src/leap/soledad/client/_crypto.py
+++ b/src/leap/soledad/client/_crypto.py
@@ -69,11 +69,8 @@ checked during PREAMBLE reading.
import base64
import hashlib
-import warnings
import hmac
import os
-import struct
-import time
from io import BytesIO
from collections import namedtuple
@@ -89,19 +86,14 @@ from cryptography.hazmat.backends import default_backend
from zope.interface import implementer
+from ._preamble import InvalidPreambleException, decode_preamble, Preamble
+from ._preamble import ENC_SCHEME, ENC_METHOD, BLOB_SIGNATURE_MAGIC
+
SECRET_LENGTH = 64
SEPARATOR = ' ' # Anything that doesn't belong to base64 encoding
CRYPTO_BACKEND = default_backend()
-
-PACMAN = struct.Struct('2sbbQ16s255p255pQ')
-LEGACY_PACMAN = struct.Struct('2sbbQ16s255p255p')
-BLOB_SIGNATURE_MAGIC = '\x13\x37'
-
-
-ENC_SCHEME = namedtuple('SCHEME', 'symkey')(1)
-ENC_METHOD = namedtuple('METHOD', 'aes_256_ctr aes_256_gcm')(1, 2)
DocInfo = namedtuple('DocInfo', 'doc_id rev')
@@ -288,18 +280,12 @@ class BlobEncryptor(object):
return d
def _encode_preamble(self):
- current_time = int(time.time())
-
- preamble = PACMAN.pack(
- BLOB_SIGNATURE_MAGIC,
- ENC_SCHEME.symkey,
- ENC_METHOD.aes_256_gcm,
- current_time,
- self.iv,
- str(self.doc_id),
- str(self.rev),
- self._content_size)
- return preamble
+ scheme = ENC_SCHEME.symkey
+ method = ENC_METHOD.aes_256_gcm
+ content_size = self._content_size
+
+ return Preamble(self.doc_id, self.rev, scheme, method, iv=self.iv,
+ content_size=content_size).encode()
def _end_crypto_stream_and_encode_result(self):
@@ -309,10 +295,10 @@ class BlobEncryptor(object):
# chunks?
# FIXME also, it needs to be able to encode chunks with base64 if armor
- preamble, encrypted = self._aes.end()
+ raw_preamble, encrypted = self._aes.end()
result = BytesIO()
result.write(
- base64.urlsafe_b64encode(preamble))
+ base64.urlsafe_b64encode(raw_preamble))
result.write(SEPARATOR)
if self.armor:
@@ -374,7 +360,7 @@ class BlobDecryptor(object):
self.fd.seek(0)
try:
parts = self.fd.getvalue().split(SEPARATOR, 1)
- preamble = base64.urlsafe_b64decode(parts[0])
+ encoded_preamble = base64.urlsafe_b64decode(parts[0])
if len(parts) == 2:
ciphertext = parts[1]
if self.armor:
@@ -390,40 +376,30 @@ class BlobDecryptor(object):
raise InvalidBlob
try:
- if len(preamble) == LEGACY_PACMAN.size:
- warnings.warn("Decrypting a legacy document without size. " +
- "This will be deprecated in 0.12. Doc was: " +
- "doc_id: %s rev: %s" % (self.doc_id, self.rev),
- Warning)
- unpacked_data = LEGACY_PACMAN.unpack(preamble)
- magic, sch, meth, ts, iv, doc_id, rev = unpacked_data
- elif len(preamble) == PACMAN.size:
- unpacked_data = PACMAN.unpack(preamble)
- magic, sch, meth, ts, iv, doc_id, rev, doc_size = unpacked_data
- self.size = doc_size
- else:
- raise InvalidBlob("Unexpected preamble size %d", len(preamble))
- except struct.error as e:
+ preamble = decode_preamble(encoded_preamble)
+ except InvalidPreambleException as e:
raise InvalidBlob(e)
- if magic != BLOB_SIGNATURE_MAGIC:
+ if preamble.magic != BLOB_SIGNATURE_MAGIC:
raise InvalidBlob
# TODO check timestamp. Just as a sanity check, but for instance
# we can refuse to process something that is in the future or
# too far in the past (1984 would be nice, hehe)
- if sch != ENC_SCHEME.symkey:
- raise InvalidBlob('Invalid scheme: %s' % sch)
- if meth != ENC_METHOD.aes_256_gcm:
- raise InvalidBlob('Invalid encryption scheme: %s' % meth)
- if rev != self.rev:
+ if preamble.scheme != ENC_SCHEME.symkey:
+ raise InvalidBlob('Invalid scheme: %s' % preamble.scheme)
+ if preamble.method != ENC_METHOD.aes_256_gcm:
+ method = preamble.method
+ raise InvalidBlob('Invalid encryption scheme: %s' % method)
+ if preamble.rev != self.rev:
+ rev = preamble.rev
msg = 'Invalid revision. Expected: %s, was: %s' % (self.rev, rev)
raise InvalidBlob(msg)
- if doc_id != self.doc_id:
+ if preamble.doc_id != self.doc_id:
msg = 'Invalid doc_id. '
- + 'Expected: %s, was: %s' % (self.doc_id, doc_id)
+ + 'Expected: %s, was: %s' % (self.doc_id, preamble.doc_id)
raise InvalidBlob(msg)
- return preamble, iv
+ return encoded_preamble, preamble.iv
def _end_stream(self):
try:
diff --git a/src/leap/soledad/client/_preamble.py b/src/leap/soledad/client/_preamble.py
new file mode 100644
index 00000000..262d70ce
--- /dev/null
+++ b/src/leap/soledad/client/_preamble.py
@@ -0,0 +1,88 @@
+# -*- coding: utf-8 -*-
+# _preamble.py
+# Copyright (C) 2017 LEAP Encryption Access Project
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Preamble is a metadata payload present on encrypted documents. It holds data
+about encryption scheme, iv, document id and sync related data.
+ BLOB_SIGNATURE_MAGIC, -> used to differentiate from other data formats
+ ENC_SCHEME, -> cryptographic scheme (symmetric or asymmetric)
+ ENC_METHOD, -> cipher used, such as AES-GCM or AES-CTR or GPG
+ current_time, -> time.time()
+ self.iv, -> initialization vector if any, or 0 when not applicable
+ str(self.doc_id), -> document id
+ str(self.rev), -> current revision
+ self._content_size) -> size, rounded to ceiling
+"""
+import warnings
+import struct
+import time
+from collections import namedtuple
+PACMAN = struct.Struct('2sbbQ16s255p255pQ')
+LEGACY_PACMAN = struct.Struct('2sbbQ16s255p255p') # DEPRECATED
+BLOB_SIGNATURE_MAGIC = '\x13\x37'
+ENC_SCHEME = namedtuple('SCHEME', 'symkey')(1)
+ENC_METHOD = namedtuple('METHOD', 'aes_256_ctr aes_256_gcm')(1, 2)
+
+
+class InvalidPreambleException(Exception):
+ pass
+
+
+class Preamble:
+
+ def __init__(self, doc_id, rev, scheme, method,
+ timestamp=0, iv='', magic=None, content_size=0):
+ self.doc_id = doc_id
+ self.rev = rev
+ self.scheme = scheme
+ self.method = method
+ self.iv = iv
+ self.timestamp = int(timestamp) or int(time.time())
+ self.magic = magic or BLOB_SIGNATURE_MAGIC
+ self.content_size = int(content_size)
+
+ def encode(self):
+ preamble = PACMAN.pack(
+ self.magic,
+ self.scheme,
+ self.method,
+ self.timestamp,
+ self.iv,
+ str(self.doc_id),
+ str(self.rev),
+ self.content_size)
+ return preamble
+
+
+def decode_preamble(encoded_preamble):
+ preamble_size = len(encoded_preamble)
+ try:
+ if preamble_size == LEGACY_PACMAN.size:
+ unpacked_data = LEGACY_PACMAN.unpack(encoded_preamble)
+ magic, sch, meth, ts, iv, doc_id, rev = unpacked_data
+ warnings.warn("Decoding a legacy preamble without size. " +
+ "This will be deprecated in 0.12. Doc was: " +
+ "doc_id: %s rev: %s" % (doc_id, rev), Warning)
+ return Preamble(doc_id, rev, sch, meth, ts, iv, magic)
+ elif preamble_size == PACMAN.size:
+ unpacked_data = PACMAN.unpack(encoded_preamble)
+ magic, sch, meth, ts, iv, doc_id, rev, size = unpacked_data
+ return Preamble(doc_id, rev, sch, meth, ts, iv, magic, int(size))
+ else:
+ raise InvalidPreambleException("Unexpected preamble size %d",
+ preamble_size)
+ except struct.error as e:
+ raise InvalidPreambleException(e)
diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py
index 10cccbb2..5a50b118 100644
--- a/testing/tests/client/test_crypto.py
+++ b/testing/tests/client/test_crypto.py
@@ -34,6 +34,7 @@ from leap.soledad.common.document import SoledadDocument
from test_soledad.util import BaseSoledadTest
from leap.soledad.client import _crypto
from leap.soledad.client import _scrypt
+from leap.soledad.client import _preamble
from twisted.trial import unittest
from twisted.internet import defer
@@ -136,8 +137,8 @@ class BlobTestCase(unittest.TestCase):
ciphertext = base64.urlsafe_b64decode(ciphertext)
ciphertext = ciphertext[:-16]
- assert len(preamble) == _crypto.PACMAN.size
- unpacked_data = _crypto.PACMAN.unpack(preamble)
+ assert len(preamble) == _preamble.PACMAN.size
+ unpacked_data = _preamble.PACMAN.unpack(preamble)
magic, sch, meth, ts, iv, doc_id, rev, _ = unpacked_data
assert magic == _crypto.BLOB_SIGNATURE_MAGIC
assert sch == 1
@@ -317,7 +318,7 @@ class PreambleTestCase(unittest.TestCase):
def test_preamble_has_cipher_metadata(self):
preamble = self.blob._encode_preamble()
- unpacked = _crypto.PACMAN.unpack(preamble)
+ unpacked = _preamble.PACMAN.unpack(preamble)
encryption_scheme, encryption_method = unpacked[1:3]
assert encryption_scheme in _crypto.ENC_SCHEME
assert encryption_method in _crypto.ENC_METHOD
@@ -325,14 +326,14 @@ class PreambleTestCase(unittest.TestCase):
def test_preamble_has_document_sync_metadata(self):
preamble = self.blob._encode_preamble()
- unpacked = _crypto.PACMAN.unpack(preamble)
+ unpacked = _preamble.PACMAN.unpack(preamble)
doc_id, doc_rev = unpacked[5:7]
assert doc_id == self.doc_info.doc_id
assert doc_rev == self.doc_info.rev
def test_preamble_has_document_size(self):
preamble = self.blob._encode_preamble()
- unpacked = _crypto.PACMAN.unpack(preamble)
+ unpacked = _preamble.PACMAN.unpack(preamble)
size = unpacked[7]
assert size == _crypto._ceiling(len(snowden1))
@@ -341,8 +342,8 @@ class PreambleTestCase(unittest.TestCase):
# XXX: This test case is here only to test backwards compatibility!
preamble = self.blob._encode_preamble()
# repack preamble using legacy format, without doc size
- unpacked = _crypto.PACMAN.unpack(preamble)
- preamble_without_size = _crypto.LEGACY_PACMAN.pack(*unpacked[0:7])
+ unpacked = _preamble.PACMAN.unpack(preamble)
+ preamble_without_size = _preamble.LEGACY_PACMAN.pack(*unpacked[0:7])
# encrypt it manually for custom tag
ciphertext, tag = _aes_encrypt(self.blob.sym_key, self.blob.iv,
self.cleartext.getvalue(),
@@ -359,7 +360,7 @@ class PreambleTestCase(unittest.TestCase):
assert cleartext.getvalue() == self.cleartext.getvalue()
warnings = self.flushWarnings()
assert len(warnings) == 1
- assert 'legacy document without size' in warnings[0]['message']
+ assert 'legacy preamble without size' in warnings[0]['message']
def _aes_encrypt(key, iv, data, aead=''):