diff options
Diffstat (limited to 'memoryhole/memoryhole.py')
-rw-r--r-- | memoryhole/memoryhole.py | 258 |
1 files changed, 258 insertions, 0 deletions
diff --git a/memoryhole/memoryhole.py b/memoryhole/memoryhole.py new file mode 100644 index 0000000..84d607c --- /dev/null +++ b/memoryhole/memoryhole.py @@ -0,0 +1,258 @@ +import logging +import tempfile + +import email +from email.message import Message +from email.header import Header +from email.MIMEMultipart import MIMEMultipart +from email.parser import Parser +from StringIO import StringIO + +import gnupg + +logger = logging.getLogger(__name__) + +COPY_HEADERS = ('Subject', 'Message-ID', 'Date', 'To', 'From') + + +class ProtectionLevel(object): + + def __init__(self, signed_by, encrypted_by): + + self.signed_by = signed_by + self.encrypted_by = encrypted_by + + # TODO add __cmp__ + + @property + def score(self): + if self.signed_by and self.encrypted_by: + return 3 + elif self.signed_by: + return 2 + elif self.encrypted_by: + return 1 + else: + return 0 + + def __repr__(self): + return '<ProtectionLevel: sig(%s) encr(%s) score:%s>' % ( + len(self.signed_by), len(self.encrypted_by), self.score) + + +class MemoryHoleHeader(Header): + + def __init__(self, name, value): + self._name = name + self._value = value + + self._h = Header(value, header_name=name) + + self.signed_by = set([]) + self.encrypted_by = set([]) + + self._firstlinelen = self._h._firstlinelen + self._chunks = self._h._chunks + self._continuation_ws = self._h._continuation_ws + self._charset = self._h._charset + + @property + def protection_level(self): + return ProtectionLevel(self.signed_by, self.encrypted_by) + + def __repr__(self): + return '<MemoryHoleHeader(%s) [%s: %s]>' % ( + self.protection_level.score, self._name, self._value) + + +class MemoryHoleMessage(Message): + + def __init__(self, msg, gpg): + self._msg = msg + self._gpg = gpg + + verified = verify_signature(msg, gpg) + self.signed = verified.valid + + self._mh_headers = {} + inner_headers = extract_wrapped_headers(msg) + + for name, value in inner_headers.items(): + mhh = MemoryHoleHeader(name, value) + mhh.signed_by.add(verified.key_id) + self._mh_headers[name] = mhh + + self._charset = self._msg._charset + self._headers = self._msg._headers + self._payload = self._msg._payload + self.preamble = self._msg.preamble + + def get_protected_header(self, header_name): + return self._mh_headers.get(header_name) + + # TODO add is_tampered_header + # TODO add __getattr__, lookup the protected headers first + + +def extract_wrapped_headers(msg): + top_payload = msg.get_payload()[0] + if top_payload.get_content_type() == 'multipart/mixed': + headers = _extract_attached_headers(top_payload) + return headers + + +def verify_signature(msg, gpg): + if not msg.get_content_type() == 'multipart/signed': + logger.error('attempted to verify signature, but msg is not signed') + return False + payloads = msg.get_payload() + body = payloads[0] + attached_sig = payloads[1] + + if not attached_sig.get_content_type() == 'application/pgp-signature': + logger.error('expected an attached signature') + return False + sig = _extract_sig_block(attached_sig.as_string()) + fd, sig_fname = tempfile.mkstemp() + with open(sig_fname, 'w') as sigfile: + sigfile.write(sig) + + body = body.as_string().replace('\n', '\r\n') + verified = gpg.verify_file(StringIO(body), sig_file=sig_fname) + return verified + + +def protect_message(basemsg, gpg, do_sign=True, boundary=None, + sign_digest_algo='SHA512', passphrase=None): + + msg = email.message.Message() + _msg = MIMEMultipart( + _subtype="signed", micalg="pgp-%s" % sign_digest_algo.lower(), + protocol="application/pgp-signature") + + for hdr in COPY_HEADERS: + msg[hdr] = basemsg[hdr] + for hdr in _msg.keys(): + msg[hdr] = _msg[hdr] + if boundary: + msg.set_boundary(boundary()) + + wrapper = _wrap_with_header(basemsg, boundary=boundary) + msg.attach(wrapper) + + if do_sign: + payload = wrapper.as_string().replace('\n', '\r\n') + sig = gpg.sign( + payload, detach=True, + digest_algo=sign_digest_algo, clearsign=False, + passphrase=passphrase) + + signed_msg = _make_signature_subpart(sig) + msg.attach(signed_msg) + return msg + + +def _extract_sig_block(text): + sep = '\n' + BEGIN = '-----BEGIN PGP SIGNATURE-----' + sig_lines = text.split(sep) + return sep.join(sig_lines[sig_lines.index(BEGIN):]) + + +def _extract_attached_headers(msg): + + def split_header(line): + sep = ': ' + split = line.split(sep) + return (split[0], sep.join(split[1:])) + + first = msg.get_payload()[0] + if first.get_content_type() == 'text/rfc822-headers': + raw = first.get_payload() + headers = dict([ + split_header(line) for line in filter(None, raw.split('\n'))]) + return headers + + +def _build_embedded_header(msg): + r = '' + for x in COPY_HEADERS: + if msg.get(x): + r += x + ': ' + msg.get(x) + '\n' + return r + + +def _boundary_factory(start): + counter = {'value': ord(start)} + + def _gen_boundary(): + boundary = chr(counter['value']) * 12 + counter['value'] -= 1 + return boundary + return _gen_boundary + + + +def _wrap_with_header(msg, boundary=None): + body = email.message.Message() + body.set_payload(msg.get_payload()) + body.add_header('Content-Type', msg['Content-Type']) + + emh = email.message.Message() + emh.set_type('text/rfc822-headers') + emh.add_header('Content-Disposition', 'attachment') + emh.set_payload(_build_embedded_header(msg)) + del emh['MIME-Version'] + + wrapper = email.message.Message() + wrapper.set_type('multipart/mixed') + if boundary: + wrapper.set_boundary(boundary()) + + wrapper.set_payload([emh, body]) + del wrapper['MIME-Version'] + return wrapper + + +def _make_signature_subpart(signature): + message = Message() + message['Content-Type'] = 'application/pgp-signature; name="signature.asc"' + message.set_payload(str(signature)) + return message + +if __name__ == "__main__": + import os + import sys + import logging + if os.environ.get('DEBUG'): + logging.basicConfig(level=logging.DEBUG) + if len(sys.argv) != 3: + print "Usage: memoryhole mail_path key_path" + sys.exit() + msg_path = sys.argv[1] + key_path = sys.argv[2] + parser = Parser() + gpg = gnupg.GPG( + binary='/usr/bin/gpg', + homedir='/tmp/memoryhole-tests/', use_agent=False) + + with open('tests/corpus/OpenPGP/' + key_path + '.key') as key_f: + key_data = key_f.read() + gpg.import_keys(key_data) + with open('tests/corpus/OpenPGP/' + key_path + '.pgp') as key_f: + key_data = key_f.read() + gpg.import_keys(key_data) + + #if not gpg.list_keys(): + #key_input = gpg.gen_key_input(key_length=1024, key_type='RSA') + #gpg.gen_key(key_input) + + with open(msg_path) as f: + basetext = f.read() + basemsg = parser.parsestr(basetext) + + boundary = _boundary_factory('c') + msg = protect_message( + basemsg, gpg, boundary=boundary, + sign_digest_algo='SHA256', passphrase='_' + key_path + '_') + print msg.as_string() |