From 10574757e77bb828782b467656ce304f562417e5 Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Wed, 6 Jul 2016 18:00:44 +0200 Subject: [feat] add multipart/signed --- memoryhole/gpg.py | 11 ++++++++--- memoryhole/openpgp.py | 14 +++++++++++++- memoryhole/protection.py | 49 +++++++++++++++++++++++++++++++++++++----------- tests/test_protection.py | 43 ++++++++++++++++++++++++++++++++++++++---- 4 files changed, 98 insertions(+), 19 deletions(-) diff --git a/memoryhole/gpg.py b/memoryhole/gpg.py index 3f1c59a..1ff3ac8 100644 --- a/memoryhole/gpg.py +++ b/memoryhole/gpg.py @@ -9,8 +9,13 @@ class Gnupg(object): from gnupg import GPG self.gpg = GPG() - def encrypt(self, data, encraddr, signaddr): - result = self.gpg.encrypt(data, *encraddr, default_key=signaddr) + def encrypt(self, data, encraddr): + result = self.gpg.encrypt(data, *encraddr) + self._check_gpg_error(result) + return result.data + + def sign(self, data): + result = self.gpg.sign(data) self._check_gpg_error(result) return result.data @@ -23,4 +28,4 @@ class Gnupg(object): def _check_gpg_error(self, result): stderr = getattr(result, 'stderr', '') if getattr(result, 'ok', False) is not True: - raise RuntimeError('Failed to encrypt/decrypt: %s' % stderr) + raise RuntimeError('Failed gnupg operation: %s' % stderr) diff --git a/memoryhole/openpgp.py b/memoryhole/openpgp.py index 3cefc3c..3318704 100644 --- a/memoryhole/openpgp.py +++ b/memoryhole/openpgp.py @@ -2,7 +2,7 @@ from zope.interface import Interface class IOpenPGP(Interface): - def encrypt(data, encraddr, singaddr): + def encrypt(data, encraddr): """ Encrypt and sign data. @@ -18,6 +18,18 @@ class IOpenPGP(Interface): """ pass + def sign(data): + """ + Sign data. + + :param data: data to be encrypted + :type data: str + + :return: signature + :rtype: str + """ + pass + def decrypt(data): """ Decrypt and verify data. diff --git a/memoryhole/protection.py b/memoryhole/protection.py index d1b372b..08d5fb0 100644 --- a/memoryhole/protection.py +++ b/memoryhole/protection.py @@ -1,8 +1,17 @@ +import re + +try: + from StringIO import StringIO +except ImportError: + from io import StringIO from email.mime.application import MIMEApplication -from email.utils import getaddresses, parseaddr +from email.utils import getaddresses from memoryhole.gpg import Gnupg -from memoryhole.rfc3156 import PGPEncrypted, MultipartEncrypted +from memoryhole.rfc3156 import ( + PGPEncrypted, MultipartEncrypted, RFC3156CompliantGenerator, + MultipartSigned, PGPSignature, encode_base64_rec +) class ProtectConfig(object): @@ -47,20 +56,18 @@ def protect(msg, encrypt=True, config=None): if encrypt: return _encrypt_mime(msg, config) - raise NotImplementedError() + return _sign_mime(msg, config) def _encrypt_mime(msg, config): encraddr = _recipient_addresses(msg) - signaddr = _from_address(msg) newmsg = MultipartEncrypted('application/pgp-encrypted') for hkey, hval in msg.items(): newmsg.add_header(hkey, hval) del(msg[hkey]) - encstr = config.openpgp.encrypt(msg.as_string(unixfrom=False), - encraddr, signaddr) + encstr = config.openpgp.encrypt(msg.as_string(unixfrom=False), encraddr) encmsg = MIMEApplication( encstr, _subtype='octet-stream', _encoder=lambda x: x) encmsg.add_header('content-disposition', 'attachment', @@ -75,13 +82,33 @@ def _encrypt_mime(msg, config): return newmsg +def _sign_mime(msg, config): + newmsg = MultipartSigned('application/pgp-signature', 'pgp-sha512') + for hkey, hval in msg.items(): + newmsg.add_header(hkey, hval) + del(msg[hkey]) + + # apply base64 content-transfer-encoding + encode_base64_rec(msg) + # get message text with headers and replace \n for \r\n + fp = StringIO() + g = RFC3156CompliantGenerator(fp, mangle_from_=False, maxheaderlen=76) + g.flatten(msg) + msgtext = re.sub('\r?\n', '\r\n', fp.getvalue()) + # make sure signed message ends with \r\n as per OpenPGP stantard. + if msg.is_multipart() and not msgtext.endswith("\r\n"): + msgtext += "\r\n" + + signature = config.openpgp.sign(msgtext) + sigmsg = PGPSignature(signature) + # attach original message and signature to new message + newmsg.attach(msg) + newmsg.attach(sigmsg) + return newmsg + + def _recipient_addresses(msg): recipients = [] for header in ('to', 'cc', 'bcc'): recipients += msg.get_all(header, []) return [r[1] for r in getaddresses(recipients)] - - -def _from_address(msg): - frm = msg.get_all('From', []) - return parseaddr(frm)[1] diff --git a/tests/test_protection.py b/tests/test_protection.py index 1220639..980a5ff 100644 --- a/tests/test_protection.py +++ b/tests/test_protection.py @@ -1,4 +1,6 @@ +import six import unittest +from base64 import b64encode from email.parser import Parser from zope.interface import implementer @@ -23,7 +25,7 @@ Subject: %(subject)s class ProtectTest(unittest.TestCase): - def test_pgp_mime(self): + def test_pgp_encrypted_mime(self): p = Parser() msg = p.parsestr(EMAIL) encrypter = Encrypter() @@ -33,7 +35,6 @@ class ProtectTest(unittest.TestCase): self.assertEqual(encmsg.get_payload(1).get_payload(), encrypter.encstr) self.assertEqual(BODY, encrypter.data[1:-1]) # remove '\n' self.assertEqual([TO], encrypter.encraddr) - self.assertEqual(FROM, encrypter.singaddr) self.assertEqual(encmsg.get_content_type(), "multipart/encrypted") def test_unobscured_headers(self): @@ -47,17 +48,51 @@ class ProtectTest(unittest.TestCase): self.assertEqual(encmsg['to'], TO) self.assertEqual(encmsg['subject'], SUBJECT) + def test_pgp_signed_mime(self): + p = Parser() + msg = p.parsestr(EMAIL) + signer = Signer() + conf = ProtectConfig(openpgp=signer) + encmsg = protect(msg, encrypt=False, config=conf) + + b64body = b64encode(six.b(BODY+'\n')) + self.assertEqual(six.b(encmsg.get_payload(0).get_payload()), b64body) + self.assertEqual(encmsg.get_payload(1).get_payload(), signer.signature) + self.assertEqual( + six.b("Content-Transfer-Encoding: base64\r\n\r\n")+b64body, + six.b(signer.data)) + self.assertEqual(encmsg.get_content_type(), "multipart/signed") + + def test_signed_headers(self): + p = Parser() + msg = p.parsestr(EMAIL) + signer = Signer() + conf = ProtectConfig(openpgp=signer) + encmsg = protect(msg, encrypt=False, config=conf) + + self.assertEqual(encmsg['from'], FROM) + self.assertEqual(encmsg['to'], TO) + self.assertEqual(encmsg['subject'], SUBJECT) + @implementer(IOpenPGP) class Encrypter(object): encstr = "this is encrypted" - def encrypt(self, data, encraddr, singaddr): + def encrypt(self, data, encraddr): self.data = data self.encraddr = encraddr - self.singaddr = singaddr return self.encstr +@implementer(IOpenPGP) +class Signer(object): + signature = "this is a signature" + + def sign(self, data): + self.data = data + return self.signature + + if __name__ == "__main__": unittest.main() -- cgit v1.2.3