diff options
Diffstat (limited to 'src/leap/mx/vendor/pgpy/pgp.py')
-rw-r--r-- | src/leap/mx/vendor/pgpy/pgp.py | 2527 |
1 files changed, 2527 insertions, 0 deletions
diff --git a/src/leap/mx/vendor/pgpy/pgp.py b/src/leap/mx/vendor/pgpy/pgp.py new file mode 100644 index 0000000..82b3a0b --- /dev/null +++ b/src/leap/mx/vendor/pgpy/pgp.py @@ -0,0 +1,2527 @@ +""" pgp.py + +this is where the armorable PGP block objects live +""" +import binascii +import calendar +import collections +import contextlib +import copy +import functools +import itertools +import operator +import os +import re +import warnings +import weakref + +import six + +from datetime import datetime + +from cryptography.hazmat.primitives import hashes + +from .constants import CompressionAlgorithm +from .constants import Features +from .constants import HashAlgorithm +from .constants import ImageEncoding +from .constants import KeyFlags +from .constants import NotationDataFlags +from .constants import PacketTag +from .constants import PubKeyAlgorithm +from .constants import RevocationKeyClass +from .constants import RevocationReason +from .constants import SignatureType +from .constants import SymmetricKeyAlgorithm + +from .decorators import KeyAction + +from .errors import PGPDecryptionError +from .errors import PGPError + +from .packet import Key +from .packet import MDC +from .packet import Packet +from .packet import Primary +from .packet import Private +from .packet import PubKeyV4 +from .packet import PubSubKeyV4 +from .packet import PrivKeyV4 +from .packet import PrivSubKeyV4 +from .packet import Public +from .packet import Sub +from .packet import UserID +from .packet import UserAttribute + +from .packet.packets import CompressedData +from .packet.packets import IntegrityProtectedSKEData +from .packet.packets import IntegrityProtectedSKEDataV1 +from .packet.packets import LiteralData +from .packet.packets import OnePassSignature +from .packet.packets import OnePassSignatureV3 +from .packet.packets import PKESessionKey +from .packet.packets import PKESessionKeyV3 +from .packet.packets import Signature +from .packet.packets import SignatureV4 +from .packet.packets import SKEData +from .packet.packets import Marker +from .packet.packets import SKESessionKey +from .packet.packets import SKESessionKeyV4 + +from .packet.types import Opaque + +from .types import Armorable +from .types import Fingerprint +from .types import ParentRef +from .types import PGPObject +from .types import SignatureVerification +from .types import SorteDeque + +__all__ = ['PGPSignature', + 'PGPUID', + 'PGPMessage', + 'PGPKey', + 'PGPKeyring'] + + +class PGPSignature(Armorable, ParentRef, PGPObject): + @property + def __sig__(self): + return self._signature.signature.__sig__() + + @property + def cipherprefs(self): + """ + A ``list`` of preferred symmetric algorithms specified in this signature, if any. Otherwise, an empty ``list``. + """ + if 'PreferredSymmetricAlgorithms' in self._signature.subpackets: + return next(iter(self._signature.subpackets['h_PreferredSymmetricAlgorithms'])).flags + return [] + + @property + def compprefs(self): + """ + A ``list`` of preferred compression algorithms specified in this signature, if any. Otherwise, an empty ``list``. + """ + if 'PreferredCompressionAlgorithms' in self._signature.subpackets: + return next(iter(self._signature.subpackets['h_PreferredCompressionAlgorithms'])).flags + return [] + + @property + def created(self): + """ + A :py:obj:`~datetime.datetime` of when this signature was created. + """ + return self._signature.subpackets['h_CreationTime'][-1].created + + @property + def embedded(self): + return self.parent is not None + + @property + def expires_at(self): + """ + A :py:obj:`~datetime.datetime` of when this signature expires, if a signature expiration date is specified. + Otherwise, ``None`` + """ + if 'SignatureExpirationTime' in self._signature.subpackets: + expd = next(iter(self._signature.subpackets['SignatureExpirationTime'])).expires + return self.created + expd + return None + + @property + def exportable(self): + """ + ``False`` if this signature is marked as being not exportable. Otherwise, ``True``. + """ + if 'ExportableCertification' in self._signature.subpackets: + return bool(next(iter(self._signature.subpackets['ExportableCertification']))) + + return True + + @property + def features(self): + """ + A ``set`` of implementation features specified in this signature, if any. Otherwise, an empty ``set``. + """ + if 'Features' in self._signature.subpackets: + return next(iter(self._signature.subpackets['Features'])).flags + return set() + + @property + def hash2(self): + return self._signature.hash2 + + @property + def hashprefs(self): + """ + A ``list`` of preferred hash algorithms specified in this signature, if any. Otherwise, an empty ``list``. + """ + if 'PreferredHashAlgorithms' in self._signature.subpackets: + return next(iter(self._signature.subpackets['h_PreferredHashAlgorithms'])).flags + return [] + + @property + def hash_algorithm(self): + """ + The :py:obj:`~constants.HashAlgorithm` used when computing this signature. + """ + return self._signature.halg + + @property + def is_expired(self): + """ + ``True`` if the signature has an expiration date, and is expired. Otherwise, ``False`` + """ + expires_at = self.expires_at + if expires_at is not None and expires_at != self.created: + return expires_at < datetime.utcnow() + + return False + + @property + def key_algorithm(self): + """ + The :py:obj:`~constants.PubKeyAlgorithm` of the key that generated this signature. + """ + return self._signature.pubalg + + @property + def key_expiration(self): + if 'KeyExpirationTime' in self._signature.subpackets: + return next(iter(self._signature.subpackets['KeyExpirationTime'])).expires + return None + + @property + def key_flags(self): + """ + A ``set`` of :py:obj:`~constants.KeyFlags` specified in this signature, if any. Otherwise, an empty ``set``. + """ + if 'KeyFlags' in self._signature.subpackets: + return next(iter(self._signature.subpackets['h_KeyFlags'])).flags + return set() + + @property + def keyserver(self): + """ + The preferred key server specified in this signature, if any. Otherwise, an empty ``str``. + """ + if 'PreferredKeyServer' in self._signature.subpackets: + return next(iter(self._signature.subpackets['h_PreferredKeyServer'])).uri + return '' + + @property + def keyserverprefs(self): + """ + A ``list`` of :py:obj:`~constants.KeyServerPreferences` in this signature, if any. Otherwise, an empty ``list``. + """ + if 'KeyServerPreferences' in self._signature.subpackets: + return next(iter(self._signature.subpackets['h_KeyServerPreferences'])).flags + return [] + + @property + def magic(self): + return "SIGNATURE" + + @property + def notation(self): + """ + A ``dict`` of notation data in this signature, if any. Otherwise, an empty ``dict``. + """ + return dict((nd.name, nd.value) for nd in self._signature.subpackets['NotationData']) + + @property + def policy_uri(self): + """ + The policy URI specified in this signature, if any. Otherwise, an empty ``str``. + """ + if 'Policy' in self._signature.subpackets: + return next(iter(self._signature.subpackets['Policy'])).uri + return '' + + @property + def revocable(self): + """ + ``False`` if this signature is marked as being not revocable. Otherwise, ``True``. + """ + if 'Revocable' in self._signature.subpackets: + return bool(next(iter(self._signature.subpackets['Revocable']))) + return True + + @property + def revocation_key(self): + if 'RevocationKey' in self._signature.subpackets: + raise NotImplementedError() + return None + + @property + def signer(self): + """ + The 16-character Key ID of the key that generated this signature. + """ + return self._signature.signer + + @property + def target_signature(self): + return NotImplemented + + @property + def type(self): + """ + The :py:obj:`~constants.SignatureType` of this signature. + """ + return self._signature.sigtype + + @classmethod + def new(cls, sigtype, pkalg, halg, signer): + sig = PGPSignature() + + sigpkt = SignatureV4() + sigpkt.header.tag = 2 + sigpkt.header.version = 4 + sigpkt.subpackets.addnew('CreationTime', hashed=True, created=datetime.utcnow()) + sigpkt.subpackets.addnew('Issuer', _issuer=signer) + + sigpkt.sigtype = sigtype + sigpkt.pubalg = pkalg + + if halg is not None: + sigpkt.halg = halg + + sig._signature = sigpkt + return sig + + def __init__(self): + """ + PGPSignature objects represent OpenPGP compliant signatures. + + PGPSignature implements the ``__str__`` method, the output of which will be the signature object in + OpenPGP-compliant ASCII-armored format. + + PGPSignature implements the ``__bytes__`` method, the output of which will be the signature object in + OpenPGP-compliant binary format. + """ + super(PGPSignature, self).__init__() + self._signature = None + + def __bytearray__(self): + return self._signature.__bytearray__() + + def __repr__(self): + return "<PGPSignature [{:s}] object at 0x{:02x}>".format(self.type.name, id(self)) + + def __lt__(self, other): + return self.created < other.created + + def __or__(self, other): + if isinstance(other, Signature): + if self._signature is None: + self._signature = other + return self + + ##TODO: this is not a great way to do this + if other.__class__.__name__ == 'EmbeddedSignature': + self._signature = other + return self + + raise TypeError + + def __copy__(self): + # because the default shallow copy isn't actually all that useful, + # and deepcopy does too much work + sig = super(PGPSignature, self).__copy__() + # sig = PGPSignature() + # sig.ascii_headers = self.ascii_headers.copy() + sig |= copy.copy(self._signature) + return sig + + def hashdata(self, subject): + _data = bytearray() + + if isinstance(subject, six.string_types): + subject = subject.encode('latin-1') + + """ + All signatures are formed by producing a hash over the signature + data, and then using the resulting hash in the signature algorithm. + """ + + if self.type == SignatureType.BinaryDocument: + """ + For binary document signatures (type 0x00), the document data is + hashed directly. + """ + _data += bytearray(subject) + + if self.type == SignatureType.CanonicalDocument: + """ + For text document signatures (type 0x01), the + document is canonicalized by converting line endings to <CR><LF>, + and the resulting data is hashed. + """ + _data += re.subn(br'\r?\n', b'\r\n', subject)[0] + + if self.type in {SignatureType.Generic_Cert, SignatureType.Persona_Cert, SignatureType.Casual_Cert, + SignatureType.Positive_Cert, SignatureType.CertRevocation, SignatureType.Subkey_Binding, + SignatureType.PrimaryKey_Binding}: + """ + When a signature is made over a key, the hash data starts with the + octet 0x99, followed by a two-octet length of the key, and then body + of the key packet. (Note that this is an old-style packet header for + a key packet with two-octet length.) ... + Key revocation signatures (types 0x20 and 0x28) + hash only the key being revoked. + """ + _s = b'' + if isinstance(subject, PGPUID): + _s = subject._parent.hashdata + + elif isinstance(subject, PGPKey) and not subject.is_primary: + _s = subject._parent.hashdata + + elif isinstance(subject, PGPKey) and subject.is_primary: + _s = subject.hashdata + + if len(_s) > 0: + _data += b'\x99' + self.int_to_bytes(len(_s), 2) + _s + + if self.type in {SignatureType.Subkey_Binding, SignatureType.PrimaryKey_Binding}: + """ + A subkey binding signature + (type 0x18) or primary key binding signature (type 0x19) then hashes + the subkey using the same format as the main key (also using 0x99 as + the first octet). + """ + if subject.is_primary: + _s = subject.subkeys[self.signer].hashdata + + else: + _s = subject.hashdata + + _data += b'\x99' + self.int_to_bytes(len(_s), 2) + _s + + if self.type in {SignatureType.KeyRevocation, SignatureType.SubkeyRevocation, SignatureType.DirectlyOnKey}: + """ + The signature is calculated directly on the key being revoked. A + revoked key is not to be used. Only revocation signatures by the + key being revoked, or by an authorized revocation key, should be + considered valid revocation signatures. + + Subkey revocation signature + The signature is calculated directly on the subkey being revoked. + A revoked subkey is not to be used. Only revocation signatures + by the top-level signature key that is bound to this subkey, or + by an authorized revocation key, should be considered valid + revocation signatures. + + Signature directly on a key + This signature is calculated directly on a key. It binds the + information in the Signature subpackets to the key, and is + appropriate to be used for subpackets that provide information + about the key, such as the Revocation Key subpacket. It is also + appropriate for statements that non-self certifiers want to make + about the key itself, rather than the binding between a key and a + name. + """ + _s = subject.hashdata + _data += b'\x99' + self.int_to_bytes(len(_s), 2) + _s + + if self.type in {SignatureType.Generic_Cert, SignatureType.Persona_Cert, SignatureType.Casual_Cert, + SignatureType.Positive_Cert, SignatureType.CertRevocation}: + """ + A certification signature (type 0x10 through 0x13) hashes the User + ID being bound to the key into the hash context after the above + data. ... A V4 certification + hashes the constant 0xB4 for User ID certifications or the constant + 0xD1 for User Attribute certifications, followed by a four-octet + number giving the length of the User ID or User Attribute data, and + then the User ID or User Attribute data. + + ... + + The [certificate revocation] signature + is computed over the same data as the certificate that it + revokes, and should have a later creation date than that + certificate. + """ + + _s = subject.hashdata + if subject.is_uid: + _data += b'\xb4' + + else: + _data += b'\xd1' + + _data += self.int_to_bytes(len(_s), 4) + _s + + # if this is a new signature, do update_hlen + if 0 in list(self._signature.signature): + self._signature.update_hlen() + + """ + Once the data body is hashed, then a trailer is hashed. (...) + A V4 signature hashes the packet body + starting from its first field, the version number, through the end + of the hashed subpacket data. Thus, the fields hashed are the + signature version, the signature type, the public-key algorithm, the + hash algorithm, the hashed subpacket length, and the hashed + subpacket body. + + V4 signatures also hash in a final trailer of six octets: the + version of the Signature packet, i.e., 0x04; 0xFF; and a four-octet, + big-endian number that is the length of the hashed data from the + Signature packet (note that this number does not include these final + six octets). + """ + + hcontext = bytearray() + hcontext.append(self._signature.header.version if not self.embedded else self._signature._sig.header.version) + hcontext.append(self.type) + hcontext.append(self.key_algorithm) + hcontext.append(self.hash_algorithm) + hcontext += self._signature.subpackets.__hashbytearray__() + hlen = len(hcontext) + _data += hcontext + _data += b'\x04\xff' + _data += self.int_to_bytes(hlen, 4) + return bytes(_data) + + def make_onepass(self): + onepass = OnePassSignatureV3() + onepass.sigtype = self.type + onepass.halg = self.hash_algorithm + onepass.pubalg = self.key_algorithm + onepass.signer = self.signer + onepass.update_hlen() + return onepass + + def parse(self, packet): + unarmored = self.ascii_unarmor(packet) + data = unarmored['body'] + + if unarmored['magic'] is not None and unarmored['magic'] != 'SIGNATURE': + raise ValueError('Expected: SIGNATURE. Got: {}'.format(str(unarmored['magic']))) + + if unarmored['headers'] is not None: + self.ascii_headers = unarmored['headers'] + + # load *one* packet from data + pkt = Packet(data) + if pkt.header.tag == PacketTag.Signature and not isinstance(pkt, Opaque): + self._signature = pkt + + else: + raise ValueError('Expected: Signature. Got: {:s}'.format(pkt.__class__.__name__)) + + +class PGPUID(ParentRef): + @property + def __sig__(self): + return list(self._signatures) + + @property + def name(self): + """If this is a User ID, the stored name. If this is not a User ID, this will be an empty string.""" + return self._uid.name if isinstance(self._uid, UserID) else "" + + @property + def comment(self): + """ + If this is a User ID, this will be the stored comment. If this is not a User ID, or there is no stored comment, + this will be an empty string., + """ + + return self._uid.comment if isinstance(self._uid, UserID) else "" + + @property + def email(self): + """ + If this is a User ID, this will be the stored email address. If this is not a User ID, or there is no stored + email address, this will be an empty string. + """ + return self._uid.email if isinstance(self._uid, UserID) else "" + + @property + def image(self): + """ + If this is a User Attribute, this will be the stored image. If this is not a User Attribute, this will be ``None``. + """ + return self._uid.image.image if isinstance(self._uid, UserAttribute) else None + + @property + def is_primary(self): + """ + If the most recent, valid self-signature specifies this as being primary, this will be True. Otherwise, Faqlse. + """ + return bool(next(iter(self.selfsig._signature.subpackets['h_PrimaryUserID']), False)) + + @property + def is_uid(self): + """ + ``True`` if this is a User ID, otherwise False. + """ + return isinstance(self._uid, UserID) + + @property + def is_ua(self): + """ + ``True`` if this is a User Attribute, otherwise False. + """ + return isinstance(self._uid, UserAttribute) + + @property + def selfsig(self): + """ + This will be the most recent, self-signature of this User ID or Attribute. If there isn't one, this will be ``None``. + """ + if self.parent is not None: + return next((sig for sig in reversed(self._signatures) if sig.signer == self.parent.fingerprint.keyid), None) + + @property + def signers(self): + """ + This will be a set of all of the key ids which have signed this User ID or Attribute. + """ + return set(s.signer for s in self.__sig__) + + @property + def hashdata(self): + if self.is_uid: + return self._uid.__bytearray__()[len(self._uid.header):] + + if self.is_ua: + return self._uid.subpackets.__bytearray__() + + @classmethod + def new(cls, pn, comment="", email=""): + """ + Create a new User ID or photo. + + :param pn: User ID name, or photo. If this is a ``bytearray``, it will be loaded as a photo. + Otherwise, it will be used as the name field for a User ID. + :type pn: ``bytearray``, ``str``, ``unicode`` + :param comment: The comment field for a User ID. Ignored if this is a photo. + :type comment: ``str``, ``unicode`` + :param email: The email address field for a User ID. Ignored if this is a photo. + :type email: ``str``, ``unicode`` + :returns: :py:obj:`PGPUID` + """ + uid = PGPUID() + if isinstance(pn, bytearray): + uid._uid = UserAttribute() + uid._uid.image.image = pn + uid._uid.image.iencoding = ImageEncoding.encodingof(pn) + uid._uid.update_hlen() + + else: + uid._uid = UserID() + uid._uid.name = pn + uid._uid.comment = comment + uid._uid.email = email + uid._uid.update_hlen() + + return uid + + def __init__(self): + """ + PGPUID objects represent User IDs and User Attributes for keys. + + PGPUID implements the ``__format__`` method for User IDs, returning a string in the format + 'name (comment) <email>', leaving out any comment or email fields that are not present. + """ + super(PGPUID, self).__init__() + self._uid = None + self._signatures = SorteDeque() + + def __repr__(self): + if self.selfsig is not None: + return "<PGPUID [{:s}][{}] at 0x{:02X}>".format(self._uid.__class__.__name__, self.selfsig.created, id(self)) + return "<PGPUID [{:s}] at 0x{:02X}>".format(self._uid.__class__.__name__, id(self)) + + def __lt__(self, other): # pragma: no cover + if self.is_uid == other.is_uid: + if self.is_primary == other.is_primary: + return self.selfsig > other.selfsig + + if self.is_primary: + return True + + return False + + if self.is_uid and other.is_ua: + return True + + if self.is_ua and other.is_uid: + return False + + def __or__(self, other): + if isinstance(other, PGPSignature): + self._signatures.insort(other) + if self.parent is not None and self in self.parent._uids: + self.parent._uids.resort(self) + + return self + + if isinstance(other, UserID) and self._uid is None: + self._uid = other + return self + + if isinstance(other, UserAttribute) and self._uid is None: + self._uid = other + return self + + raise TypeError("unsupported operand type(s) for |: '{:s}' and '{:s}'" + "".format(self.__class__.__name__, other.__class__.__name__)) + + def __copy__(self): + # because the default shallow copy isn't actually all that useful, + # and deepcopy does too much work + uid = PGPUID() + uid |= copy.copy(self._uid) + for sig in self._signatures: + uid |= copy.copy(sig) + return uid + + def __format__(self, format_spec): + if self.is_uid: + comment = six.u("") if self.comment == "" else six.u(" ({:s})").format(self.comment) + email = six.u("") if self.email == "" else six.u(" <{:s}>").format(self.email) + return six.u("{:s}{:s}{:s}").format(self.name, comment, email) + + raise NotImplementedError + + +class PGPMessage(Armorable, PGPObject): + @staticmethod + def dash_unescape(text): + return re.subn(r'^- -', '-', text, flags=re.MULTILINE)[0] + + @staticmethod + def dash_escape(text): + return re.subn(r'^-', '- -', text, flags=re.MULTILINE)[0] + + @property + def encrypters(self): + """A ``set`` containing all key ids (if any) to which this message was encrypted.""" + return set(m.encrypter for m in self._sessionkeys if isinstance(m, PKESessionKey)) + + @property + def filename(self): + """If applicable, returns the original filename of the message. Otherwise, returns an empty string.""" + if self.type == 'literal': + return self._message.filename + return '' + + @property + def is_compressed(self): + """``True`` if this message will be compressed when exported""" + return self._compression != CompressionAlgorithm.Uncompressed + + @property + def is_encrypted(self): + """``True`` if this message is encrypted; otherwise, ``False``""" + return isinstance(self._message, (SKEData, IntegrityProtectedSKEData)) + + @property + def is_sensitive(self): + """``True`` if this message is marked sensitive; otherwise ``False``""" + return self.type == 'literal' and self._message.filename == '_CONSOLE' + + @property + def is_signed(self): + """ + ``True`` if this message is signed; otherwise, ``False``. + Should always be ``False`` if the message is encrypted. + """ + return len(self._signatures) > 0 + + @property + def issuers(self): + """A ``set`` containing all key ids (if any) which have signed or encrypted this message.""" + return self.encrypters | self.signers + + @property + def magic(self): + if self.type == 'cleartext': + return "SIGNATURE" + return "MESSAGE" + + @property + def message(self): + """The message contents""" + if self.type == 'cleartext': + return self.bytes_to_text(self._message) + + if self.type == 'literal': + return self._message.contents + + if self.type == 'encrypted': + return self._message + + @property + def signatures(self): + """A ``set`` containing all key ids (if any) which have signed this message.""" + return list(self._signatures) + + @property + def signers(self): + """A ``set`` containing all key ids (if any) which have signed this message.""" + return set(m.signer for m in self._signatures) + + @property + def type(self): + ##TODO: it might be better to use an Enum for the output of this + if isinstance(self._message, (six.string_types, six.binary_type, bytearray)): + return 'cleartext' + + if isinstance(self._message, LiteralData): + return 'literal' + + if isinstance(self._message, (SKEData, IntegrityProtectedSKEData)): + return 'encrypted' + + raise NotImplementedError + + def __init__(self): + """ + PGPMessage objects represent OpenPGP message compositions. + + PGPMessage implements the `__str__` method, the output of which will be the message composition in + OpenPGP-compliant ASCII-armored format. + + PGPMessage implements the `__bytes__` method, the output of which will be the message composition in + OpenPGP-compliant binary format. + + Any signatures within the PGPMessage that are marked as being non-exportable will not be included in the output + of either of those methods. + """ + super(PGPMessage, self).__init__() + self._compression = CompressionAlgorithm.Uncompressed + self._message = None + self._mdc = None + self._signatures = SorteDeque() + self._sessionkeys = [] + + def __bytearray__(self): + if self.is_compressed: + comp = CompressedData() + comp.calg = self._compression + comp.packets = [pkt for pkt in self] + comp.update_hlen() + return comp.__bytearray__() + + _bytes = bytearray() + for pkt in self: + _bytes += pkt.__bytearray__() + return _bytes + + def __str__(self): + if self.type == 'cleartext': + tmpl = u"-----BEGIN PGP SIGNED MESSAGE-----\n" \ + u"{hhdr:s}\n" \ + u"{cleartext:s}\n" \ + u"{signature:s}" + + # only add a Hash: header if we actually have at least one signature + hashes = set(s.hash_algorithm.name for s in self.signatures) + hhdr = 'Hash: {hashes:s}\n'.format(hashes=','.join(sorted(hashes))) if hashes else '' + + return tmpl.format(hhdr=hhdr, + cleartext=self.dash_escape(self.bytes_to_text(self._message)), + signature=super(PGPMessage, self).__str__()) + + return super(PGPMessage, self).__str__() + + def __iter__(self): + if self.type == 'cleartext': + for sig in self._signatures: + yield sig + + elif self.is_encrypted: + for pkt in self._sessionkeys: + yield pkt + yield self.message + + else: + ##TODO: is it worth coming up with a way of disabling one-pass signing? + for sig in self._signatures: + ops = sig.make_onepass() + if sig is not self._signatures[-1]: + ops.nested = True + yield ops + + yield self._message + if self._mdc is not None: # pragma: no cover + yield self._mdc + + for sig in self._signatures: + yield sig + + def __or__(self, other): + if isinstance(other, Marker): + return self + + if isinstance(other, CompressedData): + self._compression = other.calg + for pkt in other.packets: + self |= pkt + return self + + if isinstance(other, (six.string_types, six.binary_type, bytearray)): + if self._message is None: + self._message = self.text_to_bytes(other) + return self + + if isinstance(other, (LiteralData, SKEData, IntegrityProtectedSKEData)): + if self._message is None: + self._message = other + return self + + if isinstance(other, MDC): + if self._mdc is None: + self._mdc = other + return self + + if isinstance(other, OnePassSignature): + # these are "generated" on the fly during composition + return self + + if isinstance(other, Signature): + other = PGPSignature() | other + + if isinstance(other, PGPSignature): + self._signatures.insort(other) + return self + + if isinstance(other, (PKESessionKey, SKESessionKey)): + self._sessionkeys.append(other) + return self + + if isinstance(other, PGPMessage): + self._message = other._message + self._mdc = other._mdc + self._compression = other._compression + self._sessionkeys += other._sessionkeys + self._signatures += other._signatures + return self + + raise NotImplementedError(str(type(other))) + + def __copy__(self): + msg = super(PGPMessage, self).__copy__() + msg._compression = self._compression + msg._message = copy.copy(self._message) + msg._mdc = copy.copy(self._mdc) + + for sig in self._signatures: + msg |= copy.copy(sig) + + for sk in self._sessionkeys: + msg |= copy.copy(sk) + + return msg + + @classmethod + def new(cls, message, **kwargs): + """ + Create a new PGPMessage object. + + :param message: The message to be stored. + :type message: ``str``, ``unicode``, ``bytes``, ``bytearray`` + :returns: :py:obj:`PGPMessage` + + The following optional keyword arguments can be used with :py:meth:`PGPMessage.new`: + + :keyword file: if True, ``message`` should be a path to a file. The contents of that file will be read and used + as the contents of the message. + :type file: ``bool`` + :keyword cleartext: if True, the message will be cleartext with inline signatures. + :type cleartext: ``bool`` + :keyword sensitive: if True, the filename will be set to '_CONSOLE' to signal other OpenPGP clients to treat + this message as being 'for your eyes only'. Ignored if cleartext is True. + :type sensitive: ``bool`` + :keyword format: Set the message format identifier. Ignored if cleartext is True. + :type format: ``str`` + :keyword compression: Set the compression algorithm for the new message. + Defaults to :py:obj:`CompressionAlgorithm.ZIP`. Ignored if cleartext is True. + :keyword encoding: Set the Charset header for the message. + :type encoding: ``str`` representing a valid codec in codecs + """ + # TODO: have 'codecs' above (in :type encoding:) link to python documentation page on codecs + cleartext = kwargs.pop('cleartext', False) + format = kwargs.pop('format', None) + sensitive = kwargs.pop('sensitive', False) + compression = kwargs.pop('compression', CompressionAlgorithm.ZIP) + file = kwargs.pop('file', False) + charset = kwargs.pop('encoding', None) + + filename = '' + mtime = datetime.utcnow() + + msg = PGPMessage() + + if charset: + msg.charset = charset + + # if format in 'tu' and isinstance(message, (six.binary_type, bytearray)): + # # if message format is text or unicode and we got binary data, we'll need to transcode it to UTF-8 + # message = + + if file and os.path.isfile(message): + filename = message + message = bytearray(os.path.getsize(filename)) + mtime = datetime.utcfromtimestamp(os.path.getmtime(filename)) + + with open(filename, 'rb') as mf: + mf.readinto(message) + + # if format is None, we can try to detect it + if format is None: + if isinstance(message, six.text_type): + # message is definitely UTF-8 already + format = 'u' + + elif cls.is_ascii(message): + # message is probably text + format = 't' + + else: + # message is probably binary + format = 'b' + + # if message is a binary type and we're building a textual message, we need to transcode the bytes to UTF-8 + if isinstance(message, (six.binary_type, bytearray)) and (cleartext or format in 'tu'): + message = message.decode(charset or 'utf-8') + + if cleartext: + msg |= message + + else: + # load literal data + lit = LiteralData() + lit._contents = bytearray(msg.text_to_bytes(message)) + lit.filename = '_CONSOLE' if sensitive else os.path.basename(filename) + lit.mtime = mtime + lit.format = format + + # if cls.is_ascii(message): + # lit.format = 't' + + lit.update_hlen() + + msg |= lit + msg._compression = compression + + return msg + + def encrypt(self, passphrase, sessionkey=None, **prefs): + """ + Encrypt the contents of this message using a passphrase. + :param passphrase: The passphrase to use for encrypting this message. + :type passphrase: ``str``, ``unicode``, ``bytes`` + + :optional param sessionkey: Provide a session key to use when encrypting something. Default is ``None``. + If ``None``, a session key of the appropriate length will be generated randomly. + + .. warning:: + + Care should be taken when making use of this option! Session keys *absolutely need* + to be unpredictable! Use the ``gen_key()`` method on the desired + :py:obj:`~constants.SymmetricKeyAlgorithm` to generate the session key! + + :type sessionkey: ``bytes``, ``str`` + :raises: :py:exc:`~errors.PGPEncryptionError` + :returns: A new :py:obj:`PGPMessage` containing the encrypted contents of this message. + """ + cipher_algo = prefs.pop('cipher', SymmetricKeyAlgorithm.AES256) + hash_algo = prefs.pop('hash', HashAlgorithm.SHA256) + + # set up a new SKESessionKeyV4 + skesk = SKESessionKeyV4() + skesk.s2k.usage = 255 + skesk.s2k.specifier = 3 + skesk.s2k.halg = hash_algo + skesk.s2k.encalg = cipher_algo + skesk.s2k.count = skesk.s2k.halg.tuned_count + + if sessionkey is None: + sessionkey = cipher_algo.gen_key() + skesk.encrypt_sk(passphrase, sessionkey) + del passphrase + + msg = PGPMessage() | skesk + + if not self.is_encrypted: + skedata = IntegrityProtectedSKEDataV1() + skedata.encrypt(sessionkey, cipher_algo, self.__bytes__()) + msg |= skedata + + else: + msg |= self + + return msg + + def decrypt(self, passphrase): + """ + Attempt to decrypt this message using a passphrase. + + :param passphrase: The passphrase to use to attempt to decrypt this message. + :type passphrase: ``str``, ``unicode``, ``bytes`` + :raises: :py:exc:`~errors.PGPDecryptionError` if decryption failed for any reason. + :returns: A new :py:obj:`PGPMessage` containing the decrypted contents of this message + """ + if not self.is_encrypted: + raise PGPError("This message is not encrypted!") + + for skesk in iter(sk for sk in self._sessionkeys if isinstance(sk, SKESessionKey)): + try: + symalg, key = skesk.decrypt_sk(passphrase) + decmsg = PGPMessage() + decmsg.parse(self.message.decrypt(key, symalg)) + + except (TypeError, ValueError, NotImplementedError, PGPDecryptionError): + continue + + else: + del passphrase + break + + else: + raise PGPDecryptionError("Decryption failed") + + return decmsg + + def parse(self, packet): + unarmored = self.ascii_unarmor(packet) + data = unarmored['body'] + + if unarmored['magic'] is not None and unarmored['magic'] not in ['MESSAGE', 'SIGNATURE']: + raise ValueError('Expected: MESSAGE. Got: {}'.format(str(unarmored['magic']))) + + if unarmored['headers'] is not None: + self.ascii_headers = unarmored['headers'] + + # cleartext signature + if unarmored['magic'] == 'SIGNATURE': + # the composition for this will be the 'cleartext' as a str, + # followed by one or more signatures (each one loaded into a PGPSignature) + self |= self.dash_unescape(unarmored['cleartext']) + while len(data) > 0: + pkt = Packet(data) + if not isinstance(pkt, Signature): # pragma: no cover + warnings.warn("Discarded unexpected packet: {:s}".format(pkt.__class__.__name__), stacklevel=2) + continue + self |= PGPSignature() | pkt + + else: + while len(data) > 0: + self |= Packet(data) + + +class PGPKey(Armorable, ParentRef, PGPObject): + """ + 11.1. Transferable Public Keys + + OpenPGP users may transfer public keys. The essential elements of a + transferable public key are as follows: + + - One Public-Key packet + + - Zero or more revocation signatures + - One or more User ID packets + + - After each User ID packet, zero or more Signature packets + (certifications) + + - Zero or more User Attribute packets + + - After each User Attribute packet, zero or more Signature packets + (certifications) + + - Zero or more Subkey packets + + - After each Subkey packet, one Signature packet, plus optionally a + revocation + + The Public-Key packet occurs first. Each of the following User ID + packets provides the identity of the owner of this public key. If + there are multiple User ID packets, this corresponds to multiple + means of identifying the same unique individual user; for example, a + user may have more than one email address, and construct a User ID + for each one. + + Immediately following each User ID packet, there are zero or more + Signature packets. Each Signature packet is calculated on the + immediately preceding User ID packet and the initial Public-Key + packet. The signature serves to certify the corresponding public key + and User ID. In effect, the signer is testifying to his or her + belief that this public key belongs to the user identified by this + User ID. + + Within the same section as the User ID packets, there are zero or + more User Attribute packets. Like the User ID packets, a User + Attribute packet is followed by zero or more Signature packets + calculated on the immediately preceding User Attribute packet and the + initial Public-Key packet. + + User Attribute packets and User ID packets may be freely intermixed + in this section, so long as the signatures that follow them are + maintained on the proper User Attribute or User ID packet. + + After the User ID packet or Attribute packet, there may be zero or + more Subkey packets. In general, subkeys are provided in cases where + the top-level public key is a signature-only key. However, any V4 + key may have subkeys, and the subkeys may be encryption-only keys, + signature-only keys, or general-purpose keys. V3 keys MUST NOT have + subkeys. + + Each Subkey packet MUST be followed by one Signature packet, which + should be a subkey binding signature issued by the top-level key. + For subkeys that can issue signatures, the subkey binding signature + MUST contain an Embedded Signature subpacket with a primary key + binding signature (0x19) issued by the subkey on the top-level key. + + Subkey and Key packets may each be followed by a revocation Signature + packet to indicate that the key is revoked. Revocation signatures + are only accepted if they are issued by the key itself, or by a key + that is authorized to issue revocations via a Revocation Key + subpacket in a self-signature by the top-level key. + + Transferable public-key packet sequences may be concatenated to allow + transferring multiple public keys in one operation. + + 11.2. Transferable Secret Keys + + OpenPGP users may transfer secret keys. The format of a transferable + secret key is the same as a transferable public key except that + secret-key and secret-subkey packets are used instead of the public + key and public-subkey packets. Implementations SHOULD include self- + signatures on any user IDs and subkeys, as this allows for a complete + public key to be automatically extracted from the transferable secret + key. Implementations MAY choose to omit the self-signatures, + especially if a transferable public key accompanies the transferable + secret key. + """ + @property + def __key__(self): + return self._key.keymaterial + + @property + def __sig__(self): + return list(self._signatures) + + @property + def created(self): + """A :py:obj:`~datetime.datetime` object of the creation date and time of the key, in UTC.""" + return self._key.created + + @property + def expires_at(self): + """A :py:obj:`~datetime.datetime` object of when this key is to be considered expired, if any. Otherwise, ``None``""" + try: + expires = min(sig.key_expiration for sig in itertools.chain(iter(uid.selfsig for uid in self.userids), self.self_signatures) + if sig.key_expiration is not None) + + except ValueError: + return None + + else: + return (self.created + expires) + + @property + def fingerprint(self): + """The fingerprint of this key, as a :py:obj:`~pgpy.types.Fingerprint` object.""" + if self._key: + return self._key.fingerprint + + @property + def hashdata(self): + # when signing a key, only the public portion of the keys is hashed + # if this is a private key, the private components of the key material need to be left out + if self.is_public: + return self._key.__bytearray__()[len(self._key.header):] + + pub = self._key.pubkey() + return pub.__bytearray__()[len(pub.header):] + + @property + def is_expired(self): + """``True`` if this key is expired, otherwise ``False``""" + expires = self.expires_at + if expires is not None: + return expires <= datetime.utcnow() + + return False + + @property + def is_primary(self): + """``True`` if this is a primary key; ``False`` if this is a subkey""" + return isinstance(self._key, Primary) and not isinstance(self._key, Sub) + + @property + def is_protected(self): + """``True`` if this is a private key that is protected with a passphrase, otherwise ``False``""" + if self.is_public: + return False + + return self._key.protected + + @property + def is_public(self): + """``True`` if this is a public key, otherwise ``False``""" + return isinstance(self._key, Public) and not isinstance(self._key, Private) + + @property + def is_unlocked(self): + """``False`` if this is a private key that is protected with a passphrase and has not yet been unlocked, otherwise ``True``""" + if self.is_public: + return True + + if not self.is_protected: + return True + + return self._key.unlocked + + @property + def key_algorithm(self): + """The :py:obj:`constants.PubKeyAlgorithm` pertaining to this key""" + return self._key.pkalg + + @property + def key_size(self): + """*new in 0.4.1* + The size pertaining to this key. ``int`` for non-EC key algorithms; :py:obj:`constants.EllipticCurveOID` for EC keys. + """ + if self.key_algorithm in {PubKeyAlgorithm.ECDSA, PubKeyAlgorithm.ECDH}: + return self._key.keymaterial.oid + return next(iter(self._key.keymaterial)).bit_length() + + @property + def magic(self): + return '{:s} KEY BLOCK'.format('PUBLIC' if (isinstance(self._key, Public) and not isinstance(self._key, Private)) else + 'PRIVATE' if isinstance(self._key, Private) else '') + + @property + def pubkey(self): + """If the :py:obj:`PGPKey` object is a private key, this method returns a corresponding public key object with + all the trimmings. Otherwise, returns ``None`` + """ + if not self.is_public: + if self._sibling is None or isinstance(self._sibling, weakref.ref): + # create a new key shell + pub = PGPKey() + pub.ascii_headers = self.ascii_headers.copy() + + # get the public half of the primary key + pub._key = self._key.pubkey() + + # get the public half of each subkey + for skid, subkey in self.subkeys.items(): + pub.subkeys[skid] = subkey.pubkey + + # copy user ids and user attributes + for uid in self._uids: + pub |= copy.copy(uid) + + # copy signatures that weren't copied with uids + for sig in self._signatures: + if sig.parent is None: + pub |= copy.copy(sig) + + # keep connect the two halves using a weak reference + self._sibling = weakref.ref(pub) + pub._sibling = weakref.ref(self) + + return self._sibling() + return None + + @pubkey.setter + def pubkey(self, pubkey): + if self.is_public: + raise TypeError("cannot add public sibling to pubkey") + + if not pubkey.is_public: + raise TypeError("sibling must be public") + + if self._sibling is not None and self._sibling() is not None: + raise ValueError("public key reference already set") + + if pubkey.fingerprint != self.fingerprint: + raise ValueError("key fingerprint mismatch") + + # TODO: sync packets with sibling + self._sibling = weakref.ref(pubkey) + pubkey._sibling = weakref.ref(self) + + @property + def self_signatures(self): + keyid, keytype = (self.fingerprint.keyid, SignatureType.DirectlyOnKey) if self.is_primary \ + else (self.parent.fingerprint.keyid, SignatureType.Subkey_Binding) + + ##TODO: filter out revoked signatures as well + for sig in iter(sig for sig in self._signatures + if all([sig.type == keytype, sig.signer == keyid, not sig.is_expired])): + yield sig + + @property + def signers(self): + """A ``set`` of key ids of keys that were used to sign this key""" + return {sig.signer for sig in self.__sig__} + + @property + def subkeys(self): + """An :py:obj:`~collections.OrderedDict` of subkeys bound to this primary key, if applicable, + selected by 16-character keyid.""" + return self._children + + @property + def userids(self): + """A ``list`` of :py:obj:`PGPUID` objects containing User ID information about this key""" + return [ u for u in self._uids if u.is_uid ] + + @property + def userattributes(self): + """A ``list`` of :py:obj:`PGPUID` objects containing one or more images associated with this key""" + return [u for u in self._uids if u.is_ua] + + @classmethod + def new(cls, key_algorithm, key_size): + """ + Generate a new PGP key + + :param key_algorithm: Key algorithm to use. + :type key_algorithm: A :py:obj:`~constants.PubKeyAlgorithm` + :param key_size: Key size in bits, unless `key_algorithm` is :py:obj:`~constants.PubKeyAlgorithm.ECDSA` or + :py:obj:`~constants.PubKeyAlgorithm.ECDH`, in which case it should be the Curve OID to use. + :type key_size: ``int`` or :py:obj:`~constants.EllipticCurveOID` + :return: A newly generated :py:obj:`PGPKey` + """ + # new private key shell first + key = PGPKey() + + if key_algorithm in {PubKeyAlgorithm.RSAEncrypt, PubKeyAlgorithm.RSASign}: # pragma: no cover + warnings.warn('{:s} is deprecated - generating key using RSAEncryptOrSign'.format(key_algorithm.name)) + key_algorithm = PubKeyAlgorithm.RSAEncryptOrSign + + # generate some key data to match key_algorithm and key_size + key._key = PrivKeyV4.new(key_algorithm, key_size) + + return key + + def __init__(self): + """ + PGPKey objects represent OpenPGP compliant keys along with all of their associated data. + + PGPKey implements the `__str__` method, the output of which will be the key composition in + OpenPGP-compliant ASCII-armored format. + + PGPKey implements the `__bytes__` method, the output of which will be the key composition in + OpenPGP-compliant binary format. + + Any signatures within the PGPKey that are marked as being non-exportable will not be included in the output + of either of those methods. + """ + super(PGPKey, self).__init__() + self._key = None + self._children = collections.OrderedDict() + self._signatures = SorteDeque() + self._uids = SorteDeque() + self._sibling = None + + def __bytearray__(self): + _bytes = bytearray() + # us + _bytes += self._key.__bytearray__() + # our signatures; ignore embedded signatures + for sig in iter(s for s in self._signatures if not s.embedded and s.exportable): + _bytes += sig.__bytearray__() + # one or more User IDs, followed by their signatures + for uid in self._uids: + _bytes += uid._uid.__bytearray__() + for s in [s for s in uid._signatures if s.exportable]: + _bytes += s.__bytearray__() + # subkeys + for sk in self._children.values(): + _bytes += sk.__bytearray__() + + return _bytes + + def __repr__(self): + if self._key is not None: + return "<PGPKey [{:s}][0x{:s}] at 0x{:02X}>" \ + "".format(self._key.__class__.__name__, self.fingerprint.keyid, id(self)) + + return "<PGPKey [unknown] at 0x{:02X}>" \ + "".format(id(self)) + + def __contains__(self, item): + if isinstance(item, PGPKey): # pragma: no cover + return item.fingerprint.keyid in self.subkeys + + if isinstance(item, Fingerprint): # pragma: no cover + return item.keyid in self.subkeys + + if isinstance(item, PGPUID): + return item in self._uids + + if isinstance(item, PGPSignature): + return item in self._signatures + + raise TypeError + + def __or__(self, other, from_sib=False): + if isinstance(other, Key) and self._key is None: + self._key = other + + elif isinstance(other, PGPKey) and not other.is_primary and other.is_public == self.is_public: + other._parent = self + self._children[other.fingerprint.keyid] = other + + elif isinstance(other, PGPSignature): + self._signatures.insort(other) + + # if this is a subkey binding signature that has embedded primary key binding signatures, add them to parent + if other.type == SignatureType.Subkey_Binding: + for es in iter(pkb for pkb in other._signature.subpackets['EmbeddedSignature']): + esig = PGPSignature() | es + esig._parent = other + self._signatures.insort(esig) + + elif isinstance(other, PGPUID): + other._parent = weakref.ref(self) + self._uids.insort(other) + + else: + raise TypeError("unsupported operand type(s) for |: '{:s}' and '{:s}'" + "".format(self.__class__.__name__, other.__class__.__name__)) + + if isinstance(self._sibling, weakref.ref) and not from_sib: + sib = self._sibling() + if sib is None: + self._sibling = None + + else: # pragma: no cover + sib.__or__(copy.copy(other), True) + + return self + + def __copy__(self): + key = super(PGPKey, self).__copy__() + key._key = copy.copy(self._key) + + for uid in self._uids: + key |= copy.copy(uid) + + for id, subkey in self._children.items(): + key |= copy.copy(subkey) + + for sig in self._signatures: + if sig.embedded: + # embedded signatures don't need to be explicitly copied + continue + + print(len(key._signatures)) + key |= copy.copy(sig) + + return key + + def protect(self, passphrase, enc_alg, hash_alg): + """ + Add a passphrase to a private key. If the key is already passphrase protected, it should be unlocked before + a new passphrase can be specified. + + Has no effect on public keys. + + :param passphrase: A passphrase to protect the key with + :type passphrase: ``str``, ``unicode`` + :param enc_alg: Symmetric encryption algorithm to use to protect the key + :type enc_alg: :py:obj:`~constants.SymmetricKeyAlgorithm` + :param hash_alg: Hash algorithm to use in the String-to-Key specifier + :type hash_alg: :py:obj:`~constants.HashAlgorithm` + """ + ##TODO: specify strong defaults for enc_alg and hash_alg + if self.is_public: + # we can't protect public keys because only private key material is ever protected + warnings.warn("Public keys cannot be passphrase-protected", stacklevel=2) + return + + if self.is_protected and not self.is_unlocked: + # we can't protect a key that is already protected unless it is unlocked first + warnings.warn("This key is already protected with a passphrase - " + "please unlock it before attempting to specify a new passphrase", stacklevel=2) + return + + for sk in itertools.chain([self], self.subkeys.values()): + sk._key.protect(passphrase, enc_alg, hash_alg) + + del passphrase + + @contextlib.contextmanager + def unlock(self, passphrase): + """ + Context manager method for unlocking passphrase-protected private keys. Has no effect if the key is not both + private and passphrase-protected. + + When the context managed block is exited, the unprotected private key material is removed. + + Example:: + + privkey = PGPKey() + privkey.parse(keytext) + + assert privkey.is_protected + assert privkey.is_unlocked is False + # privkey.sign("some text") <- this would raise an exception + + with privkey.unlock("TheCorrectPassphrase"): + # privkey is now unlocked + assert privkey.is_unlocked + # so you can do things with it + sig = privkey.sign("some text") + + # privkey is no longer unlocked + assert privkey.is_unlocked is False + + Emits a :py:obj:`~warnings.UserWarning` if the key is public or not passphrase protected. + + :param str passphrase: The passphrase to be used to unlock this key. + :raises: :py:exc:`~pgpy.errors.PGPDecryptionError` if the passphrase is incorrect + """ + if self.is_public: + # we can't unprotect public keys because only private key material is ever protected + warnings.warn("Public keys cannot be passphrase-protected", stacklevel=3) + yield self + return + + if not self.is_protected: + # we can't unprotect private keys that are not protected, because there is no ciphertext to decrypt + warnings.warn("This key is not protected with a passphrase", stacklevel=3) + yield self + return + + try: + for sk in itertools.chain([self], self.subkeys.values()): + sk._key.unprotect(passphrase) + del passphrase + yield self + + finally: + # clean up here by deleting the previously decrypted secret key material + for sk in itertools.chain([self], self.subkeys.values()): + sk._key.keymaterial.clear() + + def add_uid(self, uid, selfsign=True, **prefs): + """ + Add a User ID to this key. + + :param uid: The user id to add + :type uid: :py:obj:`~pgpy.PGPUID` + :param selfsign: Whether or not to self-sign the user id before adding it + :type selfsign: ``bool`` + + Valid optional keyword arguments are identical to those of self-signatures for :py:meth:`PGPKey.certify`. + Any such keyword arguments are ignored if selfsign is ``False`` + """ + uid._parent = self + if selfsign: + uid |= self.certify(uid, SignatureType.Positive_Cert, **prefs) + + self |= uid + + def get_uid(self, search): + """ + Find and return a User ID that matches the search string given. + + :param search: A text string to match name, comment, or email address against + :type search: ``str``, ``unicode`` + :return: The first matching :py:obj:`~pgpy.PGPUID`, or ``None`` if no matches were found. + """ + if self.is_primary: + return next((u for u in self._uids if search in filter(lambda a: a is not None, (u.name, u.comment, u.email))), None) + return self.parent.get_uid(search) + + def del_uid(self, search): + """ + Find and remove a user id that matches the search string given. This method does not modify the corresponding + :py:obj:`~pgpy.PGPUID` object; it only removes it from the list of user ids on the key. + + :param search: A text string to match name, comment, or email address against + :type search: ``str``, ``unicode`` + """ + u = self.get_uid(search) + + if u is None: + raise KeyError("uid '{:s}' not found".format(search)) + + u._parent = None + self._uids.remove(u) + + def add_subkey(self, key, **prefs): + """ + Add a key as a subkey to this key. + :param key: A private :py:obj:`~pgpy.PGPKey` that does not have any subkeys of its own + + :keyword usage: A ``set`` of key usage flags, as :py:obj:`~constants.KeyFlags` for the subkey to be added. + :type usage: ``set`` + + Other valid optional keyword arguments are identical to those of self-signatures for :py:meth:`PGPKey.certify` + """ + if self.is_public: + raise PGPError("Cannot add a subkey to a public key. Add the subkey to the private component first!") + + if key.is_public: + raise PGPError("Cannot add a public key as a subkey to this key") + + if key.is_primary: + if len(key._children) > 0: + raise PGPError("Cannot add a key that already has subkeys as a subkey!") + + # convert key into a subkey + npk = PrivSubKeyV4() + npk.pkalg = key._key.pkalg + npk.created = key._key.created + npk.keymaterial = key._key.keymaterial + key._key = npk + key._key.update_hlen() + + self._children[key.fingerprint.keyid] = key + key._parent = self + + ##TODO: skip this step if the key already has a subkey binding signature + bsig = self.bind(key, **prefs) + key |= bsig + + def _get_key_flags(self, user=None): + if self.is_primary: + if user is not None: + user = self.get_uid(user) + + elif len(self._uids) == 0: + return {KeyFlags.Certify} + + else: + user = next(iter(self.userids)) + + # RFC 4880 says that primary keys *must* be capable of certification + return {KeyFlags.Certify} | user.selfsig.key_flags + + return next(self.self_signatures).key_flags + + def _sign(self, subject, sig, **prefs): + """ + The actual signing magic happens here. + :param subject: The subject to sign + :param sig: The :py:obj:`PGPSignature` object the new signature is to be encapsulated within + :returns: ``sig``, after the signature is added to it. + """ + user = prefs.pop('user', None) + uid = None + if user is not None: + uid = self.get_uid(user) + + else: + uid = next(iter(self.userids), None) + if uid is None and self.parent is not None: + uid = next(iter(self.parent.userids), None) + + if sig.hash_algorithm is None: + sig._signature.halg = uid.selfsig.hashprefs[0] + + if uid is not None and sig.hash_algorithm not in uid.selfsig.hashprefs: + warnings.warn("Selected hash algorithm not in key preferences", stacklevel=4) + + # signature options that can be applied at any level + expires = prefs.pop('expires', None) + notation = prefs.pop('notation', None) + revocable = prefs.pop('revocable', True) + policy_uri = prefs.pop('policy_uri', None) + + if expires is not None: + # expires should be a timedelta, so if it's a datetime, turn it into a timedelta + if isinstance(expires, datetime): + expires = expires - self.created + + sig._signature.subpackets.addnew('SignatureExpirationTime', hashed=True, expires=expires) + + if revocable is False: + sig._signature.subpackets.addnew('Revocable', hashed=True, bflag=revocable) + + if notation is not None: + for name, value in notation.items(): + # mark all notations as human readable unless value is a bytearray + flags = NotationDataFlags.HumanReadable + if isinstance(value, bytearray): + flags = 0x00 + + sig._signature.subpackets.addnew('NotationData', hashed=True, flags=flags, name=name, value=value) + + if policy_uri is not None: + sig._signature.subpackets.addnew('Policy', hashed=True, uri=policy_uri) + + if user is not None and uid is not None: + signers_uid = "{:s}".format(uid) + sig._signature.subpackets.addnew('SignersUserID', hashed=True, userid=signers_uid) + + # handle an edge case for timestamp signatures vs standalone signatures + if sig.type == SignatureType.Timestamp and len(sig._signature.subpackets._hashed_sp) > 1: + sig._signature.sigtype = SignatureType.Standalone + + sigdata = sig.hashdata(subject) + h2 = sig.hash_algorithm.hasher + h2.update(sigdata) + sig._signature.hash2 = bytearray(h2.digest()[:2]) + + _sig = self._key.sign(sigdata, getattr(hashes, sig.hash_algorithm.name)()) + if _sig is NotImplemented: + raise NotImplementedError(self.key_algorithm) + + sig._signature.signature.from_signer(_sig) + sig._signature.update_hlen() + + return sig + + @KeyAction(KeyFlags.Sign, is_unlocked=True, is_public=False) + def sign(self, subject, **prefs): + """ + Sign text, a message, or a timestamp using this key. + + :param subject: The text to be signed + :type subject: ``str``, :py:obj:`~pgpy.PGPMessage`, ``None`` + :raises: :py:exc:`~pgpy.errors.PGPError` if the key is passphrase-protected and has not been unlocked + :raises: :py:exc:`~pgpy.errors.PGPError` if the key is public + :returns: :py:obj:`PGPSignature` + + The following optional keyword arguments can be used with :py:meth:`PGPKey.sign`, as well as + :py:meth:`PGPKey.certify`, :py:meth:`PGPKey.revoke`, and :py:meth:`PGPKey.bind`: + + :keyword expires: Set an expiration date for this signature + :type expires: :py:obj:`~datetime.datetime`, :py:obj:`~datetime.timedelta` + :keyword notation: Add arbitrary notation data to this signature. + :type notation: ``dict`` + :keyword policy_uri: Add a URI to the signature that should describe the policy under which the signature + was issued. + :type policy_uri: ``str`` + :keyword revocable: If ``False``, this signature will be marked non-revocable + :type revocable: ``bool`` + :keyword user: Specify which User ID to use when creating this signature. Also adds a "Signer's User ID" + to the signature. + :type user: ``str`` + """ + sig_type = SignatureType.BinaryDocument + hash_algo = prefs.pop('hash', None) + + if subject is None: + sig_type = SignatureType.Timestamp + + if isinstance(subject, PGPMessage): + if subject.type == 'cleartext': + sig_type = SignatureType.CanonicalDocument + + subject = subject.message + + sig = PGPSignature.new(sig_type, self.key_algorithm, hash_algo, self.fingerprint.keyid) + + return self._sign(subject, sig, **prefs) + + @KeyAction(KeyFlags.Certify, is_unlocked=True, is_public=False) + def certify(self, subject, level=SignatureType.Generic_Cert, **prefs): + """ + Sign a key or a user id within a key. + + :param subject: The user id or key to be certified. + :type subject: :py:obj:`PGPKey`, :py:obj:`PGPUID` + :param level: :py:obj:`~constants.SignatureType.Generic_Cert`, :py:obj:`~constants.SignatureType.Persona_Cert`, + :py:obj:`~constants.SignatureType.Casual_Cert`, or :py:obj:`~constants.SignatureType.Positive_Cert`. + Only used if subject is a :py:obj:`PGPUID`; otherwise, it is ignored. + :raises: :py:exc:`~pgpy.errors.PGPError` if the key is passphrase-protected and has not been unlocked + :raises: :py:exc:`~pgpy.errors.PGPError` if the key is public + :returns: :py:obj:`PGPSignature` + + In addition to the optional keyword arguments accepted by :py:meth:`PGPKey.sign`, the following optional + keyword arguments can be used with :py:meth:`PGPKey.certify`. + + These optional keywords only make sense, and thus only have an effect, when self-signing a key or User ID: + + :keyword usage: A ``set`` of key usage flags, as :py:obj:`~constants.KeyFlags`. + This keyword is ignored for non-self-certifications. + :type usage: ``set`` + :keyword ciphers: A list of preferred symmetric ciphers, as :py:obj:`~constants.SymmetricKeyAlgorithm`. + This keyword is ignored for non-self-certifications. + :type ciphers: ``list`` + :keyword hashes: A list of preferred hash algorithms, as :py:obj:`~constants.HashAlgorithm`. + This keyword is ignored for non-self-certifications. + :type hashes: ``list`` + :keyword compression: A list of preferred compression algorithms, as :py:obj:`~constants.CompressionAlgorithm`. + This keyword is ignored for non-self-certifications. + :type compression: ``list`` + :keyword key_expiration: Specify a key expiration date for when this key should expire, or a + :py:obj:`~datetime.timedelta` of how long after the key was created it should expire. + This keyword is ignored for non-self-certifications. + :type key_expiration: :py:obj:`datetime.datetime`, :py:obj:`datetime.timedelta` + :keyword keyserver: Specify the URI of the preferred key server of the user. + This keyword is ignored for non-self-certifications. + :type keyserver: ``str``, ``unicode``, ``bytes`` + :keyword primary: Whether or not to consider the certified User ID as the primary one. + This keyword is ignored for non-self-certifications, and any certifications directly on keys. + :type primary: ``bool`` + + These optional keywords only make sense, and thus only have an effect, when signing another key or User ID: + + :keyword trust: Specify the level and amount of trust to assert when certifying a public key. Should be a tuple + of two ``int`` s, specifying the trust level and trust amount. See + `RFC 4880 Section 5.2.3.13. Trust Signature <http://tools.ietf.org/html/rfc4880#section-5.2.3.13>`_ + for more on what these values mean. + :type trust: ``tuple`` of two ``int`` s + :keyword regex: Specify a regular expression to constrain the specified trust signature in the resulting signature. + Symbolically signifies that the specified trust signature only applies to User IDs which match + this regular expression. + This is meaningless without also specifying trust level and amount. + :type regex: ``str`` + """ + hash_algo = prefs.pop('hash', None) + sig_type = level + if isinstance(subject, PGPKey): + sig_type = SignatureType.DirectlyOnKey + + sig = PGPSignature.new(sig_type, self.key_algorithm, hash_algo, self.fingerprint.keyid) + + # signature options that only make sense in certifications + usage = prefs.pop('usage', None) + exportable = prefs.pop('exportable', None) + + if usage is not None: + sig._signature.subpackets.addnew('KeyFlags', hashed=True, flags=usage) + + if exportable is not None: + sig._signature.subpackets.addnew('ExportableCertification', hashed=True, bflag=exportable) + + keyfp = self.fingerprint + if isinstance(subject, PGPKey): + keyfp = subject.fingerprint + if isinstance(subject, PGPUID) and subject._parent is not None: + keyfp = subject._parent.fingerprint + + if keyfp == self.fingerprint: + # signature options that only make sense in self-certifications + cipher_prefs = prefs.pop('ciphers', None) + hash_prefs = prefs.pop('hashes', None) + compression_prefs = prefs.pop('compression', None) + key_expires = prefs.pop('key_expiration', None) + keyserver_flags = prefs.pop('keyserver_flags', None) + keyserver = prefs.pop('keyserver', None) + primary_uid = prefs.pop('primary', None) + + if key_expires is not None: + # key expires should be a timedelta, so if it's a datetime, turn it into a timedelta + if isinstance(key_expires, datetime): + key_expires = key_expires - self.created + + sig._signature.subpackets.addnew('KeyExpirationTime', hashed=True, expires=key_expires) + + if cipher_prefs is not None: + sig._signature.subpackets.addnew('PreferredSymmetricAlgorithms', hashed=True, flags=cipher_prefs) + + if hash_prefs is not None: + sig._signature.subpackets.addnew('PreferredHashAlgorithms', hashed=True, flags=hash_prefs) + if sig.hash_algorithm is None: + sig._signature.halg = hash_prefs[0] + + if compression_prefs is not None: + sig._signature.subpackets.addnew('PreferredCompressionAlgorithms', hashed=True, flags=compression_prefs) + + if keyserver_flags is not None: + sig._signature.subpackets.addnew('KeyServerPreferences', hashed=True, flags=keyserver_flags) + + if keyserver is not None: + sig._signature.subpackets.addnew('PreferredKeyServer', hashed=True, uri=keyserver) + + if primary_uid is not None: + sig._signature.subpackets.addnew('PrimaryUserID', hashed=True, primary=primary_uid) + + # Features is always set on self-signatures + sig._signature.subpackets.addnew('Features', hashed=True, flags=Features.pgpy_features) + + else: + # signature options that only make sense in non-self-certifications + trust = prefs.pop('trust', None) + regex = prefs.pop('regex', None) + + if trust is not None: + sig._signature.subpackets.addnew('TrustSignature', hashed=True, level=trust[0], amount=trust[1]) + + if regex is not None: + sig._signature.subpackets.addnew('RegularExpression', hashed=True, regex=regex) + + return self._sign(subject, sig, **prefs) + + @KeyAction(KeyFlags.Certify, is_unlocked=True, is_public=False) + def revoke(self, target, **prefs): + """ + Revoke a key, a subkey, or all current certification signatures of a User ID that were generated by this key so far. + + :param target: The key to revoke + :type target: :py:obj:`PGPKey`, :py:obj:`PGPUID` + :raises: :py:exc:`~pgpy.errors.PGPError` if the key is passphrase-protected and has not been unlocked + :raises: :py:exc:`~pgpy.errors.PGPError` if the key is public + :returns: :py:obj:`PGPSignature` + + In addition to the optional keyword arguments accepted by :py:meth:`PGPKey.sign`, the following optional + keyword arguments can be used with :py:meth:`PGPKey.revoke`. + + :keyword reason: Defaults to :py:obj:`constants.RevocationReason.NotSpecified` + :type reason: One of :py:obj:`constants.RevocationReason`. + :keyword comment: Defaults to an empty string. + :type comment: ``str`` + """ + hash_algo = prefs.pop('hash', None) + if isinstance(target, PGPUID): + sig_type = SignatureType.CertRevocation + + elif isinstance(target, PGPKey): + ##TODO: check to make sure that the key that is being revoked: + # - is this key + # - is one of this key's subkeys + # - specifies this key as its revocation key + if target.is_primary: + sig_type = SignatureType.KeyRevocation + + else: + sig_type = SignatureType.SubkeyRevocation + + else: # pragma: no cover + raise TypeError + + sig = PGPSignature.new(sig_type, self.key_algorithm, hash_algo, self.fingerprint.keyid) + + # signature options that only make sense when revoking + reason = prefs.pop('reason', RevocationReason.NotSpecified) + comment = prefs.pop('comment', "") + sig._signature.subpackets.addnew('ReasonForRevocation', hashed=True, code=reason, string=comment) + + return self._sign(target, sig, **prefs) + + @KeyAction(is_unlocked=True, is_public=False) + def revoker(self, revoker, **prefs): + """ + Generate a signature that specifies another key as being valid for revoking this key. + + :param revoker: The :py:obj:`PGPKey` to specify as a valid revocation key. + :type revoker: :py:obj:`PGPKey` + :raises: :py:exc:`~pgpy.errors.PGPError` if the key is passphrase-protected and has not been unlocked + :raises: :py:exc:`~pgpy.errors.PGPError` if the key is public + :returns: :py:obj:`PGPSignature` + + In addition to the optional keyword arguments accepted by :py:meth:`PGPKey.sign`, the following optional + keyword arguments can be used with :py:meth:`PGPKey.revoker`. + + :keyword sensitive: If ``True``, this sets the sensitive flag on the RevocationKey subpacket. Currently, + this has no other effect. + :type sensitive: ``bool`` + """ + hash_algo = prefs.pop('hash', None) + + sig = PGPSignature.new(SignatureType.DirectlyOnKey, self.key_algorithm, hash_algo, self.fingerprint.keyid) + + # signature options that only make sense when adding a revocation key + sensitive = prefs.pop('sensitive', False) + keyclass = RevocationKeyClass.Normal | (RevocationKeyClass.Sensitive if sensitive else 0x00) + + sig._signature.subpackets.addnew('RevocationKey', + hashed=True, + algorithm=revoker.key_algorithm, + fingerprint=revoker.fingerprint, + keyclass=keyclass) + + # revocation keys should really not be revocable themselves + prefs['revocable'] = False + return self._sign(self, sig, **prefs) + + @KeyAction(is_unlocked=True, is_public=False) + def bind(self, key, **prefs): + """ + Bind a subkey to this key. + + Valid optional keyword arguments are identical to those of self-signatures for :py:meth:`PGPkey.certify` + """ + hash_algo = prefs.pop('hash', None) + + if self.is_primary and not key.is_primary: + sig_type = SignatureType.Subkey_Binding + + elif key.is_primary and not self.is_primary: + sig_type = SignatureType.PrimaryKey_Binding + + else: # pragma: no cover + raise PGPError + + sig = PGPSignature.new(sig_type, self.key_algorithm, hash_algo, self.fingerprint.keyid) + + if sig_type == SignatureType.Subkey_Binding: + # signature options that only make sense in subkey binding signatures + usage = prefs.pop('usage', None) + + if usage is not None: + sig._signature.subpackets.addnew('KeyFlags', hashed=True, flags=usage) + + # if possible, have the subkey create a primary key binding signature + if key.key_algorithm.can_sign: + subkeyid = key.fingerprint.keyid + esig = None + + if not key.is_public: + esig = key.bind(self) + + elif subkeyid in self.subkeys: # pragma: no cover + esig = self.subkeys[subkeyid].bind(self) + + if esig is not None: + sig._signature.subpackets.addnew('EmbeddedSignature', hashed=False, _sig=esig._signature) + + return self._sign(key, sig, **prefs) + + def verify(self, subject, signature=None): + """ + Verify a subject with a signature using this key. + + :param subject: The subject to verify + :type subject: ``str``, ``unicode``, ``None``, :py:obj:`PGPMessage`, :py:obj:`PGPKey`, :py:obj:`PGPUID` + :param signature: If the signature is detached, it should be specified here. + :type signature: :py:obj:`PGPSignature` + :returns: :py:obj:`~pgpy.types.SignatureVerification` + """ + sspairs = [] + + # some type checking + if not isinstance(subject, (type(None), PGPMessage, PGPKey, PGPUID, PGPSignature, six.string_types, bytes, bytearray)): + raise TypeError("Unexpected subject value: {:s}".format(str(type(subject)))) + if not isinstance(signature, (type(None), PGPSignature)): + raise TypeError("Unexpected signature value: {:s}".format(str(type(signature)))) + + def _filter_sigs(sigs): + _ids = {self.fingerprint.keyid} | set(self.subkeys) + return [ sig for sig in sigs if sig.signer in _ids ] + + # collect signature(s) + if signature is None: + if isinstance(subject, PGPMessage): + sspairs += [ (sig, subject.message) for sig in _filter_sigs(subject.signatures) ] + + if isinstance(subject, (PGPUID, PGPKey)): + sspairs += [ (sig, subject) for sig in _filter_sigs(subject.__sig__) ] + + if isinstance(subject, PGPKey): + # user ids + sspairs += [ (sig, uid) for uid in subject.userids for sig in _filter_sigs(uid.__sig__) ] + # user attributes + sspairs += [ (sig, ua) for ua in subject.userattributes for sig in _filter_sigs(ua.__sig__) ] + # subkey binding signatures + sspairs += [ (sig, subkey) for subkey in subject.subkeys.values() for sig in _filter_sigs(subkey.__sig__) ] + + elif signature.signer in {self.fingerprint.keyid} | set(self.subkeys): + sspairs += [(signature, subject)] + + if len(sspairs) == 0: + raise PGPError("No signatures to verify") + + # finally, start verifying signatures + sigv = SignatureVerification() + for sig, subj in sspairs: + if self.fingerprint.keyid != sig.signer and sig.signer in self.subkeys: + warnings.warn("Signature was signed with this key's subkey: {:s}. " + "Verifying with subkey...".format(sig.signer), + stacklevel=2) + sigv &= self.subkeys[sig.signer].verify(subj, sig) + + else: + verified = self._key.verify(sig.hashdata(subj), sig.__sig__, getattr(hashes, sig.hash_algorithm.name)()) + if verified is NotImplemented: + raise NotImplementedError(sig.key_algorithm) + + sigv.add_sigsubj(sig, self.fingerprint.keyid, subj, verified) + + return sigv + + @KeyAction(KeyFlags.EncryptCommunications, KeyFlags.EncryptStorage, is_public=True) + def encrypt(self, message, sessionkey=None, **prefs): + """ + Encrypt a PGPMessage using this key. + + :param message: The message to encrypt. + :type message: :py:obj:`PGPMessage` + :optional param sessionkey: Provide a session key to use when encrypting something. Default is ``None``. + If ``None``, a session key of the appropriate length will be generated randomly. + + .. warning:: + + Care should be taken when making use of this option! Session keys *absolutely need* + to be unpredictable! Use the ``gen_key()`` method on the desired + :py:obj:`~constants.SymmetricKeyAlgorithm` to generate the session key! + :type sessionkey: ``bytes``, ``str`` + + :raises: :py:exc:`~errors.PGPEncryptionError` if encryption failed for any reason. + :returns: A new :py:obj:`PGPMessage` with the encrypted contents of ``message`` + + The following optional keyword arguments can be used with :py:meth:`PGPKey.encrypt`: + + :keyword cipher: Specifies the symmetric block cipher to use when encrypting the message. + :type cipher: :py:obj:`~constants.SymmetricKeyAlgorithm` + :keyword user: Specifies the User ID to use as the recipient for this encryption operation, for the purposes of + preference defaults and selection validation. + :type user: ``str``, ``unicode`` + """ + user = prefs.pop('user', None) + uid = None + if user is not None: + uid = self.get_uid(user) + else: + uid = next(iter(self.userids), None) + if uid is None and self.parent is not None: + uid = next(iter(self.parent.userids), None) + cipher_algo = prefs.pop('cipher', uid.selfsig.cipherprefs[0]) + + if cipher_algo not in uid.selfsig.cipherprefs: + warnings.warn("Selected symmetric algorithm not in key preferences", stacklevel=3) + + if message.is_compressed and message._compression not in uid.selfsig.compprefs: + warnings.warn("Selected compression algorithm not in key preferences", stacklevel=3) + + if sessionkey is None: + sessionkey = cipher_algo.gen_key() + + # set up a new PKESessionKeyV3 + pkesk = PKESessionKeyV3() + pkesk.encrypter = bytearray(binascii.unhexlify(self.fingerprint.keyid.encode('latin-1'))) + pkesk.pkalg = self.key_algorithm + # pkesk.encrypt_sk(self.__key__, cipher_algo, sessionkey) + pkesk.encrypt_sk(self._key, cipher_algo, sessionkey) + + if message.is_encrypted: # pragma: no cover + _m = message + + else: + _m = PGPMessage() + skedata = IntegrityProtectedSKEDataV1() + skedata.encrypt(sessionkey, cipher_algo, message.__bytes__()) + _m |= skedata + + _m |= pkesk + + return _m + + def decrypt(self, message): + """ + Decrypt a PGPMessage using this key. + + :param message: An encrypted :py:obj:`PGPMessage` + :returns: A new :py:obj:`PGPMessage` with the decrypted contents of ``message`` + """ + if not message.is_encrypted: + warnings.warn("This message is not encrypted", stacklevel=2) + return message + + if self.fingerprint.keyid not in message.issuers: + sks = set(self.subkeys) + mis = set(message.issuers) + if sks & mis: + skid = list(sks & mis)[0] + warnings.warn("Message was encrypted with this key's subkey: {:s}. " + "Decrypting with that...".format(skid), + stacklevel=2) + return self.subkeys[skid].decrypt(message) + + raise PGPError("Cannot decrypt the provided message with this key") + + pkesk = next(pk for pk in message._sessionkeys if pk.pkalg == self.key_algorithm and pk.encrypter == self.fingerprint.keyid) + alg, key = pkesk.decrypt_sk(self._key) + + # now that we have the symmetric cipher used and the key, we can decrypt the actual message + decmsg = PGPMessage() + decmsg.parse(message.message.decrypt(key, alg)) + + return decmsg + + def parse(self, data): + unarmored = self.ascii_unarmor(data) + data = unarmored['body'] + + if unarmored['magic'] is not None and 'KEY' not in unarmored['magic']: + raise ValueError('Expected: KEY. Got: {}'.format(str(unarmored['magic']))) + + if unarmored['headers'] is not None: + self.ascii_headers = unarmored['headers'] + + # parse packets + # keys will hold other keys parsed here + keys = collections.OrderedDict() + # orphaned will hold all non-opaque orphaned packets + orphaned = [] + # last holds the last non-signature thing processed + + ##TODO: see issue #141 and fix this better + getpkt = lambda d: Packet(d) if len(d) > 0 else None # flake8: noqa + # some packets are filtered out + getpkt = filter(lambda p: p.header.tag != PacketTag.Trust, iter(functools.partial(getpkt, data), None)) + + def pktgrouper(): + class PktGrouper(object): + def __init__(self): + self.last = None + + def __call__(self, pkt): + if pkt.header.tag != PacketTag.Signature: + self.last = '{:02X}_{:s}'.format(id(pkt), pkt.__class__.__name__) + return self.last + return PktGrouper() + + while True: + # print(type(p) for p in getpkt) + for group in iter(group for _, group in itertools.groupby(getpkt, key=pktgrouper()) if not _.endswith('Opaque')): + pkt = next(group) + + # deal with pkt first + if isinstance(pkt, Key): + pgpobj = (self if self._key is None else PGPKey()) | pkt + + elif isinstance(pkt, (UserID, UserAttribute)): + pgpobj = PGPUID() | pkt + + else: # pragma: no cover + break + + # add signatures to whatever we got + [ operator.ior(pgpobj, PGPSignature() | sig) for sig in group if not isinstance(sig, Opaque) ] + + # and file away pgpobj + if isinstance(pgpobj, PGPKey): + if pgpobj.is_primary: + keys[(pgpobj.fingerprint.keyid, pgpobj.is_public)] = pgpobj + + else: + keys[next(reversed(keys))] |= pgpobj + + elif isinstance(pgpobj, PGPUID): + # parent is likely the most recently parsed primary key + keys[next(reversed(keys))] |= pgpobj + + else: # pragma: no cover + break + else: + # finished normally + break + + # this will only be reached called if the inner loop hit a break + warnings.warn("Warning: Orphaned packet detected! {:s}".format(repr(pkt)), stacklevel=2) # pragma: no cover + orphaned.append(pkt) # pragma: no cover + for pkt in group: # pragma: no cover + orphaned.append(pkt) + + # remove the reference to self from keys + [ keys.pop((getattr(self, 'fingerprint.keyid', '~'), None), t) for t in (True, False) ] + # return {'keys': keys, 'orphaned': orphaned} + return keys + + +class PGPKeyring(collections.Container, collections.Iterable, collections.Sized): + def __init__(self, *args): + """ + PGPKeyring objects represent in-memory keyrings that can contain any combination of supported private and public + keys. It can not currently be conveniently exported to a format that can be understood by GnuPG. + """ + super(PGPKeyring, self).__init__() + self._keys = {} + self._pubkeys = collections.deque() + self._privkeys = collections.deque() + self._aliases = collections.deque([{}]) + self.load(*args) + + def __contains__(self, alias): + aliases = set().union(*self._aliases) + + if isinstance(alias, six.string_types): + return alias in aliases or alias.replace(' ', '') in aliases + + return alias in aliases # pragma: no cover + + def __len__(self): + return len(self._keys) + + def __iter__(self): # pragma: no cover + for pgpkey in itertools.chain(self._pubkeys, self._privkeys): + yield pgpkey + + def _get_key(self, alias): + for m in self._aliases: + if alias in m: + return self._keys[m[alias]] + + if alias.replace(' ', '') in m: + return self._keys[m[alias.replace(' ', '')]] + + raise KeyError(alias) + + def _get_keys(self, alias): + return [self._keys[m[alias]] for m in self._aliases if alias in m] + + def _sort_alias(self, alias): + # remove alias from all levels of _aliases, and sort by created time and key half + # so the order of _aliases from left to right: + # - newer keys come before older ones + # - private keys come before public ones + # + # this list is sorted in the opposite direction from that, because they will be placed into self._aliases + # from right to left. + pkids = sorted(list(set().union(m.pop(alias) for m in self._aliases if alias in m)), + key=lambda pkid: (self._keys[pkid].created, self._keys[pkid].is_public)) + + # drop the now-sorted aliases into place + for depth, pkid in enumerate(pkids): + self._aliases[depth][alias] = pkid + + # finally, remove any empty dicts left over + while {} in self._aliases: # pragma: no cover + self._aliases.remove({}) + + def _add_alias(self, alias, pkid): + # brand new alias never seen before! + if alias not in self: + self._aliases[-1][alias] = pkid + + # this is a duplicate alias->key link; ignore it + elif alias in self and pkid in set(m[alias] for m in self._aliases if alias in m): + pass # pragma: no cover + + # this is an alias that already exists, but points to a key that is not already referenced by it + else: + adepth = len(self._aliases) - len([None for m in self._aliases if alias in m]) - 1 + # all alias maps have this alias, so increase total depth by 1 + if adepth == -1: + self._aliases.appendleft({}) + adepth = 0 + + self._aliases[adepth][alias] = pkid + self._sort_alias(alias) + + def _add_key(self, pgpkey): + pkid = id(pgpkey) + if pkid not in self._keys: + self._keys[pkid] = pgpkey + + # add to _{pub,priv}keys if this is either a primary key, or a subkey without one + if pgpkey.parent is None: + if pgpkey.is_public: + self._pubkeys.append(pkid) + + else: + self._privkeys.append(pkid) + + # aliases + self._add_alias(pgpkey.fingerprint, pkid) + self._add_alias(pgpkey.fingerprint.keyid, pkid) + self._add_alias(pgpkey.fingerprint.shortid, pkid) + for uid in pgpkey.userids: + self._add_alias(uid.name, pkid) + if uid.comment: + self._add_alias(uid.comment, pkid) + + if uid.email: + self._add_alias(uid.email, pkid) + + # subkeys + for subkey in pgpkey.subkeys.values(): + self._add_key(subkey) + + def load(self, *args): + """ + Load all keys provided into this keyring object. + + :param \*args: Each arg in ``args`` can be any of the formats supported by :py:meth:`PGPKey.from_path` and + :py:meth:`PGPKey.from_blob`, or a ``list`` or ``tuple`` of these. + :type \*args: ``list``, ``tuple``, ``str``, ``unicode``, ``bytes``, ``bytearray`` + :returns: a ``set`` containing the unique fingerprints of all of the keys that were loaded during this operation. + """ + def _preiter(first, iterable): + yield first + for item in iterable: + yield item + + loaded = set() + for key in iter(item for ilist in iter(ilist if isinstance(ilist, (tuple, list)) else [ilist] for ilist in args) + for item in ilist): + if os.path.isfile(key): + _key, keys = PGPKey.from_file(key) + + else: + _key, keys = PGPKey.from_blob(key) + + for ik in _preiter(_key, keys.values()): + self._add_key(ik) + loaded |= {ik.fingerprint} | {isk.fingerprint for isk in ik.subkeys.values()} + + return list(loaded) + + @contextlib.contextmanager + def key(self, identifier): + """ + A context-manager method. Yields the first :py:obj:`PGPKey` object that matches the provided identifier. + + :param identifier: The identifier to use to select a loaded key. + :type identifier: :py:exc:`PGPMessage`, :py:exc:`PGPSignature`, ``str`` + :raises: :py:exc:`KeyError` if there is no loaded key that satisfies the identifier. + """ + if isinstance(identifier, PGPMessage): + for issuer in identifier.issuers: + if issuer in self: + identifier = issuer + break + + if isinstance(identifier, PGPSignature): + identifier = identifier.signer + + yield self._get_key(identifier) + + def fingerprints(self, keyhalf='any', keytype='any'): + """ + List loaded fingerprints with some optional filtering. + + :param str keyhalf: Can be 'any', 'public', or 'private'. If 'public', or 'private', the fingerprints of keys of the + the other type will not be included in the results. + :param str keytype: Can be 'any', 'primary', or 'sub'. If 'primary' or 'sub', the fingerprints of keys of the + the other type will not be included in the results. + :returns: a ``set`` of fingerprints of keys matching the filters specified. + """ + return {pk.fingerprint for pk in self._keys.values() + if pk.is_primary in [True if keytype in ['primary', 'any'] else None, + False if keytype in ['sub', 'any'] else None] + if pk.is_public in [True if keyhalf in ['public', 'any'] else None, + False if keyhalf in ['private', 'any'] else None]} + + def unload(self, key): + """ + Unload a loaded key and its subkeys. + + The easiest way to do this is to select a key using :py:meth:`PGPKeyring.key` first:: + + with keyring.key("DSA von TestKey") as key: + keyring.unload(key) + + :param key: The key to unload. + :type key: :py:obj:`PGPKey` + """ + assert isinstance(key, PGPKey) + pkid = id(key) + if pkid in self._keys: + # remove references + [ kd.remove(pkid) for kd in [self._pubkeys, self._privkeys] if pkid in kd ] + # remove the key + self._keys.pop(pkid) + + # remove aliases + for m, a in [ (m, a) for m in self._aliases for a, p in m.items() if p == pkid ]: + m.pop(a) + # do a re-sort of this alias if it was not unique + if a in self: + self._sort_alias(a) + + # if key is a primary key, unload its subkeys as well + if key.is_primary: + [ self.unload(sk) for sk in key.subkeys.values() ] |