summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRuben Pollan <meskio@sindominio.net>2016-07-06 18:00:44 +0200
committerRuben Pollan <meskio@sindominio.net>2016-07-06 18:00:44 +0200
commit10574757e77bb828782b467656ce304f562417e5 (patch)
treeada0c5150d9d9ff1b415b04a4f354f91b933aef8
parentaafba6d33f8af1d4967d069999b9fb0052ebcd65 (diff)
[feat] add multipart/signed
-rw-r--r--memoryhole/gpg.py11
-rw-r--r--memoryhole/openpgp.py14
-rw-r--r--memoryhole/protection.py49
-rw-r--r--tests/test_protection.py43
4 files changed, 98 insertions, 19 deletions
diff --git a/memoryhole/gpg.py b/memoryhole/gpg.py
index 3f1c59a..1ff3ac8 100644
--- a/memoryhole/gpg.py
+++ b/memoryhole/gpg.py
@@ -9,8 +9,13 @@ class Gnupg(object):
from gnupg import GPG
self.gpg = GPG()
- def encrypt(self, data, encraddr, signaddr):
- result = self.gpg.encrypt(data, *encraddr, default_key=signaddr)
+ def encrypt(self, data, encraddr):
+ result = self.gpg.encrypt(data, *encraddr)
+ self._check_gpg_error(result)
+ return result.data
+
+ def sign(self, data):
+ result = self.gpg.sign(data)
self._check_gpg_error(result)
return result.data
@@ -23,4 +28,4 @@ class Gnupg(object):
def _check_gpg_error(self, result):
stderr = getattr(result, 'stderr', '')
if getattr(result, 'ok', False) is not True:
- raise RuntimeError('Failed to encrypt/decrypt: %s' % stderr)
+ raise RuntimeError('Failed gnupg operation: %s' % stderr)
diff --git a/memoryhole/openpgp.py b/memoryhole/openpgp.py
index 3cefc3c..3318704 100644
--- a/memoryhole/openpgp.py
+++ b/memoryhole/openpgp.py
@@ -2,7 +2,7 @@ from zope.interface import Interface
class IOpenPGP(Interface):
- def encrypt(data, encraddr, singaddr):
+ def encrypt(data, encraddr):
"""
Encrypt and sign data.
@@ -18,6 +18,18 @@ class IOpenPGP(Interface):
"""
pass
+ def sign(data):
+ """
+ Sign data.
+
+ :param data: data to be encrypted
+ :type data: str
+
+ :return: signature
+ :rtype: str
+ """
+ pass
+
def decrypt(data):
"""
Decrypt and verify data.
diff --git a/memoryhole/protection.py b/memoryhole/protection.py
index d1b372b..08d5fb0 100644
--- a/memoryhole/protection.py
+++ b/memoryhole/protection.py
@@ -1,8 +1,17 @@
+import re
+
+try:
+ from StringIO import StringIO
+except ImportError:
+ from io import StringIO
from email.mime.application import MIMEApplication
-from email.utils import getaddresses, parseaddr
+from email.utils import getaddresses
from memoryhole.gpg import Gnupg
-from memoryhole.rfc3156 import PGPEncrypted, MultipartEncrypted
+from memoryhole.rfc3156 import (
+ PGPEncrypted, MultipartEncrypted, RFC3156CompliantGenerator,
+ MultipartSigned, PGPSignature, encode_base64_rec
+)
class ProtectConfig(object):
@@ -47,20 +56,18 @@ def protect(msg, encrypt=True, config=None):
if encrypt:
return _encrypt_mime(msg, config)
- raise NotImplementedError()
+ return _sign_mime(msg, config)
def _encrypt_mime(msg, config):
encraddr = _recipient_addresses(msg)
- signaddr = _from_address(msg)
newmsg = MultipartEncrypted('application/pgp-encrypted')
for hkey, hval in msg.items():
newmsg.add_header(hkey, hval)
del(msg[hkey])
- encstr = config.openpgp.encrypt(msg.as_string(unixfrom=False),
- encraddr, signaddr)
+ encstr = config.openpgp.encrypt(msg.as_string(unixfrom=False), encraddr)
encmsg = MIMEApplication(
encstr, _subtype='octet-stream', _encoder=lambda x: x)
encmsg.add_header('content-disposition', 'attachment',
@@ -75,13 +82,33 @@ def _encrypt_mime(msg, config):
return newmsg
+def _sign_mime(msg, config):
+ newmsg = MultipartSigned('application/pgp-signature', 'pgp-sha512')
+ for hkey, hval in msg.items():
+ newmsg.add_header(hkey, hval)
+ del(msg[hkey])
+
+ # apply base64 content-transfer-encoding
+ encode_base64_rec(msg)
+ # get message text with headers and replace \n for \r\n
+ fp = StringIO()
+ g = RFC3156CompliantGenerator(fp, mangle_from_=False, maxheaderlen=76)
+ g.flatten(msg)
+ msgtext = re.sub('\r?\n', '\r\n', fp.getvalue())
+ # make sure signed message ends with \r\n as per OpenPGP stantard.
+ if msg.is_multipart() and not msgtext.endswith("\r\n"):
+ msgtext += "\r\n"
+
+ signature = config.openpgp.sign(msgtext)
+ sigmsg = PGPSignature(signature)
+ # attach original message and signature to new message
+ newmsg.attach(msg)
+ newmsg.attach(sigmsg)
+ return newmsg
+
+
def _recipient_addresses(msg):
recipients = []
for header in ('to', 'cc', 'bcc'):
recipients += msg.get_all(header, [])
return [r[1] for r in getaddresses(recipients)]
-
-
-def _from_address(msg):
- frm = msg.get_all('From', [])
- return parseaddr(frm)[1]
diff --git a/tests/test_protection.py b/tests/test_protection.py
index 1220639..980a5ff 100644
--- a/tests/test_protection.py
+++ b/tests/test_protection.py
@@ -1,4 +1,6 @@
+import six
import unittest
+from base64 import b64encode
from email.parser import Parser
from zope.interface import implementer
@@ -23,7 +25,7 @@ Subject: %(subject)s
class ProtectTest(unittest.TestCase):
- def test_pgp_mime(self):
+ def test_pgp_encrypted_mime(self):
p = Parser()
msg = p.parsestr(EMAIL)
encrypter = Encrypter()
@@ -33,7 +35,6 @@ class ProtectTest(unittest.TestCase):
self.assertEqual(encmsg.get_payload(1).get_payload(), encrypter.encstr)
self.assertEqual(BODY, encrypter.data[1:-1]) # remove '\n'
self.assertEqual([TO], encrypter.encraddr)
- self.assertEqual(FROM, encrypter.singaddr)
self.assertEqual(encmsg.get_content_type(), "multipart/encrypted")
def test_unobscured_headers(self):
@@ -47,17 +48,51 @@ class ProtectTest(unittest.TestCase):
self.assertEqual(encmsg['to'], TO)
self.assertEqual(encmsg['subject'], SUBJECT)
+ def test_pgp_signed_mime(self):
+ p = Parser()
+ msg = p.parsestr(EMAIL)
+ signer = Signer()
+ conf = ProtectConfig(openpgp=signer)
+ encmsg = protect(msg, encrypt=False, config=conf)
+
+ b64body = b64encode(six.b(BODY+'\n'))
+ self.assertEqual(six.b(encmsg.get_payload(0).get_payload()), b64body)
+ self.assertEqual(encmsg.get_payload(1).get_payload(), signer.signature)
+ self.assertEqual(
+ six.b("Content-Transfer-Encoding: base64\r\n\r\n")+b64body,
+ six.b(signer.data))
+ self.assertEqual(encmsg.get_content_type(), "multipart/signed")
+
+ def test_signed_headers(self):
+ p = Parser()
+ msg = p.parsestr(EMAIL)
+ signer = Signer()
+ conf = ProtectConfig(openpgp=signer)
+ encmsg = protect(msg, encrypt=False, config=conf)
+
+ self.assertEqual(encmsg['from'], FROM)
+ self.assertEqual(encmsg['to'], TO)
+ self.assertEqual(encmsg['subject'], SUBJECT)
+
@implementer(IOpenPGP)
class Encrypter(object):
encstr = "this is encrypted"
- def encrypt(self, data, encraddr, singaddr):
+ def encrypt(self, data, encraddr):
self.data = data
self.encraddr = encraddr
- self.singaddr = singaddr
return self.encstr
+@implementer(IOpenPGP)
+class Signer(object):
+ signature = "this is a signature"
+
+ def sign(self, data):
+ self.data = data
+ return self.signature
+
+
if __name__ == "__main__":
unittest.main()