From ae9c4a0c3de7eebbcef3c145af2a3eaa86376e44 Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Wed, 6 Jul 2016 20:35:29 +0200 Subject: [feat] protect signing headers --- memoryhole/protection.py | 45 +++++++++++++++++++++++++++++++-------------- tests/test_protection.py | 24 ++++++++++++++++-------- 2 files changed, 47 insertions(+), 22 deletions(-) diff --git a/memoryhole/protection.py b/memoryhole/protection.py index 08d5fb0..2f2d704 100644 --- a/memoryhole/protection.py +++ b/memoryhole/protection.py @@ -6,6 +6,7 @@ except ImportError: from io import StringIO from email.mime.application import MIMEApplication from email.utils import getaddresses +from copy import deepcopy from memoryhole.gpg import Gnupg from memoryhole.rfc3156 import ( @@ -16,8 +17,8 @@ from memoryhole.rfc3156 import ( class ProtectConfig(object): - PROTECTED_HEADERS = ('Subject', 'Message-ID', 'Date', 'To', 'From') - OBSCURED_HEADERS = ('Subject', 'Message-ID', 'Date', 'To', 'From') + PROTECTED_HEADERS = ('subject', 'message-id', 'date', 'to', 'from') + OBSCURED_HEADERS = ('subject', 'message-id', 'date', 'to', 'from') def __init__(self, openpgp=None, protected_headers=PROTECTED_HEADERS, obscured_headers=OBSCURED_HEADERS): @@ -36,6 +37,9 @@ class ProtectConfig(object): openpgp = Gnupg() self.openpgp = openpgp + self.protected_headers = protected_headers + self.obscured_headers = obscured_headers + def protect(msg, encrypt=True, config=None): """ @@ -62,12 +66,10 @@ def protect(msg, encrypt=True, config=None): def _encrypt_mime(msg, config): encraddr = _recipient_addresses(msg) - newmsg = MultipartEncrypted('application/pgp-encrypted') - for hkey, hval in msg.items(): - newmsg.add_header(hkey, hval) - del(msg[hkey]) + newmsg, part = _fix_headers( + msg, MultipartEncrypted('application/pgp-encrypted'), config) - encstr = config.openpgp.encrypt(msg.as_string(unixfrom=False), encraddr) + encstr = config.openpgp.encrypt(part.as_string(unixfrom=False), encraddr) encmsg = MIMEApplication( encstr, _subtype='octet-stream', _encoder=lambda x: x) encmsg.add_header('content-disposition', 'attachment', @@ -83,17 +85,16 @@ def _encrypt_mime(msg, config): def _sign_mime(msg, config): - newmsg = MultipartSigned('application/pgp-signature', 'pgp-sha512') - for hkey, hval in msg.items(): - newmsg.add_header(hkey, hval) - del(msg[hkey]) + newmsg, part = _fix_headers( + msg, MultipartSigned('application/pgp-signature', 'pgp-sha512'), + config) # apply base64 content-transfer-encoding - encode_base64_rec(msg) + encode_base64_rec(part) # get message text with headers and replace \n for \r\n fp = StringIO() g = RFC3156CompliantGenerator(fp, mangle_from_=False, maxheaderlen=76) - g.flatten(msg) + g.flatten(part) msgtext = re.sub('\r?\n', '\r\n', fp.getvalue()) # make sure signed message ends with \r\n as per OpenPGP stantard. if msg.is_multipart() and not msgtext.endswith("\r\n"): @@ -101,12 +102,28 @@ def _sign_mime(msg, config): signature = config.openpgp.sign(msgtext) sigmsg = PGPSignature(signature) + # attach original message and signature to new message - newmsg.attach(msg) + newmsg.attach(part) newmsg.attach(sigmsg) return newmsg +def _fix_headers(oldmsg, newmsg, config): + part = deepcopy(oldmsg) + for hkey, hval in part.items(): + newmsg.add_header(hkey, hval) + del(part[hkey]) + _protect_headers(newmsg, part, config.protected_headers) + return newmsg, part + + +def _protect_headers(orig, dest, headers): + for header in headers: + if header in orig: + dest.add_header(header, orig[header]) + + def _recipient_addresses(msg): recipients = [] for header in ('to', 'cc', 'bcc'): diff --git a/tests/test_protection.py b/tests/test_protection.py index 980a5ff..2475cb7 100644 --- a/tests/test_protection.py +++ b/tests/test_protection.py @@ -33,7 +33,7 @@ class ProtectTest(unittest.TestCase): encmsg = protect(msg, config=conf) self.assertEqual(encmsg.get_payload(1).get_payload(), encrypter.encstr) - self.assertEqual(BODY, encrypter.data[1:-1]) # remove '\n' + self._assert_body(encrypter.data, BODY+'\n') self.assertEqual([TO], encrypter.encraddr) self.assertEqual(encmsg.get_content_type(), "multipart/encrypted") @@ -58,9 +58,7 @@ class ProtectTest(unittest.TestCase): b64body = b64encode(six.b(BODY+'\n')) self.assertEqual(six.b(encmsg.get_payload(0).get_payload()), b64body) self.assertEqual(encmsg.get_payload(1).get_payload(), signer.signature) - self.assertEqual( - six.b("Content-Transfer-Encoding: base64\r\n\r\n")+b64body, - six.b(signer.data)) + self._assert_body(signer.data, b64body.decode('utf-8')) self.assertEqual(encmsg.get_content_type(), "multipart/signed") def test_signed_headers(self): @@ -68,11 +66,21 @@ class ProtectTest(unittest.TestCase): msg = p.parsestr(EMAIL) signer = Signer() conf = ProtectConfig(openpgp=signer) - encmsg = protect(msg, encrypt=False, config=conf) + signmsg = protect(msg, encrypt=False, config=conf) - self.assertEqual(encmsg['from'], FROM) - self.assertEqual(encmsg['to'], TO) - self.assertEqual(encmsg['subject'], SUBJECT) + self.assertEqual(signmsg['from'], FROM) + self.assertEqual(signmsg['to'], TO) + self.assertEqual(signmsg['subject'], SUBJECT) + + signedpart = signmsg.get_payload(0) + self.assertEqual(signedpart['from'], FROM) + self.assertEqual(signedpart['to'], TO) + self.assertEqual(signedpart['subject'], SUBJECT) + + def _assert_body(self, data, body): + p = Parser() + msg = p.parsestr(data) + self.assertEqual(msg.get_payload(), body) @implementer(IOpenPGP) -- cgit v1.2.3