From 10574757e77bb828782b467656ce304f562417e5 Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Wed, 6 Jul 2016 18:00:44 +0200 Subject: [feat] add multipart/signed --- memoryhole/gpg.py | 11 ++++++++--- memoryhole/openpgp.py | 14 +++++++++++++- memoryhole/protection.py | 49 +++++++++++++++++++++++++++++++++++++----------- 3 files changed, 59 insertions(+), 15 deletions(-) (limited to 'memoryhole') diff --git a/memoryhole/gpg.py b/memoryhole/gpg.py index 3f1c59a..1ff3ac8 100644 --- a/memoryhole/gpg.py +++ b/memoryhole/gpg.py @@ -9,8 +9,13 @@ class Gnupg(object): from gnupg import GPG self.gpg = GPG() - def encrypt(self, data, encraddr, signaddr): - result = self.gpg.encrypt(data, *encraddr, default_key=signaddr) + def encrypt(self, data, encraddr): + result = self.gpg.encrypt(data, *encraddr) + self._check_gpg_error(result) + return result.data + + def sign(self, data): + result = self.gpg.sign(data) self._check_gpg_error(result) return result.data @@ -23,4 +28,4 @@ class Gnupg(object): def _check_gpg_error(self, result): stderr = getattr(result, 'stderr', '') if getattr(result, 'ok', False) is not True: - raise RuntimeError('Failed to encrypt/decrypt: %s' % stderr) + raise RuntimeError('Failed gnupg operation: %s' % stderr) diff --git a/memoryhole/openpgp.py b/memoryhole/openpgp.py index 3cefc3c..3318704 100644 --- a/memoryhole/openpgp.py +++ b/memoryhole/openpgp.py @@ -2,7 +2,7 @@ from zope.interface import Interface class IOpenPGP(Interface): - def encrypt(data, encraddr, singaddr): + def encrypt(data, encraddr): """ Encrypt and sign data. @@ -18,6 +18,18 @@ class IOpenPGP(Interface): """ pass + def sign(data): + """ + Sign data. + + :param data: data to be encrypted + :type data: str + + :return: signature + :rtype: str + """ + pass + def decrypt(data): """ Decrypt and verify data. diff --git a/memoryhole/protection.py b/memoryhole/protection.py index d1b372b..08d5fb0 100644 --- a/memoryhole/protection.py +++ b/memoryhole/protection.py @@ -1,8 +1,17 @@ +import re + +try: + from StringIO import StringIO +except ImportError: + from io import StringIO from email.mime.application import MIMEApplication -from email.utils import getaddresses, parseaddr +from email.utils import getaddresses from memoryhole.gpg import Gnupg -from memoryhole.rfc3156 import PGPEncrypted, MultipartEncrypted +from memoryhole.rfc3156 import ( + PGPEncrypted, MultipartEncrypted, RFC3156CompliantGenerator, + MultipartSigned, PGPSignature, encode_base64_rec +) class ProtectConfig(object): @@ -47,20 +56,18 @@ def protect(msg, encrypt=True, config=None): if encrypt: return _encrypt_mime(msg, config) - raise NotImplementedError() + return _sign_mime(msg, config) def _encrypt_mime(msg, config): encraddr = _recipient_addresses(msg) - signaddr = _from_address(msg) newmsg = MultipartEncrypted('application/pgp-encrypted') for hkey, hval in msg.items(): newmsg.add_header(hkey, hval) del(msg[hkey]) - encstr = config.openpgp.encrypt(msg.as_string(unixfrom=False), - encraddr, signaddr) + encstr = config.openpgp.encrypt(msg.as_string(unixfrom=False), encraddr) encmsg = MIMEApplication( encstr, _subtype='octet-stream', _encoder=lambda x: x) encmsg.add_header('content-disposition', 'attachment', @@ -75,13 +82,33 @@ def _encrypt_mime(msg, config): return newmsg +def _sign_mime(msg, config): + newmsg = MultipartSigned('application/pgp-signature', 'pgp-sha512') + for hkey, hval in msg.items(): + newmsg.add_header(hkey, hval) + del(msg[hkey]) + + # apply base64 content-transfer-encoding + encode_base64_rec(msg) + # get message text with headers and replace \n for \r\n + fp = StringIO() + g = RFC3156CompliantGenerator(fp, mangle_from_=False, maxheaderlen=76) + g.flatten(msg) + msgtext = re.sub('\r?\n', '\r\n', fp.getvalue()) + # make sure signed message ends with \r\n as per OpenPGP stantard. + if msg.is_multipart() and not msgtext.endswith("\r\n"): + msgtext += "\r\n" + + signature = config.openpgp.sign(msgtext) + sigmsg = PGPSignature(signature) + # attach original message and signature to new message + newmsg.attach(msg) + newmsg.attach(sigmsg) + return newmsg + + def _recipient_addresses(msg): recipients = [] for header in ('to', 'cc', 'bcc'): recipients += msg.get_all(header, []) return [r[1] for r in getaddresses(recipients)] - - -def _from_address(msg): - frm = msg.get_all('From', []) - return parseaddr(frm)[1] -- cgit v1.2.3