From c0ae8b109a21fad1c2ff56db0768c16b78f6a5f7 Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Wed, 6 Jul 2016 15:59:10 +0200 Subject: [feat] discover encr and sign address --- memoryhole/gpg.py | 16 ++++++++++------ memoryhole/protection.py | 17 +++++++++++++++-- tests/test_protection.py | 15 ++++++++++++++- 3 files changed, 39 insertions(+), 9 deletions(-) diff --git a/memoryhole/gpg.py b/memoryhole/gpg.py index fcead9e..533d12a 100644 --- a/memoryhole/gpg.py +++ b/memoryhole/gpg.py @@ -1,4 +1,3 @@ -from gnupg import GPG from zope.interface import implementer from memoryhole.openpgp import OpenPGP @@ -7,16 +6,21 @@ from memoryhole.openpgp import OpenPGP @implementer(OpenPGP) class Gnupg(object): def __init__(self): + from gnupg import GPG self.gpg = GPG() - def encrypt(self, data, encraddr, singaddr): - # TODO - encfp = 0 - signfp = 0 - return self.gpg.encrypt(data, encfp, default_key=signfp) + def encrypt(self, data, encraddr, signaddr): + result = self.gpg.encrypt(data, *encraddr, default_key=signaddr) + self._assert_gpg_result_ok(result) + return result.data def decrypt(self, data): pass def verify(self, data, signature): pass + + def _assert_gpg_result_ok(self, result): + stderr = getattr(result, 'stderr', '') + if getattr(result, 'ok', False) is not True: + raise RuntimeError('Failed to encrypt/decrypt: %s' % stderr) diff --git a/memoryhole/protection.py b/memoryhole/protection.py index b179614..b064f47 100644 --- a/memoryhole/protection.py +++ b/memoryhole/protection.py @@ -30,13 +30,14 @@ def protect(msg, openpgp=Gnupg(), encrypt=True, obscure=True): def _encrypt_mime(msg, openpgp): + encraddr = _recipient_addresses(msg) + signaddr = _from_address(msg) + newmsg = MultipartEncrypted('application/pgp-encrypted') for hkey, hval in msg.items(): newmsg.add_header(hkey, hval) del(msg[hkey]) - encraddr = "" # TODO - signaddr = "" # TODO encstr = openpgp.encrypt(msg.as_string(unixfrom=False), encraddr, signaddr) encmsg = MIMEApplication( @@ -51,3 +52,15 @@ def _encrypt_mime(msg, openpgp): newmsg.attach(metamsg) newmsg.attach(encmsg) return newmsg + + +def _recipient_addresses(msg): + recipients = [] + for header in ('to', 'cc', 'bcc'): + recipients += msg.get_all(header, []) + return [r[1] for r in getaddresses(recipients)] + + +def _from_address(msg): + frm = msg.get_all('From', []) + return parseaddr(frm)[1] diff --git a/tests/test_protection.py b/tests/test_protection.py index f83d3f7..59effaa 100644 --- a/tests/test_protection.py +++ b/tests/test_protection.py @@ -1,4 +1,3 @@ -import base64 import unittest from email.parser import Parser from zope.interface import implementer @@ -32,8 +31,20 @@ class ProtectTest(unittest.TestCase): self.assertEqual(encmsg.get_payload(1).get_payload(), encrypter.encstr) self.assertEqual(BODY, encrypter.data[1:-1]) # remove '\n' + self.assertEqual([TO], encrypter.encraddr) + self.assertEqual(FROM, encrypter.singaddr) self.assertEqual(encmsg.get_content_type(), "multipart/encrypted") + def test_unobscured_headers(self): + p = Parser() + msg = p.parsestr(EMAIL) + encrypter = Encrypter() + encmsg = protect(msg, encrypter, obscure=False) + + self.assertEqual(encmsg['from'], FROM) + self.assertEqual(encmsg['to'], TO) + self.assertEqual(encmsg['subject'], SUBJECT) + @implementer(OpenPGP) class Encrypter(object): @@ -41,6 +52,8 @@ class Encrypter(object): def encrypt(self, data, encraddr, singaddr): self.data = data + self.encraddr = encraddr + self.singaddr = singaddr return self.encstr -- cgit v1.2.3