From 7396398e81fda41d4cb2ea7ffbdfa8f02746cad8 Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Thu, 28 Apr 2016 00:12:23 -0300 Subject: [feat] simple pgp/mime creation --- memoryhole/__init__.py | 14 ++ memoryhole/gpg.py | 22 +++ memoryhole/openpgp.py | 12 ++ memoryhole/protection.py | 35 +++++ memoryhole/rfc3156.py | 393 +++++++++++++++++++++++++++++++++++++++++++++++ requirements.pip | 1 + tests/test_protection.py | 48 ++++++ 7 files changed, 525 insertions(+) create mode 100644 memoryhole/gpg.py create mode 100644 memoryhole/openpgp.py create mode 100644 memoryhole/protection.py create mode 100644 memoryhole/rfc3156.py create mode 100644 tests/test_protection.py diff --git a/memoryhole/__init__.py b/memoryhole/__init__.py index e69de29..5cd55b6 100644 --- a/memoryhole/__init__.py +++ b/memoryhole/__init__.py @@ -0,0 +1,14 @@ +from memoryhole.protection import protect +from memoryhole.openpgp import OpenPGP +from memoryhole.gpg import Gnupg + + +PROTECTED_HEADERS = ('Subject', 'Message-ID', 'Date', 'To', 'From') +OBSCURED_HEADERS = ('Subject', 'Message-ID', 'Date', 'To', 'From') + + +def unwrap(msg, opengp=Gnupg()): + raise NotImplementedError() + + +__all__ = ["protect", "unwrap", "OpenPGP"] diff --git a/memoryhole/gpg.py b/memoryhole/gpg.py new file mode 100644 index 0000000..fcead9e --- /dev/null +++ b/memoryhole/gpg.py @@ -0,0 +1,22 @@ +from gnupg import GPG +from zope.interface import implementer + +from memoryhole.openpgp import OpenPGP + + +@implementer(OpenPGP) +class Gnupg(object): + def __init__(self): + self.gpg = GPG() + + def encrypt(self, data, encraddr, singaddr): + # TODO + encfp = 0 + signfp = 0 + return self.gpg.encrypt(data, encfp, default_key=signfp) + + def decrypt(self, data): + pass + + def verify(self, data, signature): + pass diff --git a/memoryhole/openpgp.py b/memoryhole/openpgp.py new file mode 100644 index 0000000..5f94861 --- /dev/null +++ b/memoryhole/openpgp.py @@ -0,0 +1,12 @@ +from zope.interface import Interface + + +class OpenPGP(Interface): + def encrypt(data, encraddr, singaddr): + pass + + def decrypt(data): + pass + + def verify(data, signature): + pass diff --git a/memoryhole/protection.py b/memoryhole/protection.py new file mode 100644 index 0000000..900b29a --- /dev/null +++ b/memoryhole/protection.py @@ -0,0 +1,35 @@ +from email.mime.application import MIMEApplication + +from memoryhole.gpg import Gnupg +from memoryhole.rfc3156 import PGPEncrypted, MultipartEncrypted + + +def protect(msg, openpgp=Gnupg(), encrypt=True, obscure=True): + if encrypt: + return _encrypt_mime(msg, openpgp) + + raise NotImplementedError() + + +def _encrypt_mime(msg, openpgp): + newmsg = MultipartEncrypted('application/pgp-encrypted') + for hkey, hval in msg.items(): + newmsg.add_header(hkey, hval) + del(msg[hkey]) + + encraddr = "" # TODO + signaddr = "" # TODO + encstr = openpgp.encrypt(msg.as_string(unixfrom=False), + encraddr, signaddr) + encmsg = MIMEApplication( + encstr, _subtype='octet-stream', _encoder=lambda x: x) + encmsg.add_header('content-disposition', 'attachment', + filename='msg.asc') + + # create meta message + metamsg = PGPEncrypted() + metamsg.add_header('Content-Disposition', 'attachment') + # attach pgp message parts to new message + newmsg.attach(metamsg) + newmsg.attach(encmsg) + return newmsg diff --git a/memoryhole/rfc3156.py b/memoryhole/rfc3156.py new file mode 100644 index 0000000..43f7023 --- /dev/null +++ b/memoryhole/rfc3156.py @@ -0,0 +1,393 @@ +# -*- coding: utf-8 -*- +# rfc3156.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Implements RFC 3156: MIME Security with OpenPGP. +""" + +import base64 +import logging +try: + from StringIO import StringIO +except ImportError: + from io import StringIO + +from email.mime.application import MIMEApplication +from email.mime.multipart import MIMEMultipart +from email import errors +from email.generator import ( + Generator, + fcre, + NL, + _make_boundary, +) + + +# +# A generator that solves http://bugs.python.org/issue14983 +# + +class RFC3156CompliantGenerator(Generator): + """ + An email generator that addresses Python's issue #14983 for multipart + messages. + + This is just a copy of email.generator.Generator which fixes the following + bug: http://bugs.python.org/issue14983 + """ + + def _handle_multipart(self, msg): + """ + A multipart handling implementation that addresses issue #14983. + + This is just a copy of the parent's method which fixes the following + bug: http://bugs.python.org/issue14983 (see the line marked with + "(***)"). + + :param msg: The multipart message to be handled. + :type msg: email.message.Message + """ + # The trick here is to write out each part separately, merge them all + # together, and then make sure that the boundary we've chosen isn't + # present in the payload. + msgtexts = [] + subparts = msg.get_payload() + if subparts is None: + subparts = [] + elif isinstance(subparts, basestring): + # e.g. a non-strict parse of a message with no starting boundary. + self._fp.write(subparts) + return + elif not isinstance(subparts, list): + # Scalar payload + subparts = [subparts] + for part in subparts: + s = StringIO() + g = self.clone(s) + g.flatten(part, unixfrom=False) + msgtexts.append(s.getvalue()) + # BAW: What about boundaries that are wrapped in double-quotes? + boundary = msg.get_boundary() + if not boundary: + # Create a boundary that doesn't appear in any of the + # message texts. + alltext = NL.join(msgtexts) + boundary = _make_boundary(alltext) + msg.set_boundary(boundary) + # If there's a preamble, write it out, with a trailing CRLF + if msg.preamble is not None: + preamble = msg.preamble + if self._mangle_from_: + preamble = fcre.sub('>From ', msg.preamble) + self._fp.write(preamble + '\n') + # dash-boundary transport-padding CRLF + self._fp.write('--' + boundary + '\n') + # body-part + if msgtexts: + self._fp.write(msgtexts.pop(0)) + # *encapsulation + # --> delimiter transport-padding + # --> CRLF body-part + for body_part in msgtexts: + # delimiter transport-padding CRLF + self._fp.write('\n--' + boundary + '\n') + # body-part + self._fp.write(body_part) + # close-delimiter transport-padding + self._fp.write('\n--' + boundary + '--' + '\n') # (***) Solve #14983 + if msg.epilogue is not None: + self._fp.write('\n') + epilogue = msg.epilogue + if self._mangle_from_: + epilogue = fcre.sub('>From ', msg.epilogue) + self._fp.write(epilogue) + + +# +# Base64 encoding: these are almost the same as python's email.encoder +# solution, but a bit modified. +# + +def _bencode(s): + """ + Encode C{s} in base64. + + :param s: The string to be encoded. + :type s: str + """ + # We can't quite use base64.encodestring() since it tacks on a "courtesy + # newline". Blech! + if not s: + return s + value = base64.encodestring(s) + return value[:-1] + + +def encode_base64(msg): + """ + Encode a non-multipart message's payload in Base64 (in place). + + This method modifies the message contents in place and adds or replaces an + appropriate Content-Transfer-Encoding header. + + :param msg: The non-multipart message to be encoded. + :type msg: email.message.Message + """ + encoding = msg.get('Content-Transfer-Encoding', None) + if encoding is not None: + encoding = encoding.lower() + # XXX Python's email module can only decode quoted-printable, base64 and + # uuencoded data, so we might have to implement other decoding schemes in + # order to support RFC 3156 properly and correctly calculate signatures + # for multipart attachments (eg. 7bit or 8bit encoded attachments). For + # now, if content is already encoded as base64 or if it is encoded with + # some unknown encoding, we just pass. + if encoding in [None, 'quoted-printable', 'x-uuencode', 'uue', 'x-uue']: + orig = msg.get_payload(decode=True) + encdata = _bencode(orig) + msg.set_payload(encdata) + # replace or set the Content-Transfer-Encoding header. + try: + msg.replace_header('Content-Transfer-Encoding', 'base64') + except KeyError: + msg['Content-Transfer-Encoding'] = 'base64' + elif encoding is not 'base64': + logging.err('Unknown content-transfer-encoding: %s' % encoding) + + +def encode_base64_rec(msg): + """ + Encode (possibly multipart) messages in base64 (in place). + + This method modifies the message contents in place. + + :param msg: The non-multipart message to be encoded. + :type msg: email.message.Message + """ + if not msg.is_multipart(): + encode_base64(msg) + else: + for sub in msg.get_payload(): + encode_base64_rec(sub) + + +# +# RFC 1847: multipart/signed and multipart/encrypted +# + +class MultipartSigned(MIMEMultipart): + """ + Multipart/Signed MIME message according to RFC 1847. + + 2.1. Definition of Multipart/Signed + + (1) MIME type name: multipart + (2) MIME subtype name: signed + (3) Required parameters: boundary, protocol, and micalg + (4) Optional parameters: none + (5) Security considerations: Must be treated as opaque while in + transit + + The multipart/signed content type contains exactly two body parts. + The first body part is the body part over which the digital signature + was created, including its MIME headers. The second body part + contains the control information necessary to verify the digital + signature. The first body part may contain any valid MIME content + type, labeled accordingly. The second body part is labeled according + to the value of the protocol parameter. + + When the OpenPGP digital signature is generated: + + (1) The data to be signed MUST first be converted to its content- + type specific canonical form. For text/plain, this means + conversion to an appropriate character set and conversion of + line endings to the canonical sequence. + + (2) An appropriate Content-Transfer-Encoding is then applied; see + section 3. In particular, line endings in the encoded data + MUST use the canonical sequence where appropriate + (note that the canonical line ending may or may not be present + on the last line of encoded data and MUST NOT be included in + the signature if absent). + + (3) MIME content headers are then added to the body, each ending + with the canonical sequence. + + (4) As described in section 3 of this document, any trailing + whitespace MUST then be removed from the signed material. + + (5) As described in [2], the digital signature MUST be calculated + over both the data to be signed and its set of content headers. + + (6) The signature MUST be generated detached from the signed data + so that the process does not alter the signed data in any way. + """ + + def __init__(self, protocol, micalg, boundary=None, _subparts=None): + """ + Initialize the multipart/signed message. + + :param boundary: the multipart boundary string. By default it is + calculated as needed. + :type boundary: str + :param _subparts: a sequence of initial subparts for the payload. It + must be an iterable object, such as a list. You can always + attach new subparts to the message by using the attach() method. + :type _subparts: iterable + """ + MIMEMultipart.__init__( + self, _subtype='signed', boundary=boundary, + _subparts=_subparts) + self.set_param('protocol', protocol) + self.set_param('micalg', micalg) + + def attach(self, payload): + """ + Add the C{payload} to the current payload list. + + Also prevent from adding payloads with wrong Content-Type and from + exceeding a maximum of 2 payloads. + + :param payload: The payload to be attached. + :type payload: email.message.Message + """ + # second payload's content type must be equal to the protocol + # parameter given on object creation + if len(self.get_payload()) == 1: + if payload.get_content_type() != self.get_param('protocol'): + raise errors.MultipartConversionError( + 'Wrong content type %s.' % payload.get_content_type) + # prevent from adding more payloads + if len(self._payload) == 2: + raise errors.MultipartConversionError( + 'Cannot have more than two subparts.') + MIMEMultipart.attach(self, payload) + + +class MultipartEncrypted(MIMEMultipart): + """ + Multipart/encrypted MIME message according to RFC 1847. + + 2.2. Definition of Multipart/Encrypted + + (1) MIME type name: multipart + (2) MIME subtype name: encrypted + (3) Required parameters: boundary, protocol + (4) Optional parameters: none + (5) Security considerations: none + + The multipart/encrypted content type contains exactly two body parts. + The first body part contains the control information necessary to + decrypt the data in the second body part and is labeled according to + the value of the protocol parameter. The second body part contains + the data which was encrypted and is always labeled + application/octet-stream. + """ + + def __init__(self, protocol, boundary=None, _subparts=None): + """ + :param protocol: The encryption protocol to be added as a parameter to + the Content-Type header. + :type protocol: str + :param boundary: the multipart boundary string. By default it is + calculated as needed. + :type boundary: str + :param _subparts: a sequence of initial subparts for the payload. It + must be an iterable object, such as a list. You can always + attach new subparts to the message by using the attach() method. + :type _subparts: iterable + """ + MIMEMultipart.__init__( + self, _subtype='encrypted', boundary=boundary, + _subparts=_subparts) + self.set_param('protocol', protocol) + + def attach(self, payload): + """ + Add the C{payload} to the current payload list. + + Also prevent from adding payloads with wrong Content-Type and from + exceeding a maximum of 2 payloads. + + :param payload: The payload to be attached. + :type payload: email.message.Message + """ + # first payload's content type must be equal to the protocol parameter + # given on object creation + if len(self._payload) == 0: + if payload.get_content_type() != self.get_param('protocol'): + raise errors.MultipartConversionError( + 'Wrong content type.') + # second payload is always application/octet-stream + if len(self._payload) == 1: + if payload.get_content_type() != 'application/octet-stream': + raise errors.MultipartConversionError( + 'Wrong content type %s.' % payload.get_content_type) + # prevent from adding more payloads + if len(self._payload) == 2: + raise errors.MultipartConversionError( + 'Cannot have more than two subparts.') + MIMEMultipart.attach(self, payload) + + +# +# RFC 3156: application/pgp-encrypted, application/pgp-signed and +# application-pgp-signature. +# + +class PGPEncrypted(MIMEApplication): + """ + Application/pgp-encrypted MIME media type according to RFC 3156. + + * MIME media type name: application + * MIME subtype name: pgp-encrypted + * Required parameters: none + * Optional parameters: none + """ + + def __init__(self, version=1): + data = "Version: %d" % version + MIMEApplication.__init__(self, data, 'pgp-encrypted') + + +class PGPSignature(MIMEApplication): + """ + Application/pgp-signature MIME media type according to RFC 3156. + + * MIME media type name: application + * MIME subtype name: pgp-signature + * Required parameters: none + * Optional parameters: none + """ + def __init__(self, _data, name='signature.asc'): + MIMEApplication.__init__(self, _data, 'pgp-signature', + _encoder=lambda x: x, name=name) + self.add_header('Content-Description', 'OpenPGP Digital Signature') + + +class PGPKeys(MIMEApplication): + """ + Application/pgp-keys MIME media type according to RFC 3156. + + * MIME media type name: application + * MIME subtype name: pgp-keys + * Required parameters: none + * Optional parameters: none + """ + + def __init__(self, _data): + MIMEApplication.__init__(self, _data, 'pgp-keys') diff --git a/requirements.pip b/requirements.pip index e3b45e5..1cafd46 100644 --- a/requirements.pip +++ b/requirements.pip @@ -1,2 +1,3 @@ # use peep! +zope.interface gnupg diff --git a/tests/test_protection.py b/tests/test_protection.py new file mode 100644 index 0000000..f83d3f7 --- /dev/null +++ b/tests/test_protection.py @@ -0,0 +1,48 @@ +import base64 +import unittest +from email.parser import Parser +from zope.interface import implementer + +from memoryhole import protect, OpenPGP + + +FROM = "me@domain.com" +TO = "you@other.com" +SUBJECT = "some subject" +BODY = "body text" +EMAIL = """From: %(from)s +To: %(to)s +Subject: %(subject)s + +%(body)s +""" % { + "from": FROM, + "to": TO, + "subject": SUBJECT, + "body": BODY +} + + +class ProtectTest(unittest.TestCase): + def test_pgp_mime(self): + p = Parser() + msg = p.parsestr(EMAIL) + encrypter = Encrypter() + encmsg = protect(msg, encrypter) + + self.assertEqual(encmsg.get_payload(1).get_payload(), encrypter.encstr) + self.assertEqual(BODY, encrypter.data[1:-1]) # remove '\n' + self.assertEqual(encmsg.get_content_type(), "multipart/encrypted") + + +@implementer(OpenPGP) +class Encrypter(object): + encstr = "this is encrypted" + + def encrypt(self, data, encraddr, singaddr): + self.data = data + return self.encstr + + +if __name__ == "__main__": + unittest.main() -- cgit v1.2.3