diff options
| author | Ruben Pollan <meskio@sindominio.net> | 2016-07-06 20:35:29 +0200 | 
|---|---|---|
| committer | Ruben Pollan <meskio@sindominio.net> | 2016-07-06 20:59:38 +0200 | 
| commit | ae9c4a0c3de7eebbcef3c145af2a3eaa86376e44 (patch) | |
| tree | bcae93754099dacbddc948d19952b109eb9d3fa2 | |
| parent | 794911534ce8121acd7dba89cc53c20fe088e5bd (diff) | |
[feat] protect signing headers
| -rw-r--r-- | memoryhole/protection.py | 45 | ||||
| -rw-r--r-- | tests/test_protection.py | 24 | 
2 files changed, 47 insertions, 22 deletions
| diff --git a/memoryhole/protection.py b/memoryhole/protection.py index 08d5fb0..2f2d704 100644 --- a/memoryhole/protection.py +++ b/memoryhole/protection.py @@ -6,6 +6,7 @@ except ImportError:          from io import StringIO  from email.mime.application import MIMEApplication  from email.utils import getaddresses +from copy import deepcopy  from memoryhole.gpg import Gnupg  from memoryhole.rfc3156 import ( @@ -16,8 +17,8 @@ from memoryhole.rfc3156 import (  class ProtectConfig(object): -    PROTECTED_HEADERS = ('Subject', 'Message-ID', 'Date', 'To', 'From') -    OBSCURED_HEADERS = ('Subject', 'Message-ID', 'Date', 'To', 'From') +    PROTECTED_HEADERS = ('subject', 'message-id', 'date', 'to', 'from') +    OBSCURED_HEADERS = ('subject', 'message-id', 'date', 'to', 'from')      def __init__(self, openpgp=None, protected_headers=PROTECTED_HEADERS,                   obscured_headers=OBSCURED_HEADERS): @@ -36,6 +37,9 @@ class ProtectConfig(object):              openpgp = Gnupg()          self.openpgp = openpgp +        self.protected_headers = protected_headers +        self.obscured_headers = obscured_headers +  def protect(msg, encrypt=True, config=None):      """ @@ -62,12 +66,10 @@ def protect(msg, encrypt=True, config=None):  def _encrypt_mime(msg, config):      encraddr = _recipient_addresses(msg) -    newmsg = MultipartEncrypted('application/pgp-encrypted') -    for hkey, hval in msg.items(): -        newmsg.add_header(hkey, hval) -        del(msg[hkey]) +    newmsg, part = _fix_headers( +        msg, MultipartEncrypted('application/pgp-encrypted'), config) -    encstr = config.openpgp.encrypt(msg.as_string(unixfrom=False), encraddr) +    encstr = config.openpgp.encrypt(part.as_string(unixfrom=False), encraddr)      encmsg = MIMEApplication(          encstr, _subtype='octet-stream', _encoder=lambda x: x)      encmsg.add_header('content-disposition', 'attachment', @@ -83,17 +85,16 @@ def _encrypt_mime(msg, config):  def _sign_mime(msg, config): -    newmsg = MultipartSigned('application/pgp-signature', 'pgp-sha512') -    for hkey, hval in msg.items(): -        newmsg.add_header(hkey, hval) -        del(msg[hkey]) +    newmsg, part = _fix_headers( +        msg, MultipartSigned('application/pgp-signature', 'pgp-sha512'), +        config)      # apply base64 content-transfer-encoding -    encode_base64_rec(msg) +    encode_base64_rec(part)      # get message text with headers and replace \n for \r\n      fp = StringIO()      g = RFC3156CompliantGenerator(fp, mangle_from_=False, maxheaderlen=76) -    g.flatten(msg) +    g.flatten(part)      msgtext = re.sub('\r?\n', '\r\n', fp.getvalue())      # make sure signed message ends with \r\n as per OpenPGP stantard.      if msg.is_multipart() and not msgtext.endswith("\r\n"): @@ -101,12 +102,28 @@ def _sign_mime(msg, config):      signature = config.openpgp.sign(msgtext)      sigmsg = PGPSignature(signature) +      # attach original message and signature to new message -    newmsg.attach(msg) +    newmsg.attach(part)      newmsg.attach(sigmsg)      return newmsg +def _fix_headers(oldmsg, newmsg, config): +    part = deepcopy(oldmsg) +    for hkey, hval in part.items(): +        newmsg.add_header(hkey, hval) +        del(part[hkey]) +    _protect_headers(newmsg, part, config.protected_headers) +    return newmsg, part + + +def _protect_headers(orig, dest, headers): +    for header in headers: +        if header in orig: +            dest.add_header(header, orig[header]) + +  def _recipient_addresses(msg):      recipients = []      for header in ('to', 'cc', 'bcc'): diff --git a/tests/test_protection.py b/tests/test_protection.py index 980a5ff..2475cb7 100644 --- a/tests/test_protection.py +++ b/tests/test_protection.py @@ -33,7 +33,7 @@ class ProtectTest(unittest.TestCase):          encmsg = protect(msg, config=conf)          self.assertEqual(encmsg.get_payload(1).get_payload(), encrypter.encstr) -        self.assertEqual(BODY, encrypter.data[1:-1])  # remove '\n' +        self._assert_body(encrypter.data, BODY+'\n')          self.assertEqual([TO], encrypter.encraddr)          self.assertEqual(encmsg.get_content_type(), "multipart/encrypted") @@ -58,9 +58,7 @@ class ProtectTest(unittest.TestCase):          b64body = b64encode(six.b(BODY+'\n'))          self.assertEqual(six.b(encmsg.get_payload(0).get_payload()), b64body)          self.assertEqual(encmsg.get_payload(1).get_payload(), signer.signature) -        self.assertEqual( -            six.b("Content-Transfer-Encoding: base64\r\n\r\n")+b64body, -            six.b(signer.data)) +        self._assert_body(signer.data, b64body.decode('utf-8'))          self.assertEqual(encmsg.get_content_type(), "multipart/signed")      def test_signed_headers(self): @@ -68,11 +66,21 @@ class ProtectTest(unittest.TestCase):          msg = p.parsestr(EMAIL)          signer = Signer()          conf = ProtectConfig(openpgp=signer) -        encmsg = protect(msg, encrypt=False, config=conf) +        signmsg = protect(msg, encrypt=False, config=conf) -        self.assertEqual(encmsg['from'], FROM) -        self.assertEqual(encmsg['to'], TO) -        self.assertEqual(encmsg['subject'], SUBJECT) +        self.assertEqual(signmsg['from'], FROM) +        self.assertEqual(signmsg['to'], TO) +        self.assertEqual(signmsg['subject'], SUBJECT) + +        signedpart = signmsg.get_payload(0) +        self.assertEqual(signedpart['from'], FROM) +        self.assertEqual(signedpart['to'], TO) +        self.assertEqual(signedpart['subject'], SUBJECT) + +    def _assert_body(self, data, body): +        p = Parser() +        msg = p.parsestr(data) +        self.assertEqual(msg.get_payload(), body)  @implementer(IOpenPGP) | 
