summaryrefslogtreecommitdiff
path: root/src/leap/mx/vendor/pgpy/pgp.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mx/vendor/pgpy/pgp.py')
-rw-r--r--src/leap/mx/vendor/pgpy/pgp.py2527
1 files changed, 2527 insertions, 0 deletions
diff --git a/src/leap/mx/vendor/pgpy/pgp.py b/src/leap/mx/vendor/pgpy/pgp.py
new file mode 100644
index 0000000..82b3a0b
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/pgp.py
@@ -0,0 +1,2527 @@
+""" pgp.py
+
+this is where the armorable PGP block objects live
+"""
+import binascii
+import calendar
+import collections
+import contextlib
+import copy
+import functools
+import itertools
+import operator
+import os
+import re
+import warnings
+import weakref
+
+import six
+
+from datetime import datetime
+
+from cryptography.hazmat.primitives import hashes
+
+from .constants import CompressionAlgorithm
+from .constants import Features
+from .constants import HashAlgorithm
+from .constants import ImageEncoding
+from .constants import KeyFlags
+from .constants import NotationDataFlags
+from .constants import PacketTag
+from .constants import PubKeyAlgorithm
+from .constants import RevocationKeyClass
+from .constants import RevocationReason
+from .constants import SignatureType
+from .constants import SymmetricKeyAlgorithm
+
+from .decorators import KeyAction
+
+from .errors import PGPDecryptionError
+from .errors import PGPError
+
+from .packet import Key
+from .packet import MDC
+from .packet import Packet
+from .packet import Primary
+from .packet import Private
+from .packet import PubKeyV4
+from .packet import PubSubKeyV4
+from .packet import PrivKeyV4
+from .packet import PrivSubKeyV4
+from .packet import Public
+from .packet import Sub
+from .packet import UserID
+from .packet import UserAttribute
+
+from .packet.packets import CompressedData
+from .packet.packets import IntegrityProtectedSKEData
+from .packet.packets import IntegrityProtectedSKEDataV1
+from .packet.packets import LiteralData
+from .packet.packets import OnePassSignature
+from .packet.packets import OnePassSignatureV3
+from .packet.packets import PKESessionKey
+from .packet.packets import PKESessionKeyV3
+from .packet.packets import Signature
+from .packet.packets import SignatureV4
+from .packet.packets import SKEData
+from .packet.packets import Marker
+from .packet.packets import SKESessionKey
+from .packet.packets import SKESessionKeyV4
+
+from .packet.types import Opaque
+
+from .types import Armorable
+from .types import Fingerprint
+from .types import ParentRef
+from .types import PGPObject
+from .types import SignatureVerification
+from .types import SorteDeque
+
+__all__ = ['PGPSignature',
+ 'PGPUID',
+ 'PGPMessage',
+ 'PGPKey',
+ 'PGPKeyring']
+
+
+class PGPSignature(Armorable, ParentRef, PGPObject):
+ @property
+ def __sig__(self):
+ return self._signature.signature.__sig__()
+
+ @property
+ def cipherprefs(self):
+ """
+ A ``list`` of preferred symmetric algorithms specified in this signature, if any. Otherwise, an empty ``list``.
+ """
+ if 'PreferredSymmetricAlgorithms' in self._signature.subpackets:
+ return next(iter(self._signature.subpackets['h_PreferredSymmetricAlgorithms'])).flags
+ return []
+
+ @property
+ def compprefs(self):
+ """
+ A ``list`` of preferred compression algorithms specified in this signature, if any. Otherwise, an empty ``list``.
+ """
+ if 'PreferredCompressionAlgorithms' in self._signature.subpackets:
+ return next(iter(self._signature.subpackets['h_PreferredCompressionAlgorithms'])).flags
+ return []
+
+ @property
+ def created(self):
+ """
+ A :py:obj:`~datetime.datetime` of when this signature was created.
+ """
+ return self._signature.subpackets['h_CreationTime'][-1].created
+
+ @property
+ def embedded(self):
+ return self.parent is not None
+
+ @property
+ def expires_at(self):
+ """
+ A :py:obj:`~datetime.datetime` of when this signature expires, if a signature expiration date is specified.
+ Otherwise, ``None``
+ """
+ if 'SignatureExpirationTime' in self._signature.subpackets:
+ expd = next(iter(self._signature.subpackets['SignatureExpirationTime'])).expires
+ return self.created + expd
+ return None
+
+ @property
+ def exportable(self):
+ """
+ ``False`` if this signature is marked as being not exportable. Otherwise, ``True``.
+ """
+ if 'ExportableCertification' in self._signature.subpackets:
+ return bool(next(iter(self._signature.subpackets['ExportableCertification'])))
+
+ return True
+
+ @property
+ def features(self):
+ """
+ A ``set`` of implementation features specified in this signature, if any. Otherwise, an empty ``set``.
+ """
+ if 'Features' in self._signature.subpackets:
+ return next(iter(self._signature.subpackets['Features'])).flags
+ return set()
+
+ @property
+ def hash2(self):
+ return self._signature.hash2
+
+ @property
+ def hashprefs(self):
+ """
+ A ``list`` of preferred hash algorithms specified in this signature, if any. Otherwise, an empty ``list``.
+ """
+ if 'PreferredHashAlgorithms' in self._signature.subpackets:
+ return next(iter(self._signature.subpackets['h_PreferredHashAlgorithms'])).flags
+ return []
+
+ @property
+ def hash_algorithm(self):
+ """
+ The :py:obj:`~constants.HashAlgorithm` used when computing this signature.
+ """
+ return self._signature.halg
+
+ @property
+ def is_expired(self):
+ """
+ ``True`` if the signature has an expiration date, and is expired. Otherwise, ``False``
+ """
+ expires_at = self.expires_at
+ if expires_at is not None and expires_at != self.created:
+ return expires_at < datetime.utcnow()
+
+ return False
+
+ @property
+ def key_algorithm(self):
+ """
+ The :py:obj:`~constants.PubKeyAlgorithm` of the key that generated this signature.
+ """
+ return self._signature.pubalg
+
+ @property
+ def key_expiration(self):
+ if 'KeyExpirationTime' in self._signature.subpackets:
+ return next(iter(self._signature.subpackets['KeyExpirationTime'])).expires
+ return None
+
+ @property
+ def key_flags(self):
+ """
+ A ``set`` of :py:obj:`~constants.KeyFlags` specified in this signature, if any. Otherwise, an empty ``set``.
+ """
+ if 'KeyFlags' in self._signature.subpackets:
+ return next(iter(self._signature.subpackets['h_KeyFlags'])).flags
+ return set()
+
+ @property
+ def keyserver(self):
+ """
+ The preferred key server specified in this signature, if any. Otherwise, an empty ``str``.
+ """
+ if 'PreferredKeyServer' in self._signature.subpackets:
+ return next(iter(self._signature.subpackets['h_PreferredKeyServer'])).uri
+ return ''
+
+ @property
+ def keyserverprefs(self):
+ """
+ A ``list`` of :py:obj:`~constants.KeyServerPreferences` in this signature, if any. Otherwise, an empty ``list``.
+ """
+ if 'KeyServerPreferences' in self._signature.subpackets:
+ return next(iter(self._signature.subpackets['h_KeyServerPreferences'])).flags
+ return []
+
+ @property
+ def magic(self):
+ return "SIGNATURE"
+
+ @property
+ def notation(self):
+ """
+ A ``dict`` of notation data in this signature, if any. Otherwise, an empty ``dict``.
+ """
+ return dict((nd.name, nd.value) for nd in self._signature.subpackets['NotationData'])
+
+ @property
+ def policy_uri(self):
+ """
+ The policy URI specified in this signature, if any. Otherwise, an empty ``str``.
+ """
+ if 'Policy' in self._signature.subpackets:
+ return next(iter(self._signature.subpackets['Policy'])).uri
+ return ''
+
+ @property
+ def revocable(self):
+ """
+ ``False`` if this signature is marked as being not revocable. Otherwise, ``True``.
+ """
+ if 'Revocable' in self._signature.subpackets:
+ return bool(next(iter(self._signature.subpackets['Revocable'])))
+ return True
+
+ @property
+ def revocation_key(self):
+ if 'RevocationKey' in self._signature.subpackets:
+ raise NotImplementedError()
+ return None
+
+ @property
+ def signer(self):
+ """
+ The 16-character Key ID of the key that generated this signature.
+ """
+ return self._signature.signer
+
+ @property
+ def target_signature(self):
+ return NotImplemented
+
+ @property
+ def type(self):
+ """
+ The :py:obj:`~constants.SignatureType` of this signature.
+ """
+ return self._signature.sigtype
+
+ @classmethod
+ def new(cls, sigtype, pkalg, halg, signer):
+ sig = PGPSignature()
+
+ sigpkt = SignatureV4()
+ sigpkt.header.tag = 2
+ sigpkt.header.version = 4
+ sigpkt.subpackets.addnew('CreationTime', hashed=True, created=datetime.utcnow())
+ sigpkt.subpackets.addnew('Issuer', _issuer=signer)
+
+ sigpkt.sigtype = sigtype
+ sigpkt.pubalg = pkalg
+
+ if halg is not None:
+ sigpkt.halg = halg
+
+ sig._signature = sigpkt
+ return sig
+
+ def __init__(self):
+ """
+ PGPSignature objects represent OpenPGP compliant signatures.
+
+ PGPSignature implements the ``__str__`` method, the output of which will be the signature object in
+ OpenPGP-compliant ASCII-armored format.
+
+ PGPSignature implements the ``__bytes__`` method, the output of which will be the signature object in
+ OpenPGP-compliant binary format.
+ """
+ super(PGPSignature, self).__init__()
+ self._signature = None
+
+ def __bytearray__(self):
+ return self._signature.__bytearray__()
+
+ def __repr__(self):
+ return "<PGPSignature [{:s}] object at 0x{:02x}>".format(self.type.name, id(self))
+
+ def __lt__(self, other):
+ return self.created < other.created
+
+ def __or__(self, other):
+ if isinstance(other, Signature):
+ if self._signature is None:
+ self._signature = other
+ return self
+
+ ##TODO: this is not a great way to do this
+ if other.__class__.__name__ == 'EmbeddedSignature':
+ self._signature = other
+ return self
+
+ raise TypeError
+
+ def __copy__(self):
+ # because the default shallow copy isn't actually all that useful,
+ # and deepcopy does too much work
+ sig = super(PGPSignature, self).__copy__()
+ # sig = PGPSignature()
+ # sig.ascii_headers = self.ascii_headers.copy()
+ sig |= copy.copy(self._signature)
+ return sig
+
+ def hashdata(self, subject):
+ _data = bytearray()
+
+ if isinstance(subject, six.string_types):
+ subject = subject.encode('latin-1')
+
+ """
+ All signatures are formed by producing a hash over the signature
+ data, and then using the resulting hash in the signature algorithm.
+ """
+
+ if self.type == SignatureType.BinaryDocument:
+ """
+ For binary document signatures (type 0x00), the document data is
+ hashed directly.
+ """
+ _data += bytearray(subject)
+
+ if self.type == SignatureType.CanonicalDocument:
+ """
+ For text document signatures (type 0x01), the
+ document is canonicalized by converting line endings to <CR><LF>,
+ and the resulting data is hashed.
+ """
+ _data += re.subn(br'\r?\n', b'\r\n', subject)[0]
+
+ if self.type in {SignatureType.Generic_Cert, SignatureType.Persona_Cert, SignatureType.Casual_Cert,
+ SignatureType.Positive_Cert, SignatureType.CertRevocation, SignatureType.Subkey_Binding,
+ SignatureType.PrimaryKey_Binding}:
+ """
+ When a signature is made over a key, the hash data starts with the
+ octet 0x99, followed by a two-octet length of the key, and then body
+ of the key packet. (Note that this is an old-style packet header for
+ a key packet with two-octet length.) ...
+ Key revocation signatures (types 0x20 and 0x28)
+ hash only the key being revoked.
+ """
+ _s = b''
+ if isinstance(subject, PGPUID):
+ _s = subject._parent.hashdata
+
+ elif isinstance(subject, PGPKey) and not subject.is_primary:
+ _s = subject._parent.hashdata
+
+ elif isinstance(subject, PGPKey) and subject.is_primary:
+ _s = subject.hashdata
+
+ if len(_s) > 0:
+ _data += b'\x99' + self.int_to_bytes(len(_s), 2) + _s
+
+ if self.type in {SignatureType.Subkey_Binding, SignatureType.PrimaryKey_Binding}:
+ """
+ A subkey binding signature
+ (type 0x18) or primary key binding signature (type 0x19) then hashes
+ the subkey using the same format as the main key (also using 0x99 as
+ the first octet).
+ """
+ if subject.is_primary:
+ _s = subject.subkeys[self.signer].hashdata
+
+ else:
+ _s = subject.hashdata
+
+ _data += b'\x99' + self.int_to_bytes(len(_s), 2) + _s
+
+ if self.type in {SignatureType.KeyRevocation, SignatureType.SubkeyRevocation, SignatureType.DirectlyOnKey}:
+ """
+ The signature is calculated directly on the key being revoked. A
+ revoked key is not to be used. Only revocation signatures by the
+ key being revoked, or by an authorized revocation key, should be
+ considered valid revocation signatures.
+
+ Subkey revocation signature
+ The signature is calculated directly on the subkey being revoked.
+ A revoked subkey is not to be used. Only revocation signatures
+ by the top-level signature key that is bound to this subkey, or
+ by an authorized revocation key, should be considered valid
+ revocation signatures.
+
+ Signature directly on a key
+ This signature is calculated directly on a key. It binds the
+ information in the Signature subpackets to the key, and is
+ appropriate to be used for subpackets that provide information
+ about the key, such as the Revocation Key subpacket. It is also
+ appropriate for statements that non-self certifiers want to make
+ about the key itself, rather than the binding between a key and a
+ name.
+ """
+ _s = subject.hashdata
+ _data += b'\x99' + self.int_to_bytes(len(_s), 2) + _s
+
+ if self.type in {SignatureType.Generic_Cert, SignatureType.Persona_Cert, SignatureType.Casual_Cert,
+ SignatureType.Positive_Cert, SignatureType.CertRevocation}:
+ """
+ A certification signature (type 0x10 through 0x13) hashes the User
+ ID being bound to the key into the hash context after the above
+ data. ... A V4 certification
+ hashes the constant 0xB4 for User ID certifications or the constant
+ 0xD1 for User Attribute certifications, followed by a four-octet
+ number giving the length of the User ID or User Attribute data, and
+ then the User ID or User Attribute data.
+
+ ...
+
+ The [certificate revocation] signature
+ is computed over the same data as the certificate that it
+ revokes, and should have a later creation date than that
+ certificate.
+ """
+
+ _s = subject.hashdata
+ if subject.is_uid:
+ _data += b'\xb4'
+
+ else:
+ _data += b'\xd1'
+
+ _data += self.int_to_bytes(len(_s), 4) + _s
+
+ # if this is a new signature, do update_hlen
+ if 0 in list(self._signature.signature):
+ self._signature.update_hlen()
+
+ """
+ Once the data body is hashed, then a trailer is hashed. (...)
+ A V4 signature hashes the packet body
+ starting from its first field, the version number, through the end
+ of the hashed subpacket data. Thus, the fields hashed are the
+ signature version, the signature type, the public-key algorithm, the
+ hash algorithm, the hashed subpacket length, and the hashed
+ subpacket body.
+
+ V4 signatures also hash in a final trailer of six octets: the
+ version of the Signature packet, i.e., 0x04; 0xFF; and a four-octet,
+ big-endian number that is the length of the hashed data from the
+ Signature packet (note that this number does not include these final
+ six octets).
+ """
+
+ hcontext = bytearray()
+ hcontext.append(self._signature.header.version if not self.embedded else self._signature._sig.header.version)
+ hcontext.append(self.type)
+ hcontext.append(self.key_algorithm)
+ hcontext.append(self.hash_algorithm)
+ hcontext += self._signature.subpackets.__hashbytearray__()
+ hlen = len(hcontext)
+ _data += hcontext
+ _data += b'\x04\xff'
+ _data += self.int_to_bytes(hlen, 4)
+ return bytes(_data)
+
+ def make_onepass(self):
+ onepass = OnePassSignatureV3()
+ onepass.sigtype = self.type
+ onepass.halg = self.hash_algorithm
+ onepass.pubalg = self.key_algorithm
+ onepass.signer = self.signer
+ onepass.update_hlen()
+ return onepass
+
+ def parse(self, packet):
+ unarmored = self.ascii_unarmor(packet)
+ data = unarmored['body']
+
+ if unarmored['magic'] is not None and unarmored['magic'] != 'SIGNATURE':
+ raise ValueError('Expected: SIGNATURE. Got: {}'.format(str(unarmored['magic'])))
+
+ if unarmored['headers'] is not None:
+ self.ascii_headers = unarmored['headers']
+
+ # load *one* packet from data
+ pkt = Packet(data)
+ if pkt.header.tag == PacketTag.Signature and not isinstance(pkt, Opaque):
+ self._signature = pkt
+
+ else:
+ raise ValueError('Expected: Signature. Got: {:s}'.format(pkt.__class__.__name__))
+
+
+class PGPUID(ParentRef):
+ @property
+ def __sig__(self):
+ return list(self._signatures)
+
+ @property
+ def name(self):
+ """If this is a User ID, the stored name. If this is not a User ID, this will be an empty string."""
+ return self._uid.name if isinstance(self._uid, UserID) else ""
+
+ @property
+ def comment(self):
+ """
+ If this is a User ID, this will be the stored comment. If this is not a User ID, or there is no stored comment,
+ this will be an empty string.,
+ """
+
+ return self._uid.comment if isinstance(self._uid, UserID) else ""
+
+ @property
+ def email(self):
+ """
+ If this is a User ID, this will be the stored email address. If this is not a User ID, or there is no stored
+ email address, this will be an empty string.
+ """
+ return self._uid.email if isinstance(self._uid, UserID) else ""
+
+ @property
+ def image(self):
+ """
+ If this is a User Attribute, this will be the stored image. If this is not a User Attribute, this will be ``None``.
+ """
+ return self._uid.image.image if isinstance(self._uid, UserAttribute) else None
+
+ @property
+ def is_primary(self):
+ """
+ If the most recent, valid self-signature specifies this as being primary, this will be True. Otherwise, Faqlse.
+ """
+ return bool(next(iter(self.selfsig._signature.subpackets['h_PrimaryUserID']), False))
+
+ @property
+ def is_uid(self):
+ """
+ ``True`` if this is a User ID, otherwise False.
+ """
+ return isinstance(self._uid, UserID)
+
+ @property
+ def is_ua(self):
+ """
+ ``True`` if this is a User Attribute, otherwise False.
+ """
+ return isinstance(self._uid, UserAttribute)
+
+ @property
+ def selfsig(self):
+ """
+ This will be the most recent, self-signature of this User ID or Attribute. If there isn't one, this will be ``None``.
+ """
+ if self.parent is not None:
+ return next((sig for sig in reversed(self._signatures) if sig.signer == self.parent.fingerprint.keyid), None)
+
+ @property
+ def signers(self):
+ """
+ This will be a set of all of the key ids which have signed this User ID or Attribute.
+ """
+ return set(s.signer for s in self.__sig__)
+
+ @property
+ def hashdata(self):
+ if self.is_uid:
+ return self._uid.__bytearray__()[len(self._uid.header):]
+
+ if self.is_ua:
+ return self._uid.subpackets.__bytearray__()
+
+ @classmethod
+ def new(cls, pn, comment="", email=""):
+ """
+ Create a new User ID or photo.
+
+ :param pn: User ID name, or photo. If this is a ``bytearray``, it will be loaded as a photo.
+ Otherwise, it will be used as the name field for a User ID.
+ :type pn: ``bytearray``, ``str``, ``unicode``
+ :param comment: The comment field for a User ID. Ignored if this is a photo.
+ :type comment: ``str``, ``unicode``
+ :param email: The email address field for a User ID. Ignored if this is a photo.
+ :type email: ``str``, ``unicode``
+ :returns: :py:obj:`PGPUID`
+ """
+ uid = PGPUID()
+ if isinstance(pn, bytearray):
+ uid._uid = UserAttribute()
+ uid._uid.image.image = pn
+ uid._uid.image.iencoding = ImageEncoding.encodingof(pn)
+ uid._uid.update_hlen()
+
+ else:
+ uid._uid = UserID()
+ uid._uid.name = pn
+ uid._uid.comment = comment
+ uid._uid.email = email
+ uid._uid.update_hlen()
+
+ return uid
+
+ def __init__(self):
+ """
+ PGPUID objects represent User IDs and User Attributes for keys.
+
+ PGPUID implements the ``__format__`` method for User IDs, returning a string in the format
+ 'name (comment) <email>', leaving out any comment or email fields that are not present.
+ """
+ super(PGPUID, self).__init__()
+ self._uid = None
+ self._signatures = SorteDeque()
+
+ def __repr__(self):
+ if self.selfsig is not None:
+ return "<PGPUID [{:s}][{}] at 0x{:02X}>".format(self._uid.__class__.__name__, self.selfsig.created, id(self))
+ return "<PGPUID [{:s}] at 0x{:02X}>".format(self._uid.__class__.__name__, id(self))
+
+ def __lt__(self, other): # pragma: no cover
+ if self.is_uid == other.is_uid:
+ if self.is_primary == other.is_primary:
+ return self.selfsig > other.selfsig
+
+ if self.is_primary:
+ return True
+
+ return False
+
+ if self.is_uid and other.is_ua:
+ return True
+
+ if self.is_ua and other.is_uid:
+ return False
+
+ def __or__(self, other):
+ if isinstance(other, PGPSignature):
+ self._signatures.insort(other)
+ if self.parent is not None and self in self.parent._uids:
+ self.parent._uids.resort(self)
+
+ return self
+
+ if isinstance(other, UserID) and self._uid is None:
+ self._uid = other
+ return self
+
+ if isinstance(other, UserAttribute) and self._uid is None:
+ self._uid = other
+ return self
+
+ raise TypeError("unsupported operand type(s) for |: '{:s}' and '{:s}'"
+ "".format(self.__class__.__name__, other.__class__.__name__))
+
+ def __copy__(self):
+ # because the default shallow copy isn't actually all that useful,
+ # and deepcopy does too much work
+ uid = PGPUID()
+ uid |= copy.copy(self._uid)
+ for sig in self._signatures:
+ uid |= copy.copy(sig)
+ return uid
+
+ def __format__(self, format_spec):
+ if self.is_uid:
+ comment = six.u("") if self.comment == "" else six.u(" ({:s})").format(self.comment)
+ email = six.u("") if self.email == "" else six.u(" <{:s}>").format(self.email)
+ return six.u("{:s}{:s}{:s}").format(self.name, comment, email)
+
+ raise NotImplementedError
+
+
+class PGPMessage(Armorable, PGPObject):
+ @staticmethod
+ def dash_unescape(text):
+ return re.subn(r'^- -', '-', text, flags=re.MULTILINE)[0]
+
+ @staticmethod
+ def dash_escape(text):
+ return re.subn(r'^-', '- -', text, flags=re.MULTILINE)[0]
+
+ @property
+ def encrypters(self):
+ """A ``set`` containing all key ids (if any) to which this message was encrypted."""
+ return set(m.encrypter for m in self._sessionkeys if isinstance(m, PKESessionKey))
+
+ @property
+ def filename(self):
+ """If applicable, returns the original filename of the message. Otherwise, returns an empty string."""
+ if self.type == 'literal':
+ return self._message.filename
+ return ''
+
+ @property
+ def is_compressed(self):
+ """``True`` if this message will be compressed when exported"""
+ return self._compression != CompressionAlgorithm.Uncompressed
+
+ @property
+ def is_encrypted(self):
+ """``True`` if this message is encrypted; otherwise, ``False``"""
+ return isinstance(self._message, (SKEData, IntegrityProtectedSKEData))
+
+ @property
+ def is_sensitive(self):
+ """``True`` if this message is marked sensitive; otherwise ``False``"""
+ return self.type == 'literal' and self._message.filename == '_CONSOLE'
+
+ @property
+ def is_signed(self):
+ """
+ ``True`` if this message is signed; otherwise, ``False``.
+ Should always be ``False`` if the message is encrypted.
+ """
+ return len(self._signatures) > 0
+
+ @property
+ def issuers(self):
+ """A ``set`` containing all key ids (if any) which have signed or encrypted this message."""
+ return self.encrypters | self.signers
+
+ @property
+ def magic(self):
+ if self.type == 'cleartext':
+ return "SIGNATURE"
+ return "MESSAGE"
+
+ @property
+ def message(self):
+ """The message contents"""
+ if self.type == 'cleartext':
+ return self.bytes_to_text(self._message)
+
+ if self.type == 'literal':
+ return self._message.contents
+
+ if self.type == 'encrypted':
+ return self._message
+
+ @property
+ def signatures(self):
+ """A ``set`` containing all key ids (if any) which have signed this message."""
+ return list(self._signatures)
+
+ @property
+ def signers(self):
+ """A ``set`` containing all key ids (if any) which have signed this message."""
+ return set(m.signer for m in self._signatures)
+
+ @property
+ def type(self):
+ ##TODO: it might be better to use an Enum for the output of this
+ if isinstance(self._message, (six.string_types, six.binary_type, bytearray)):
+ return 'cleartext'
+
+ if isinstance(self._message, LiteralData):
+ return 'literal'
+
+ if isinstance(self._message, (SKEData, IntegrityProtectedSKEData)):
+ return 'encrypted'
+
+ raise NotImplementedError
+
+ def __init__(self):
+ """
+ PGPMessage objects represent OpenPGP message compositions.
+
+ PGPMessage implements the `__str__` method, the output of which will be the message composition in
+ OpenPGP-compliant ASCII-armored format.
+
+ PGPMessage implements the `__bytes__` method, the output of which will be the message composition in
+ OpenPGP-compliant binary format.
+
+ Any signatures within the PGPMessage that are marked as being non-exportable will not be included in the output
+ of either of those methods.
+ """
+ super(PGPMessage, self).__init__()
+ self._compression = CompressionAlgorithm.Uncompressed
+ self._message = None
+ self._mdc = None
+ self._signatures = SorteDeque()
+ self._sessionkeys = []
+
+ def __bytearray__(self):
+ if self.is_compressed:
+ comp = CompressedData()
+ comp.calg = self._compression
+ comp.packets = [pkt for pkt in self]
+ comp.update_hlen()
+ return comp.__bytearray__()
+
+ _bytes = bytearray()
+ for pkt in self:
+ _bytes += pkt.__bytearray__()
+ return _bytes
+
+ def __str__(self):
+ if self.type == 'cleartext':
+ tmpl = u"-----BEGIN PGP SIGNED MESSAGE-----\n" \
+ u"{hhdr:s}\n" \
+ u"{cleartext:s}\n" \
+ u"{signature:s}"
+
+ # only add a Hash: header if we actually have at least one signature
+ hashes = set(s.hash_algorithm.name for s in self.signatures)
+ hhdr = 'Hash: {hashes:s}\n'.format(hashes=','.join(sorted(hashes))) if hashes else ''
+
+ return tmpl.format(hhdr=hhdr,
+ cleartext=self.dash_escape(self.bytes_to_text(self._message)),
+ signature=super(PGPMessage, self).__str__())
+
+ return super(PGPMessage, self).__str__()
+
+ def __iter__(self):
+ if self.type == 'cleartext':
+ for sig in self._signatures:
+ yield sig
+
+ elif self.is_encrypted:
+ for pkt in self._sessionkeys:
+ yield pkt
+ yield self.message
+
+ else:
+ ##TODO: is it worth coming up with a way of disabling one-pass signing?
+ for sig in self._signatures:
+ ops = sig.make_onepass()
+ if sig is not self._signatures[-1]:
+ ops.nested = True
+ yield ops
+
+ yield self._message
+ if self._mdc is not None: # pragma: no cover
+ yield self._mdc
+
+ for sig in self._signatures:
+ yield sig
+
+ def __or__(self, other):
+ if isinstance(other, Marker):
+ return self
+
+ if isinstance(other, CompressedData):
+ self._compression = other.calg
+ for pkt in other.packets:
+ self |= pkt
+ return self
+
+ if isinstance(other, (six.string_types, six.binary_type, bytearray)):
+ if self._message is None:
+ self._message = self.text_to_bytes(other)
+ return self
+
+ if isinstance(other, (LiteralData, SKEData, IntegrityProtectedSKEData)):
+ if self._message is None:
+ self._message = other
+ return self
+
+ if isinstance(other, MDC):
+ if self._mdc is None:
+ self._mdc = other
+ return self
+
+ if isinstance(other, OnePassSignature):
+ # these are "generated" on the fly during composition
+ return self
+
+ if isinstance(other, Signature):
+ other = PGPSignature() | other
+
+ if isinstance(other, PGPSignature):
+ self._signatures.insort(other)
+ return self
+
+ if isinstance(other, (PKESessionKey, SKESessionKey)):
+ self._sessionkeys.append(other)
+ return self
+
+ if isinstance(other, PGPMessage):
+ self._message = other._message
+ self._mdc = other._mdc
+ self._compression = other._compression
+ self._sessionkeys += other._sessionkeys
+ self._signatures += other._signatures
+ return self
+
+ raise NotImplementedError(str(type(other)))
+
+ def __copy__(self):
+ msg = super(PGPMessage, self).__copy__()
+ msg._compression = self._compression
+ msg._message = copy.copy(self._message)
+ msg._mdc = copy.copy(self._mdc)
+
+ for sig in self._signatures:
+ msg |= copy.copy(sig)
+
+ for sk in self._sessionkeys:
+ msg |= copy.copy(sk)
+
+ return msg
+
+ @classmethod
+ def new(cls, message, **kwargs):
+ """
+ Create a new PGPMessage object.
+
+ :param message: The message to be stored.
+ :type message: ``str``, ``unicode``, ``bytes``, ``bytearray``
+ :returns: :py:obj:`PGPMessage`
+
+ The following optional keyword arguments can be used with :py:meth:`PGPMessage.new`:
+
+ :keyword file: if True, ``message`` should be a path to a file. The contents of that file will be read and used
+ as the contents of the message.
+ :type file: ``bool``
+ :keyword cleartext: if True, the message will be cleartext with inline signatures.
+ :type cleartext: ``bool``
+ :keyword sensitive: if True, the filename will be set to '_CONSOLE' to signal other OpenPGP clients to treat
+ this message as being 'for your eyes only'. Ignored if cleartext is True.
+ :type sensitive: ``bool``
+ :keyword format: Set the message format identifier. Ignored if cleartext is True.
+ :type format: ``str``
+ :keyword compression: Set the compression algorithm for the new message.
+ Defaults to :py:obj:`CompressionAlgorithm.ZIP`. Ignored if cleartext is True.
+ :keyword encoding: Set the Charset header for the message.
+ :type encoding: ``str`` representing a valid codec in codecs
+ """
+ # TODO: have 'codecs' above (in :type encoding:) link to python documentation page on codecs
+ cleartext = kwargs.pop('cleartext', False)
+ format = kwargs.pop('format', None)
+ sensitive = kwargs.pop('sensitive', False)
+ compression = kwargs.pop('compression', CompressionAlgorithm.ZIP)
+ file = kwargs.pop('file', False)
+ charset = kwargs.pop('encoding', None)
+
+ filename = ''
+ mtime = datetime.utcnow()
+
+ msg = PGPMessage()
+
+ if charset:
+ msg.charset = charset
+
+ # if format in 'tu' and isinstance(message, (six.binary_type, bytearray)):
+ # # if message format is text or unicode and we got binary data, we'll need to transcode it to UTF-8
+ # message =
+
+ if file and os.path.isfile(message):
+ filename = message
+ message = bytearray(os.path.getsize(filename))
+ mtime = datetime.utcfromtimestamp(os.path.getmtime(filename))
+
+ with open(filename, 'rb') as mf:
+ mf.readinto(message)
+
+ # if format is None, we can try to detect it
+ if format is None:
+ if isinstance(message, six.text_type):
+ # message is definitely UTF-8 already
+ format = 'u'
+
+ elif cls.is_ascii(message):
+ # message is probably text
+ format = 't'
+
+ else:
+ # message is probably binary
+ format = 'b'
+
+ # if message is a binary type and we're building a textual message, we need to transcode the bytes to UTF-8
+ if isinstance(message, (six.binary_type, bytearray)) and (cleartext or format in 'tu'):
+ message = message.decode(charset or 'utf-8')
+
+ if cleartext:
+ msg |= message
+
+ else:
+ # load literal data
+ lit = LiteralData()
+ lit._contents = bytearray(msg.text_to_bytes(message))
+ lit.filename = '_CONSOLE' if sensitive else os.path.basename(filename)
+ lit.mtime = mtime
+ lit.format = format
+
+ # if cls.is_ascii(message):
+ # lit.format = 't'
+
+ lit.update_hlen()
+
+ msg |= lit
+ msg._compression = compression
+
+ return msg
+
+ def encrypt(self, passphrase, sessionkey=None, **prefs):
+ """
+ Encrypt the contents of this message using a passphrase.
+ :param passphrase: The passphrase to use for encrypting this message.
+ :type passphrase: ``str``, ``unicode``, ``bytes``
+
+ :optional param sessionkey: Provide a session key to use when encrypting something. Default is ``None``.
+ If ``None``, a session key of the appropriate length will be generated randomly.
+
+ .. warning::
+
+ Care should be taken when making use of this option! Session keys *absolutely need*
+ to be unpredictable! Use the ``gen_key()`` method on the desired
+ :py:obj:`~constants.SymmetricKeyAlgorithm` to generate the session key!
+
+ :type sessionkey: ``bytes``, ``str``
+ :raises: :py:exc:`~errors.PGPEncryptionError`
+ :returns: A new :py:obj:`PGPMessage` containing the encrypted contents of this message.
+ """
+ cipher_algo = prefs.pop('cipher', SymmetricKeyAlgorithm.AES256)
+ hash_algo = prefs.pop('hash', HashAlgorithm.SHA256)
+
+ # set up a new SKESessionKeyV4
+ skesk = SKESessionKeyV4()
+ skesk.s2k.usage = 255
+ skesk.s2k.specifier = 3
+ skesk.s2k.halg = hash_algo
+ skesk.s2k.encalg = cipher_algo
+ skesk.s2k.count = skesk.s2k.halg.tuned_count
+
+ if sessionkey is None:
+ sessionkey = cipher_algo.gen_key()
+ skesk.encrypt_sk(passphrase, sessionkey)
+ del passphrase
+
+ msg = PGPMessage() | skesk
+
+ if not self.is_encrypted:
+ skedata = IntegrityProtectedSKEDataV1()
+ skedata.encrypt(sessionkey, cipher_algo, self.__bytes__())
+ msg |= skedata
+
+ else:
+ msg |= self
+
+ return msg
+
+ def decrypt(self, passphrase):
+ """
+ Attempt to decrypt this message using a passphrase.
+
+ :param passphrase: The passphrase to use to attempt to decrypt this message.
+ :type passphrase: ``str``, ``unicode``, ``bytes``
+ :raises: :py:exc:`~errors.PGPDecryptionError` if decryption failed for any reason.
+ :returns: A new :py:obj:`PGPMessage` containing the decrypted contents of this message
+ """
+ if not self.is_encrypted:
+ raise PGPError("This message is not encrypted!")
+
+ for skesk in iter(sk for sk in self._sessionkeys if isinstance(sk, SKESessionKey)):
+ try:
+ symalg, key = skesk.decrypt_sk(passphrase)
+ decmsg = PGPMessage()
+ decmsg.parse(self.message.decrypt(key, symalg))
+
+ except (TypeError, ValueError, NotImplementedError, PGPDecryptionError):
+ continue
+
+ else:
+ del passphrase
+ break
+
+ else:
+ raise PGPDecryptionError("Decryption failed")
+
+ return decmsg
+
+ def parse(self, packet):
+ unarmored = self.ascii_unarmor(packet)
+ data = unarmored['body']
+
+ if unarmored['magic'] is not None and unarmored['magic'] not in ['MESSAGE', 'SIGNATURE']:
+ raise ValueError('Expected: MESSAGE. Got: {}'.format(str(unarmored['magic'])))
+
+ if unarmored['headers'] is not None:
+ self.ascii_headers = unarmored['headers']
+
+ # cleartext signature
+ if unarmored['magic'] == 'SIGNATURE':
+ # the composition for this will be the 'cleartext' as a str,
+ # followed by one or more signatures (each one loaded into a PGPSignature)
+ self |= self.dash_unescape(unarmored['cleartext'])
+ while len(data) > 0:
+ pkt = Packet(data)
+ if not isinstance(pkt, Signature): # pragma: no cover
+ warnings.warn("Discarded unexpected packet: {:s}".format(pkt.__class__.__name__), stacklevel=2)
+ continue
+ self |= PGPSignature() | pkt
+
+ else:
+ while len(data) > 0:
+ self |= Packet(data)
+
+
+class PGPKey(Armorable, ParentRef, PGPObject):
+ """
+ 11.1. Transferable Public Keys
+
+ OpenPGP users may transfer public keys. The essential elements of a
+ transferable public key are as follows:
+
+ - One Public-Key packet
+
+ - Zero or more revocation signatures
+ - One or more User ID packets
+
+ - After each User ID packet, zero or more Signature packets
+ (certifications)
+
+ - Zero or more User Attribute packets
+
+ - After each User Attribute packet, zero or more Signature packets
+ (certifications)
+
+ - Zero or more Subkey packets
+
+ - After each Subkey packet, one Signature packet, plus optionally a
+ revocation
+
+ The Public-Key packet occurs first. Each of the following User ID
+ packets provides the identity of the owner of this public key. If
+ there are multiple User ID packets, this corresponds to multiple
+ means of identifying the same unique individual user; for example, a
+ user may have more than one email address, and construct a User ID
+ for each one.
+
+ Immediately following each User ID packet, there are zero or more
+ Signature packets. Each Signature packet is calculated on the
+ immediately preceding User ID packet and the initial Public-Key
+ packet. The signature serves to certify the corresponding public key
+ and User ID. In effect, the signer is testifying to his or her
+ belief that this public key belongs to the user identified by this
+ User ID.
+
+ Within the same section as the User ID packets, there are zero or
+ more User Attribute packets. Like the User ID packets, a User
+ Attribute packet is followed by zero or more Signature packets
+ calculated on the immediately preceding User Attribute packet and the
+ initial Public-Key packet.
+
+ User Attribute packets and User ID packets may be freely intermixed
+ in this section, so long as the signatures that follow them are
+ maintained on the proper User Attribute or User ID packet.
+
+ After the User ID packet or Attribute packet, there may be zero or
+ more Subkey packets. In general, subkeys are provided in cases where
+ the top-level public key is a signature-only key. However, any V4
+ key may have subkeys, and the subkeys may be encryption-only keys,
+ signature-only keys, or general-purpose keys. V3 keys MUST NOT have
+ subkeys.
+
+ Each Subkey packet MUST be followed by one Signature packet, which
+ should be a subkey binding signature issued by the top-level key.
+ For subkeys that can issue signatures, the subkey binding signature
+ MUST contain an Embedded Signature subpacket with a primary key
+ binding signature (0x19) issued by the subkey on the top-level key.
+
+ Subkey and Key packets may each be followed by a revocation Signature
+ packet to indicate that the key is revoked. Revocation signatures
+ are only accepted if they are issued by the key itself, or by a key
+ that is authorized to issue revocations via a Revocation Key
+ subpacket in a self-signature by the top-level key.
+
+ Transferable public-key packet sequences may be concatenated to allow
+ transferring multiple public keys in one operation.
+
+ 11.2. Transferable Secret Keys
+
+ OpenPGP users may transfer secret keys. The format of a transferable
+ secret key is the same as a transferable public key except that
+ secret-key and secret-subkey packets are used instead of the public
+ key and public-subkey packets. Implementations SHOULD include self-
+ signatures on any user IDs and subkeys, as this allows for a complete
+ public key to be automatically extracted from the transferable secret
+ key. Implementations MAY choose to omit the self-signatures,
+ especially if a transferable public key accompanies the transferable
+ secret key.
+ """
+ @property
+ def __key__(self):
+ return self._key.keymaterial
+
+ @property
+ def __sig__(self):
+ return list(self._signatures)
+
+ @property
+ def created(self):
+ """A :py:obj:`~datetime.datetime` object of the creation date and time of the key, in UTC."""
+ return self._key.created
+
+ @property
+ def expires_at(self):
+ """A :py:obj:`~datetime.datetime` object of when this key is to be considered expired, if any. Otherwise, ``None``"""
+ try:
+ expires = min(sig.key_expiration for sig in itertools.chain(iter(uid.selfsig for uid in self.userids), self.self_signatures)
+ if sig.key_expiration is not None)
+
+ except ValueError:
+ return None
+
+ else:
+ return (self.created + expires)
+
+ @property
+ def fingerprint(self):
+ """The fingerprint of this key, as a :py:obj:`~pgpy.types.Fingerprint` object."""
+ if self._key:
+ return self._key.fingerprint
+
+ @property
+ def hashdata(self):
+ # when signing a key, only the public portion of the keys is hashed
+ # if this is a private key, the private components of the key material need to be left out
+ if self.is_public:
+ return self._key.__bytearray__()[len(self._key.header):]
+
+ pub = self._key.pubkey()
+ return pub.__bytearray__()[len(pub.header):]
+
+ @property
+ def is_expired(self):
+ """``True`` if this key is expired, otherwise ``False``"""
+ expires = self.expires_at
+ if expires is not None:
+ return expires <= datetime.utcnow()
+
+ return False
+
+ @property
+ def is_primary(self):
+ """``True`` if this is a primary key; ``False`` if this is a subkey"""
+ return isinstance(self._key, Primary) and not isinstance(self._key, Sub)
+
+ @property
+ def is_protected(self):
+ """``True`` if this is a private key that is protected with a passphrase, otherwise ``False``"""
+ if self.is_public:
+ return False
+
+ return self._key.protected
+
+ @property
+ def is_public(self):
+ """``True`` if this is a public key, otherwise ``False``"""
+ return isinstance(self._key, Public) and not isinstance(self._key, Private)
+
+ @property
+ def is_unlocked(self):
+ """``False`` if this is a private key that is protected with a passphrase and has not yet been unlocked, otherwise ``True``"""
+ if self.is_public:
+ return True
+
+ if not self.is_protected:
+ return True
+
+ return self._key.unlocked
+
+ @property
+ def key_algorithm(self):
+ """The :py:obj:`constants.PubKeyAlgorithm` pertaining to this key"""
+ return self._key.pkalg
+
+ @property
+ def key_size(self):
+ """*new in 0.4.1*
+ The size pertaining to this key. ``int`` for non-EC key algorithms; :py:obj:`constants.EllipticCurveOID` for EC keys.
+ """
+ if self.key_algorithm in {PubKeyAlgorithm.ECDSA, PubKeyAlgorithm.ECDH}:
+ return self._key.keymaterial.oid
+ return next(iter(self._key.keymaterial)).bit_length()
+
+ @property
+ def magic(self):
+ return '{:s} KEY BLOCK'.format('PUBLIC' if (isinstance(self._key, Public) and not isinstance(self._key, Private)) else
+ 'PRIVATE' if isinstance(self._key, Private) else '')
+
+ @property
+ def pubkey(self):
+ """If the :py:obj:`PGPKey` object is a private key, this method returns a corresponding public key object with
+ all the trimmings. Otherwise, returns ``None``
+ """
+ if not self.is_public:
+ if self._sibling is None or isinstance(self._sibling, weakref.ref):
+ # create a new key shell
+ pub = PGPKey()
+ pub.ascii_headers = self.ascii_headers.copy()
+
+ # get the public half of the primary key
+ pub._key = self._key.pubkey()
+
+ # get the public half of each subkey
+ for skid, subkey in self.subkeys.items():
+ pub.subkeys[skid] = subkey.pubkey
+
+ # copy user ids and user attributes
+ for uid in self._uids:
+ pub |= copy.copy(uid)
+
+ # copy signatures that weren't copied with uids
+ for sig in self._signatures:
+ if sig.parent is None:
+ pub |= copy.copy(sig)
+
+ # keep connect the two halves using a weak reference
+ self._sibling = weakref.ref(pub)
+ pub._sibling = weakref.ref(self)
+
+ return self._sibling()
+ return None
+
+ @pubkey.setter
+ def pubkey(self, pubkey):
+ if self.is_public:
+ raise TypeError("cannot add public sibling to pubkey")
+
+ if not pubkey.is_public:
+ raise TypeError("sibling must be public")
+
+ if self._sibling is not None and self._sibling() is not None:
+ raise ValueError("public key reference already set")
+
+ if pubkey.fingerprint != self.fingerprint:
+ raise ValueError("key fingerprint mismatch")
+
+ # TODO: sync packets with sibling
+ self._sibling = weakref.ref(pubkey)
+ pubkey._sibling = weakref.ref(self)
+
+ @property
+ def self_signatures(self):
+ keyid, keytype = (self.fingerprint.keyid, SignatureType.DirectlyOnKey) if self.is_primary \
+ else (self.parent.fingerprint.keyid, SignatureType.Subkey_Binding)
+
+ ##TODO: filter out revoked signatures as well
+ for sig in iter(sig for sig in self._signatures
+ if all([sig.type == keytype, sig.signer == keyid, not sig.is_expired])):
+ yield sig
+
+ @property
+ def signers(self):
+ """A ``set`` of key ids of keys that were used to sign this key"""
+ return {sig.signer for sig in self.__sig__}
+
+ @property
+ def subkeys(self):
+ """An :py:obj:`~collections.OrderedDict` of subkeys bound to this primary key, if applicable,
+ selected by 16-character keyid."""
+ return self._children
+
+ @property
+ def userids(self):
+ """A ``list`` of :py:obj:`PGPUID` objects containing User ID information about this key"""
+ return [ u for u in self._uids if u.is_uid ]
+
+ @property
+ def userattributes(self):
+ """A ``list`` of :py:obj:`PGPUID` objects containing one or more images associated with this key"""
+ return [u for u in self._uids if u.is_ua]
+
+ @classmethod
+ def new(cls, key_algorithm, key_size):
+ """
+ Generate a new PGP key
+
+ :param key_algorithm: Key algorithm to use.
+ :type key_algorithm: A :py:obj:`~constants.PubKeyAlgorithm`
+ :param key_size: Key size in bits, unless `key_algorithm` is :py:obj:`~constants.PubKeyAlgorithm.ECDSA` or
+ :py:obj:`~constants.PubKeyAlgorithm.ECDH`, in which case it should be the Curve OID to use.
+ :type key_size: ``int`` or :py:obj:`~constants.EllipticCurveOID`
+ :return: A newly generated :py:obj:`PGPKey`
+ """
+ # new private key shell first
+ key = PGPKey()
+
+ if key_algorithm in {PubKeyAlgorithm.RSAEncrypt, PubKeyAlgorithm.RSASign}: # pragma: no cover
+ warnings.warn('{:s} is deprecated - generating key using RSAEncryptOrSign'.format(key_algorithm.name))
+ key_algorithm = PubKeyAlgorithm.RSAEncryptOrSign
+
+ # generate some key data to match key_algorithm and key_size
+ key._key = PrivKeyV4.new(key_algorithm, key_size)
+
+ return key
+
+ def __init__(self):
+ """
+ PGPKey objects represent OpenPGP compliant keys along with all of their associated data.
+
+ PGPKey implements the `__str__` method, the output of which will be the key composition in
+ OpenPGP-compliant ASCII-armored format.
+
+ PGPKey implements the `__bytes__` method, the output of which will be the key composition in
+ OpenPGP-compliant binary format.
+
+ Any signatures within the PGPKey that are marked as being non-exportable will not be included in the output
+ of either of those methods.
+ """
+ super(PGPKey, self).__init__()
+ self._key = None
+ self._children = collections.OrderedDict()
+ self._signatures = SorteDeque()
+ self._uids = SorteDeque()
+ self._sibling = None
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ # us
+ _bytes += self._key.__bytearray__()
+ # our signatures; ignore embedded signatures
+ for sig in iter(s for s in self._signatures if not s.embedded and s.exportable):
+ _bytes += sig.__bytearray__()
+ # one or more User IDs, followed by their signatures
+ for uid in self._uids:
+ _bytes += uid._uid.__bytearray__()
+ for s in [s for s in uid._signatures if s.exportable]:
+ _bytes += s.__bytearray__()
+ # subkeys
+ for sk in self._children.values():
+ _bytes += sk.__bytearray__()
+
+ return _bytes
+
+ def __repr__(self):
+ if self._key is not None:
+ return "<PGPKey [{:s}][0x{:s}] at 0x{:02X}>" \
+ "".format(self._key.__class__.__name__, self.fingerprint.keyid, id(self))
+
+ return "<PGPKey [unknown] at 0x{:02X}>" \
+ "".format(id(self))
+
+ def __contains__(self, item):
+ if isinstance(item, PGPKey): # pragma: no cover
+ return item.fingerprint.keyid in self.subkeys
+
+ if isinstance(item, Fingerprint): # pragma: no cover
+ return item.keyid in self.subkeys
+
+ if isinstance(item, PGPUID):
+ return item in self._uids
+
+ if isinstance(item, PGPSignature):
+ return item in self._signatures
+
+ raise TypeError
+
+ def __or__(self, other, from_sib=False):
+ if isinstance(other, Key) and self._key is None:
+ self._key = other
+
+ elif isinstance(other, PGPKey) and not other.is_primary and other.is_public == self.is_public:
+ other._parent = self
+ self._children[other.fingerprint.keyid] = other
+
+ elif isinstance(other, PGPSignature):
+ self._signatures.insort(other)
+
+ # if this is a subkey binding signature that has embedded primary key binding signatures, add them to parent
+ if other.type == SignatureType.Subkey_Binding:
+ for es in iter(pkb for pkb in other._signature.subpackets['EmbeddedSignature']):
+ esig = PGPSignature() | es
+ esig._parent = other
+ self._signatures.insort(esig)
+
+ elif isinstance(other, PGPUID):
+ other._parent = weakref.ref(self)
+ self._uids.insort(other)
+
+ else:
+ raise TypeError("unsupported operand type(s) for |: '{:s}' and '{:s}'"
+ "".format(self.__class__.__name__, other.__class__.__name__))
+
+ if isinstance(self._sibling, weakref.ref) and not from_sib:
+ sib = self._sibling()
+ if sib is None:
+ self._sibling = None
+
+ else: # pragma: no cover
+ sib.__or__(copy.copy(other), True)
+
+ return self
+
+ def __copy__(self):
+ key = super(PGPKey, self).__copy__()
+ key._key = copy.copy(self._key)
+
+ for uid in self._uids:
+ key |= copy.copy(uid)
+
+ for id, subkey in self._children.items():
+ key |= copy.copy(subkey)
+
+ for sig in self._signatures:
+ if sig.embedded:
+ # embedded signatures don't need to be explicitly copied
+ continue
+
+ print(len(key._signatures))
+ key |= copy.copy(sig)
+
+ return key
+
+ def protect(self, passphrase, enc_alg, hash_alg):
+ """
+ Add a passphrase to a private key. If the key is already passphrase protected, it should be unlocked before
+ a new passphrase can be specified.
+
+ Has no effect on public keys.
+
+ :param passphrase: A passphrase to protect the key with
+ :type passphrase: ``str``, ``unicode``
+ :param enc_alg: Symmetric encryption algorithm to use to protect the key
+ :type enc_alg: :py:obj:`~constants.SymmetricKeyAlgorithm`
+ :param hash_alg: Hash algorithm to use in the String-to-Key specifier
+ :type hash_alg: :py:obj:`~constants.HashAlgorithm`
+ """
+ ##TODO: specify strong defaults for enc_alg and hash_alg
+ if self.is_public:
+ # we can't protect public keys because only private key material is ever protected
+ warnings.warn("Public keys cannot be passphrase-protected", stacklevel=2)
+ return
+
+ if self.is_protected and not self.is_unlocked:
+ # we can't protect a key that is already protected unless it is unlocked first
+ warnings.warn("This key is already protected with a passphrase - "
+ "please unlock it before attempting to specify a new passphrase", stacklevel=2)
+ return
+
+ for sk in itertools.chain([self], self.subkeys.values()):
+ sk._key.protect(passphrase, enc_alg, hash_alg)
+
+ del passphrase
+
+ @contextlib.contextmanager
+ def unlock(self, passphrase):
+ """
+ Context manager method for unlocking passphrase-protected private keys. Has no effect if the key is not both
+ private and passphrase-protected.
+
+ When the context managed block is exited, the unprotected private key material is removed.
+
+ Example::
+
+ privkey = PGPKey()
+ privkey.parse(keytext)
+
+ assert privkey.is_protected
+ assert privkey.is_unlocked is False
+ # privkey.sign("some text") <- this would raise an exception
+
+ with privkey.unlock("TheCorrectPassphrase"):
+ # privkey is now unlocked
+ assert privkey.is_unlocked
+ # so you can do things with it
+ sig = privkey.sign("some text")
+
+ # privkey is no longer unlocked
+ assert privkey.is_unlocked is False
+
+ Emits a :py:obj:`~warnings.UserWarning` if the key is public or not passphrase protected.
+
+ :param str passphrase: The passphrase to be used to unlock this key.
+ :raises: :py:exc:`~pgpy.errors.PGPDecryptionError` if the passphrase is incorrect
+ """
+ if self.is_public:
+ # we can't unprotect public keys because only private key material is ever protected
+ warnings.warn("Public keys cannot be passphrase-protected", stacklevel=3)
+ yield self
+ return
+
+ if not self.is_protected:
+ # we can't unprotect private keys that are not protected, because there is no ciphertext to decrypt
+ warnings.warn("This key is not protected with a passphrase", stacklevel=3)
+ yield self
+ return
+
+ try:
+ for sk in itertools.chain([self], self.subkeys.values()):
+ sk._key.unprotect(passphrase)
+ del passphrase
+ yield self
+
+ finally:
+ # clean up here by deleting the previously decrypted secret key material
+ for sk in itertools.chain([self], self.subkeys.values()):
+ sk._key.keymaterial.clear()
+
+ def add_uid(self, uid, selfsign=True, **prefs):
+ """
+ Add a User ID to this key.
+
+ :param uid: The user id to add
+ :type uid: :py:obj:`~pgpy.PGPUID`
+ :param selfsign: Whether or not to self-sign the user id before adding it
+ :type selfsign: ``bool``
+
+ Valid optional keyword arguments are identical to those of self-signatures for :py:meth:`PGPKey.certify`.
+ Any such keyword arguments are ignored if selfsign is ``False``
+ """
+ uid._parent = self
+ if selfsign:
+ uid |= self.certify(uid, SignatureType.Positive_Cert, **prefs)
+
+ self |= uid
+
+ def get_uid(self, search):
+ """
+ Find and return a User ID that matches the search string given.
+
+ :param search: A text string to match name, comment, or email address against
+ :type search: ``str``, ``unicode``
+ :return: The first matching :py:obj:`~pgpy.PGPUID`, or ``None`` if no matches were found.
+ """
+ if self.is_primary:
+ return next((u for u in self._uids if search in filter(lambda a: a is not None, (u.name, u.comment, u.email))), None)
+ return self.parent.get_uid(search)
+
+ def del_uid(self, search):
+ """
+ Find and remove a user id that matches the search string given. This method does not modify the corresponding
+ :py:obj:`~pgpy.PGPUID` object; it only removes it from the list of user ids on the key.
+
+ :param search: A text string to match name, comment, or email address against
+ :type search: ``str``, ``unicode``
+ """
+ u = self.get_uid(search)
+
+ if u is None:
+ raise KeyError("uid '{:s}' not found".format(search))
+
+ u._parent = None
+ self._uids.remove(u)
+
+ def add_subkey(self, key, **prefs):
+ """
+ Add a key as a subkey to this key.
+ :param key: A private :py:obj:`~pgpy.PGPKey` that does not have any subkeys of its own
+
+ :keyword usage: A ``set`` of key usage flags, as :py:obj:`~constants.KeyFlags` for the subkey to be added.
+ :type usage: ``set``
+
+ Other valid optional keyword arguments are identical to those of self-signatures for :py:meth:`PGPKey.certify`
+ """
+ if self.is_public:
+ raise PGPError("Cannot add a subkey to a public key. Add the subkey to the private component first!")
+
+ if key.is_public:
+ raise PGPError("Cannot add a public key as a subkey to this key")
+
+ if key.is_primary:
+ if len(key._children) > 0:
+ raise PGPError("Cannot add a key that already has subkeys as a subkey!")
+
+ # convert key into a subkey
+ npk = PrivSubKeyV4()
+ npk.pkalg = key._key.pkalg
+ npk.created = key._key.created
+ npk.keymaterial = key._key.keymaterial
+ key._key = npk
+ key._key.update_hlen()
+
+ self._children[key.fingerprint.keyid] = key
+ key._parent = self
+
+ ##TODO: skip this step if the key already has a subkey binding signature
+ bsig = self.bind(key, **prefs)
+ key |= bsig
+
+ def _get_key_flags(self, user=None):
+ if self.is_primary:
+ if user is not None:
+ user = self.get_uid(user)
+
+ elif len(self._uids) == 0:
+ return {KeyFlags.Certify}
+
+ else:
+ user = next(iter(self.userids))
+
+ # RFC 4880 says that primary keys *must* be capable of certification
+ return {KeyFlags.Certify} | user.selfsig.key_flags
+
+ return next(self.self_signatures).key_flags
+
+ def _sign(self, subject, sig, **prefs):
+ """
+ The actual signing magic happens here.
+ :param subject: The subject to sign
+ :param sig: The :py:obj:`PGPSignature` object the new signature is to be encapsulated within
+ :returns: ``sig``, after the signature is added to it.
+ """
+ user = prefs.pop('user', None)
+ uid = None
+ if user is not None:
+ uid = self.get_uid(user)
+
+ else:
+ uid = next(iter(self.userids), None)
+ if uid is None and self.parent is not None:
+ uid = next(iter(self.parent.userids), None)
+
+ if sig.hash_algorithm is None:
+ sig._signature.halg = uid.selfsig.hashprefs[0]
+
+ if uid is not None and sig.hash_algorithm not in uid.selfsig.hashprefs:
+ warnings.warn("Selected hash algorithm not in key preferences", stacklevel=4)
+
+ # signature options that can be applied at any level
+ expires = prefs.pop('expires', None)
+ notation = prefs.pop('notation', None)
+ revocable = prefs.pop('revocable', True)
+ policy_uri = prefs.pop('policy_uri', None)
+
+ if expires is not None:
+ # expires should be a timedelta, so if it's a datetime, turn it into a timedelta
+ if isinstance(expires, datetime):
+ expires = expires - self.created
+
+ sig._signature.subpackets.addnew('SignatureExpirationTime', hashed=True, expires=expires)
+
+ if revocable is False:
+ sig._signature.subpackets.addnew('Revocable', hashed=True, bflag=revocable)
+
+ if notation is not None:
+ for name, value in notation.items():
+ # mark all notations as human readable unless value is a bytearray
+ flags = NotationDataFlags.HumanReadable
+ if isinstance(value, bytearray):
+ flags = 0x00
+
+ sig._signature.subpackets.addnew('NotationData', hashed=True, flags=flags, name=name, value=value)
+
+ if policy_uri is not None:
+ sig._signature.subpackets.addnew('Policy', hashed=True, uri=policy_uri)
+
+ if user is not None and uid is not None:
+ signers_uid = "{:s}".format(uid)
+ sig._signature.subpackets.addnew('SignersUserID', hashed=True, userid=signers_uid)
+
+ # handle an edge case for timestamp signatures vs standalone signatures
+ if sig.type == SignatureType.Timestamp and len(sig._signature.subpackets._hashed_sp) > 1:
+ sig._signature.sigtype = SignatureType.Standalone
+
+ sigdata = sig.hashdata(subject)
+ h2 = sig.hash_algorithm.hasher
+ h2.update(sigdata)
+ sig._signature.hash2 = bytearray(h2.digest()[:2])
+
+ _sig = self._key.sign(sigdata, getattr(hashes, sig.hash_algorithm.name)())
+ if _sig is NotImplemented:
+ raise NotImplementedError(self.key_algorithm)
+
+ sig._signature.signature.from_signer(_sig)
+ sig._signature.update_hlen()
+
+ return sig
+
+ @KeyAction(KeyFlags.Sign, is_unlocked=True, is_public=False)
+ def sign(self, subject, **prefs):
+ """
+ Sign text, a message, or a timestamp using this key.
+
+ :param subject: The text to be signed
+ :type subject: ``str``, :py:obj:`~pgpy.PGPMessage`, ``None``
+ :raises: :py:exc:`~pgpy.errors.PGPError` if the key is passphrase-protected and has not been unlocked
+ :raises: :py:exc:`~pgpy.errors.PGPError` if the key is public
+ :returns: :py:obj:`PGPSignature`
+
+ The following optional keyword arguments can be used with :py:meth:`PGPKey.sign`, as well as
+ :py:meth:`PGPKey.certify`, :py:meth:`PGPKey.revoke`, and :py:meth:`PGPKey.bind`:
+
+ :keyword expires: Set an expiration date for this signature
+ :type expires: :py:obj:`~datetime.datetime`, :py:obj:`~datetime.timedelta`
+ :keyword notation: Add arbitrary notation data to this signature.
+ :type notation: ``dict``
+ :keyword policy_uri: Add a URI to the signature that should describe the policy under which the signature
+ was issued.
+ :type policy_uri: ``str``
+ :keyword revocable: If ``False``, this signature will be marked non-revocable
+ :type revocable: ``bool``
+ :keyword user: Specify which User ID to use when creating this signature. Also adds a "Signer's User ID"
+ to the signature.
+ :type user: ``str``
+ """
+ sig_type = SignatureType.BinaryDocument
+ hash_algo = prefs.pop('hash', None)
+
+ if subject is None:
+ sig_type = SignatureType.Timestamp
+
+ if isinstance(subject, PGPMessage):
+ if subject.type == 'cleartext':
+ sig_type = SignatureType.CanonicalDocument
+
+ subject = subject.message
+
+ sig = PGPSignature.new(sig_type, self.key_algorithm, hash_algo, self.fingerprint.keyid)
+
+ return self._sign(subject, sig, **prefs)
+
+ @KeyAction(KeyFlags.Certify, is_unlocked=True, is_public=False)
+ def certify(self, subject, level=SignatureType.Generic_Cert, **prefs):
+ """
+ Sign a key or a user id within a key.
+
+ :param subject: The user id or key to be certified.
+ :type subject: :py:obj:`PGPKey`, :py:obj:`PGPUID`
+ :param level: :py:obj:`~constants.SignatureType.Generic_Cert`, :py:obj:`~constants.SignatureType.Persona_Cert`,
+ :py:obj:`~constants.SignatureType.Casual_Cert`, or :py:obj:`~constants.SignatureType.Positive_Cert`.
+ Only used if subject is a :py:obj:`PGPUID`; otherwise, it is ignored.
+ :raises: :py:exc:`~pgpy.errors.PGPError` if the key is passphrase-protected and has not been unlocked
+ :raises: :py:exc:`~pgpy.errors.PGPError` if the key is public
+ :returns: :py:obj:`PGPSignature`
+
+ In addition to the optional keyword arguments accepted by :py:meth:`PGPKey.sign`, the following optional
+ keyword arguments can be used with :py:meth:`PGPKey.certify`.
+
+ These optional keywords only make sense, and thus only have an effect, when self-signing a key or User ID:
+
+ :keyword usage: A ``set`` of key usage flags, as :py:obj:`~constants.KeyFlags`.
+ This keyword is ignored for non-self-certifications.
+ :type usage: ``set``
+ :keyword ciphers: A list of preferred symmetric ciphers, as :py:obj:`~constants.SymmetricKeyAlgorithm`.
+ This keyword is ignored for non-self-certifications.
+ :type ciphers: ``list``
+ :keyword hashes: A list of preferred hash algorithms, as :py:obj:`~constants.HashAlgorithm`.
+ This keyword is ignored for non-self-certifications.
+ :type hashes: ``list``
+ :keyword compression: A list of preferred compression algorithms, as :py:obj:`~constants.CompressionAlgorithm`.
+ This keyword is ignored for non-self-certifications.
+ :type compression: ``list``
+ :keyword key_expiration: Specify a key expiration date for when this key should expire, or a
+ :py:obj:`~datetime.timedelta` of how long after the key was created it should expire.
+ This keyword is ignored for non-self-certifications.
+ :type key_expiration: :py:obj:`datetime.datetime`, :py:obj:`datetime.timedelta`
+ :keyword keyserver: Specify the URI of the preferred key server of the user.
+ This keyword is ignored for non-self-certifications.
+ :type keyserver: ``str``, ``unicode``, ``bytes``
+ :keyword primary: Whether or not to consider the certified User ID as the primary one.
+ This keyword is ignored for non-self-certifications, and any certifications directly on keys.
+ :type primary: ``bool``
+
+ These optional keywords only make sense, and thus only have an effect, when signing another key or User ID:
+
+ :keyword trust: Specify the level and amount of trust to assert when certifying a public key. Should be a tuple
+ of two ``int`` s, specifying the trust level and trust amount. See
+ `RFC 4880 Section 5.2.3.13. Trust Signature <http://tools.ietf.org/html/rfc4880#section-5.2.3.13>`_
+ for more on what these values mean.
+ :type trust: ``tuple`` of two ``int`` s
+ :keyword regex: Specify a regular expression to constrain the specified trust signature in the resulting signature.
+ Symbolically signifies that the specified trust signature only applies to User IDs which match
+ this regular expression.
+ This is meaningless without also specifying trust level and amount.
+ :type regex: ``str``
+ """
+ hash_algo = prefs.pop('hash', None)
+ sig_type = level
+ if isinstance(subject, PGPKey):
+ sig_type = SignatureType.DirectlyOnKey
+
+ sig = PGPSignature.new(sig_type, self.key_algorithm, hash_algo, self.fingerprint.keyid)
+
+ # signature options that only make sense in certifications
+ usage = prefs.pop('usage', None)
+ exportable = prefs.pop('exportable', None)
+
+ if usage is not None:
+ sig._signature.subpackets.addnew('KeyFlags', hashed=True, flags=usage)
+
+ if exportable is not None:
+ sig._signature.subpackets.addnew('ExportableCertification', hashed=True, bflag=exportable)
+
+ keyfp = self.fingerprint
+ if isinstance(subject, PGPKey):
+ keyfp = subject.fingerprint
+ if isinstance(subject, PGPUID) and subject._parent is not None:
+ keyfp = subject._parent.fingerprint
+
+ if keyfp == self.fingerprint:
+ # signature options that only make sense in self-certifications
+ cipher_prefs = prefs.pop('ciphers', None)
+ hash_prefs = prefs.pop('hashes', None)
+ compression_prefs = prefs.pop('compression', None)
+ key_expires = prefs.pop('key_expiration', None)
+ keyserver_flags = prefs.pop('keyserver_flags', None)
+ keyserver = prefs.pop('keyserver', None)
+ primary_uid = prefs.pop('primary', None)
+
+ if key_expires is not None:
+ # key expires should be a timedelta, so if it's a datetime, turn it into a timedelta
+ if isinstance(key_expires, datetime):
+ key_expires = key_expires - self.created
+
+ sig._signature.subpackets.addnew('KeyExpirationTime', hashed=True, expires=key_expires)
+
+ if cipher_prefs is not None:
+ sig._signature.subpackets.addnew('PreferredSymmetricAlgorithms', hashed=True, flags=cipher_prefs)
+
+ if hash_prefs is not None:
+ sig._signature.subpackets.addnew('PreferredHashAlgorithms', hashed=True, flags=hash_prefs)
+ if sig.hash_algorithm is None:
+ sig._signature.halg = hash_prefs[0]
+
+ if compression_prefs is not None:
+ sig._signature.subpackets.addnew('PreferredCompressionAlgorithms', hashed=True, flags=compression_prefs)
+
+ if keyserver_flags is not None:
+ sig._signature.subpackets.addnew('KeyServerPreferences', hashed=True, flags=keyserver_flags)
+
+ if keyserver is not None:
+ sig._signature.subpackets.addnew('PreferredKeyServer', hashed=True, uri=keyserver)
+
+ if primary_uid is not None:
+ sig._signature.subpackets.addnew('PrimaryUserID', hashed=True, primary=primary_uid)
+
+ # Features is always set on self-signatures
+ sig._signature.subpackets.addnew('Features', hashed=True, flags=Features.pgpy_features)
+
+ else:
+ # signature options that only make sense in non-self-certifications
+ trust = prefs.pop('trust', None)
+ regex = prefs.pop('regex', None)
+
+ if trust is not None:
+ sig._signature.subpackets.addnew('TrustSignature', hashed=True, level=trust[0], amount=trust[1])
+
+ if regex is not None:
+ sig._signature.subpackets.addnew('RegularExpression', hashed=True, regex=regex)
+
+ return self._sign(subject, sig, **prefs)
+
+ @KeyAction(KeyFlags.Certify, is_unlocked=True, is_public=False)
+ def revoke(self, target, **prefs):
+ """
+ Revoke a key, a subkey, or all current certification signatures of a User ID that were generated by this key so far.
+
+ :param target: The key to revoke
+ :type target: :py:obj:`PGPKey`, :py:obj:`PGPUID`
+ :raises: :py:exc:`~pgpy.errors.PGPError` if the key is passphrase-protected and has not been unlocked
+ :raises: :py:exc:`~pgpy.errors.PGPError` if the key is public
+ :returns: :py:obj:`PGPSignature`
+
+ In addition to the optional keyword arguments accepted by :py:meth:`PGPKey.sign`, the following optional
+ keyword arguments can be used with :py:meth:`PGPKey.revoke`.
+
+ :keyword reason: Defaults to :py:obj:`constants.RevocationReason.NotSpecified`
+ :type reason: One of :py:obj:`constants.RevocationReason`.
+ :keyword comment: Defaults to an empty string.
+ :type comment: ``str``
+ """
+ hash_algo = prefs.pop('hash', None)
+ if isinstance(target, PGPUID):
+ sig_type = SignatureType.CertRevocation
+
+ elif isinstance(target, PGPKey):
+ ##TODO: check to make sure that the key that is being revoked:
+ # - is this key
+ # - is one of this key's subkeys
+ # - specifies this key as its revocation key
+ if target.is_primary:
+ sig_type = SignatureType.KeyRevocation
+
+ else:
+ sig_type = SignatureType.SubkeyRevocation
+
+ else: # pragma: no cover
+ raise TypeError
+
+ sig = PGPSignature.new(sig_type, self.key_algorithm, hash_algo, self.fingerprint.keyid)
+
+ # signature options that only make sense when revoking
+ reason = prefs.pop('reason', RevocationReason.NotSpecified)
+ comment = prefs.pop('comment', "")
+ sig._signature.subpackets.addnew('ReasonForRevocation', hashed=True, code=reason, string=comment)
+
+ return self._sign(target, sig, **prefs)
+
+ @KeyAction(is_unlocked=True, is_public=False)
+ def revoker(self, revoker, **prefs):
+ """
+ Generate a signature that specifies another key as being valid for revoking this key.
+
+ :param revoker: The :py:obj:`PGPKey` to specify as a valid revocation key.
+ :type revoker: :py:obj:`PGPKey`
+ :raises: :py:exc:`~pgpy.errors.PGPError` if the key is passphrase-protected and has not been unlocked
+ :raises: :py:exc:`~pgpy.errors.PGPError` if the key is public
+ :returns: :py:obj:`PGPSignature`
+
+ In addition to the optional keyword arguments accepted by :py:meth:`PGPKey.sign`, the following optional
+ keyword arguments can be used with :py:meth:`PGPKey.revoker`.
+
+ :keyword sensitive: If ``True``, this sets the sensitive flag on the RevocationKey subpacket. Currently,
+ this has no other effect.
+ :type sensitive: ``bool``
+ """
+ hash_algo = prefs.pop('hash', None)
+
+ sig = PGPSignature.new(SignatureType.DirectlyOnKey, self.key_algorithm, hash_algo, self.fingerprint.keyid)
+
+ # signature options that only make sense when adding a revocation key
+ sensitive = prefs.pop('sensitive', False)
+ keyclass = RevocationKeyClass.Normal | (RevocationKeyClass.Sensitive if sensitive else 0x00)
+
+ sig._signature.subpackets.addnew('RevocationKey',
+ hashed=True,
+ algorithm=revoker.key_algorithm,
+ fingerprint=revoker.fingerprint,
+ keyclass=keyclass)
+
+ # revocation keys should really not be revocable themselves
+ prefs['revocable'] = False
+ return self._sign(self, sig, **prefs)
+
+ @KeyAction(is_unlocked=True, is_public=False)
+ def bind(self, key, **prefs):
+ """
+ Bind a subkey to this key.
+
+ Valid optional keyword arguments are identical to those of self-signatures for :py:meth:`PGPkey.certify`
+ """
+ hash_algo = prefs.pop('hash', None)
+
+ if self.is_primary and not key.is_primary:
+ sig_type = SignatureType.Subkey_Binding
+
+ elif key.is_primary and not self.is_primary:
+ sig_type = SignatureType.PrimaryKey_Binding
+
+ else: # pragma: no cover
+ raise PGPError
+
+ sig = PGPSignature.new(sig_type, self.key_algorithm, hash_algo, self.fingerprint.keyid)
+
+ if sig_type == SignatureType.Subkey_Binding:
+ # signature options that only make sense in subkey binding signatures
+ usage = prefs.pop('usage', None)
+
+ if usage is not None:
+ sig._signature.subpackets.addnew('KeyFlags', hashed=True, flags=usage)
+
+ # if possible, have the subkey create a primary key binding signature
+ if key.key_algorithm.can_sign:
+ subkeyid = key.fingerprint.keyid
+ esig = None
+
+ if not key.is_public:
+ esig = key.bind(self)
+
+ elif subkeyid in self.subkeys: # pragma: no cover
+ esig = self.subkeys[subkeyid].bind(self)
+
+ if esig is not None:
+ sig._signature.subpackets.addnew('EmbeddedSignature', hashed=False, _sig=esig._signature)
+
+ return self._sign(key, sig, **prefs)
+
+ def verify(self, subject, signature=None):
+ """
+ Verify a subject with a signature using this key.
+
+ :param subject: The subject to verify
+ :type subject: ``str``, ``unicode``, ``None``, :py:obj:`PGPMessage`, :py:obj:`PGPKey`, :py:obj:`PGPUID`
+ :param signature: If the signature is detached, it should be specified here.
+ :type signature: :py:obj:`PGPSignature`
+ :returns: :py:obj:`~pgpy.types.SignatureVerification`
+ """
+ sspairs = []
+
+ # some type checking
+ if not isinstance(subject, (type(None), PGPMessage, PGPKey, PGPUID, PGPSignature, six.string_types, bytes, bytearray)):
+ raise TypeError("Unexpected subject value: {:s}".format(str(type(subject))))
+ if not isinstance(signature, (type(None), PGPSignature)):
+ raise TypeError("Unexpected signature value: {:s}".format(str(type(signature))))
+
+ def _filter_sigs(sigs):
+ _ids = {self.fingerprint.keyid} | set(self.subkeys)
+ return [ sig for sig in sigs if sig.signer in _ids ]
+
+ # collect signature(s)
+ if signature is None:
+ if isinstance(subject, PGPMessage):
+ sspairs += [ (sig, subject.message) for sig in _filter_sigs(subject.signatures) ]
+
+ if isinstance(subject, (PGPUID, PGPKey)):
+ sspairs += [ (sig, subject) for sig in _filter_sigs(subject.__sig__) ]
+
+ if isinstance(subject, PGPKey):
+ # user ids
+ sspairs += [ (sig, uid) for uid in subject.userids for sig in _filter_sigs(uid.__sig__) ]
+ # user attributes
+ sspairs += [ (sig, ua) for ua in subject.userattributes for sig in _filter_sigs(ua.__sig__) ]
+ # subkey binding signatures
+ sspairs += [ (sig, subkey) for subkey in subject.subkeys.values() for sig in _filter_sigs(subkey.__sig__) ]
+
+ elif signature.signer in {self.fingerprint.keyid} | set(self.subkeys):
+ sspairs += [(signature, subject)]
+
+ if len(sspairs) == 0:
+ raise PGPError("No signatures to verify")
+
+ # finally, start verifying signatures
+ sigv = SignatureVerification()
+ for sig, subj in sspairs:
+ if self.fingerprint.keyid != sig.signer and sig.signer in self.subkeys:
+ warnings.warn("Signature was signed with this key's subkey: {:s}. "
+ "Verifying with subkey...".format(sig.signer),
+ stacklevel=2)
+ sigv &= self.subkeys[sig.signer].verify(subj, sig)
+
+ else:
+ verified = self._key.verify(sig.hashdata(subj), sig.__sig__, getattr(hashes, sig.hash_algorithm.name)())
+ if verified is NotImplemented:
+ raise NotImplementedError(sig.key_algorithm)
+
+ sigv.add_sigsubj(sig, self.fingerprint.keyid, subj, verified)
+
+ return sigv
+
+ @KeyAction(KeyFlags.EncryptCommunications, KeyFlags.EncryptStorage, is_public=True)
+ def encrypt(self, message, sessionkey=None, **prefs):
+ """
+ Encrypt a PGPMessage using this key.
+
+ :param message: The message to encrypt.
+ :type message: :py:obj:`PGPMessage`
+ :optional param sessionkey: Provide a session key to use when encrypting something. Default is ``None``.
+ If ``None``, a session key of the appropriate length will be generated randomly.
+
+ .. warning::
+
+ Care should be taken when making use of this option! Session keys *absolutely need*
+ to be unpredictable! Use the ``gen_key()`` method on the desired
+ :py:obj:`~constants.SymmetricKeyAlgorithm` to generate the session key!
+ :type sessionkey: ``bytes``, ``str``
+
+ :raises: :py:exc:`~errors.PGPEncryptionError` if encryption failed for any reason.
+ :returns: A new :py:obj:`PGPMessage` with the encrypted contents of ``message``
+
+ The following optional keyword arguments can be used with :py:meth:`PGPKey.encrypt`:
+
+ :keyword cipher: Specifies the symmetric block cipher to use when encrypting the message.
+ :type cipher: :py:obj:`~constants.SymmetricKeyAlgorithm`
+ :keyword user: Specifies the User ID to use as the recipient for this encryption operation, for the purposes of
+ preference defaults and selection validation.
+ :type user: ``str``, ``unicode``
+ """
+ user = prefs.pop('user', None)
+ uid = None
+ if user is not None:
+ uid = self.get_uid(user)
+ else:
+ uid = next(iter(self.userids), None)
+ if uid is None and self.parent is not None:
+ uid = next(iter(self.parent.userids), None)
+ cipher_algo = prefs.pop('cipher', uid.selfsig.cipherprefs[0])
+
+ if cipher_algo not in uid.selfsig.cipherprefs:
+ warnings.warn("Selected symmetric algorithm not in key preferences", stacklevel=3)
+
+ if message.is_compressed and message._compression not in uid.selfsig.compprefs:
+ warnings.warn("Selected compression algorithm not in key preferences", stacklevel=3)
+
+ if sessionkey is None:
+ sessionkey = cipher_algo.gen_key()
+
+ # set up a new PKESessionKeyV3
+ pkesk = PKESessionKeyV3()
+ pkesk.encrypter = bytearray(binascii.unhexlify(self.fingerprint.keyid.encode('latin-1')))
+ pkesk.pkalg = self.key_algorithm
+ # pkesk.encrypt_sk(self.__key__, cipher_algo, sessionkey)
+ pkesk.encrypt_sk(self._key, cipher_algo, sessionkey)
+
+ if message.is_encrypted: # pragma: no cover
+ _m = message
+
+ else:
+ _m = PGPMessage()
+ skedata = IntegrityProtectedSKEDataV1()
+ skedata.encrypt(sessionkey, cipher_algo, message.__bytes__())
+ _m |= skedata
+
+ _m |= pkesk
+
+ return _m
+
+ def decrypt(self, message):
+ """
+ Decrypt a PGPMessage using this key.
+
+ :param message: An encrypted :py:obj:`PGPMessage`
+ :returns: A new :py:obj:`PGPMessage` with the decrypted contents of ``message``
+ """
+ if not message.is_encrypted:
+ warnings.warn("This message is not encrypted", stacklevel=2)
+ return message
+
+ if self.fingerprint.keyid not in message.issuers:
+ sks = set(self.subkeys)
+ mis = set(message.issuers)
+ if sks & mis:
+ skid = list(sks & mis)[0]
+ warnings.warn("Message was encrypted with this key's subkey: {:s}. "
+ "Decrypting with that...".format(skid),
+ stacklevel=2)
+ return self.subkeys[skid].decrypt(message)
+
+ raise PGPError("Cannot decrypt the provided message with this key")
+
+ pkesk = next(pk for pk in message._sessionkeys if pk.pkalg == self.key_algorithm and pk.encrypter == self.fingerprint.keyid)
+ alg, key = pkesk.decrypt_sk(self._key)
+
+ # now that we have the symmetric cipher used and the key, we can decrypt the actual message
+ decmsg = PGPMessage()
+ decmsg.parse(message.message.decrypt(key, alg))
+
+ return decmsg
+
+ def parse(self, data):
+ unarmored = self.ascii_unarmor(data)
+ data = unarmored['body']
+
+ if unarmored['magic'] is not None and 'KEY' not in unarmored['magic']:
+ raise ValueError('Expected: KEY. Got: {}'.format(str(unarmored['magic'])))
+
+ if unarmored['headers'] is not None:
+ self.ascii_headers = unarmored['headers']
+
+ # parse packets
+ # keys will hold other keys parsed here
+ keys = collections.OrderedDict()
+ # orphaned will hold all non-opaque orphaned packets
+ orphaned = []
+ # last holds the last non-signature thing processed
+
+ ##TODO: see issue #141 and fix this better
+ getpkt = lambda d: Packet(d) if len(d) > 0 else None # flake8: noqa
+ # some packets are filtered out
+ getpkt = filter(lambda p: p.header.tag != PacketTag.Trust, iter(functools.partial(getpkt, data), None))
+
+ def pktgrouper():
+ class PktGrouper(object):
+ def __init__(self):
+ self.last = None
+
+ def __call__(self, pkt):
+ if pkt.header.tag != PacketTag.Signature:
+ self.last = '{:02X}_{:s}'.format(id(pkt), pkt.__class__.__name__)
+ return self.last
+ return PktGrouper()
+
+ while True:
+ # print(type(p) for p in getpkt)
+ for group in iter(group for _, group in itertools.groupby(getpkt, key=pktgrouper()) if not _.endswith('Opaque')):
+ pkt = next(group)
+
+ # deal with pkt first
+ if isinstance(pkt, Key):
+ pgpobj = (self if self._key is None else PGPKey()) | pkt
+
+ elif isinstance(pkt, (UserID, UserAttribute)):
+ pgpobj = PGPUID() | pkt
+
+ else: # pragma: no cover
+ break
+
+ # add signatures to whatever we got
+ [ operator.ior(pgpobj, PGPSignature() | sig) for sig in group if not isinstance(sig, Opaque) ]
+
+ # and file away pgpobj
+ if isinstance(pgpobj, PGPKey):
+ if pgpobj.is_primary:
+ keys[(pgpobj.fingerprint.keyid, pgpobj.is_public)] = pgpobj
+
+ else:
+ keys[next(reversed(keys))] |= pgpobj
+
+ elif isinstance(pgpobj, PGPUID):
+ # parent is likely the most recently parsed primary key
+ keys[next(reversed(keys))] |= pgpobj
+
+ else: # pragma: no cover
+ break
+ else:
+ # finished normally
+ break
+
+ # this will only be reached called if the inner loop hit a break
+ warnings.warn("Warning: Orphaned packet detected! {:s}".format(repr(pkt)), stacklevel=2) # pragma: no cover
+ orphaned.append(pkt) # pragma: no cover
+ for pkt in group: # pragma: no cover
+ orphaned.append(pkt)
+
+ # remove the reference to self from keys
+ [ keys.pop((getattr(self, 'fingerprint.keyid', '~'), None), t) for t in (True, False) ]
+ # return {'keys': keys, 'orphaned': orphaned}
+ return keys
+
+
+class PGPKeyring(collections.Container, collections.Iterable, collections.Sized):
+ def __init__(self, *args):
+ """
+ PGPKeyring objects represent in-memory keyrings that can contain any combination of supported private and public
+ keys. It can not currently be conveniently exported to a format that can be understood by GnuPG.
+ """
+ super(PGPKeyring, self).__init__()
+ self._keys = {}
+ self._pubkeys = collections.deque()
+ self._privkeys = collections.deque()
+ self._aliases = collections.deque([{}])
+ self.load(*args)
+
+ def __contains__(self, alias):
+ aliases = set().union(*self._aliases)
+
+ if isinstance(alias, six.string_types):
+ return alias in aliases or alias.replace(' ', '') in aliases
+
+ return alias in aliases # pragma: no cover
+
+ def __len__(self):
+ return len(self._keys)
+
+ def __iter__(self): # pragma: no cover
+ for pgpkey in itertools.chain(self._pubkeys, self._privkeys):
+ yield pgpkey
+
+ def _get_key(self, alias):
+ for m in self._aliases:
+ if alias in m:
+ return self._keys[m[alias]]
+
+ if alias.replace(' ', '') in m:
+ return self._keys[m[alias.replace(' ', '')]]
+
+ raise KeyError(alias)
+
+ def _get_keys(self, alias):
+ return [self._keys[m[alias]] for m in self._aliases if alias in m]
+
+ def _sort_alias(self, alias):
+ # remove alias from all levels of _aliases, and sort by created time and key half
+ # so the order of _aliases from left to right:
+ # - newer keys come before older ones
+ # - private keys come before public ones
+ #
+ # this list is sorted in the opposite direction from that, because they will be placed into self._aliases
+ # from right to left.
+ pkids = sorted(list(set().union(m.pop(alias) for m in self._aliases if alias in m)),
+ key=lambda pkid: (self._keys[pkid].created, self._keys[pkid].is_public))
+
+ # drop the now-sorted aliases into place
+ for depth, pkid in enumerate(pkids):
+ self._aliases[depth][alias] = pkid
+
+ # finally, remove any empty dicts left over
+ while {} in self._aliases: # pragma: no cover
+ self._aliases.remove({})
+
+ def _add_alias(self, alias, pkid):
+ # brand new alias never seen before!
+ if alias not in self:
+ self._aliases[-1][alias] = pkid
+
+ # this is a duplicate alias->key link; ignore it
+ elif alias in self and pkid in set(m[alias] for m in self._aliases if alias in m):
+ pass # pragma: no cover
+
+ # this is an alias that already exists, but points to a key that is not already referenced by it
+ else:
+ adepth = len(self._aliases) - len([None for m in self._aliases if alias in m]) - 1
+ # all alias maps have this alias, so increase total depth by 1
+ if adepth == -1:
+ self._aliases.appendleft({})
+ adepth = 0
+
+ self._aliases[adepth][alias] = pkid
+ self._sort_alias(alias)
+
+ def _add_key(self, pgpkey):
+ pkid = id(pgpkey)
+ if pkid not in self._keys:
+ self._keys[pkid] = pgpkey
+
+ # add to _{pub,priv}keys if this is either a primary key, or a subkey without one
+ if pgpkey.parent is None:
+ if pgpkey.is_public:
+ self._pubkeys.append(pkid)
+
+ else:
+ self._privkeys.append(pkid)
+
+ # aliases
+ self._add_alias(pgpkey.fingerprint, pkid)
+ self._add_alias(pgpkey.fingerprint.keyid, pkid)
+ self._add_alias(pgpkey.fingerprint.shortid, pkid)
+ for uid in pgpkey.userids:
+ self._add_alias(uid.name, pkid)
+ if uid.comment:
+ self._add_alias(uid.comment, pkid)
+
+ if uid.email:
+ self._add_alias(uid.email, pkid)
+
+ # subkeys
+ for subkey in pgpkey.subkeys.values():
+ self._add_key(subkey)
+
+ def load(self, *args):
+ """
+ Load all keys provided into this keyring object.
+
+ :param \*args: Each arg in ``args`` can be any of the formats supported by :py:meth:`PGPKey.from_path` and
+ :py:meth:`PGPKey.from_blob`, or a ``list`` or ``tuple`` of these.
+ :type \*args: ``list``, ``tuple``, ``str``, ``unicode``, ``bytes``, ``bytearray``
+ :returns: a ``set`` containing the unique fingerprints of all of the keys that were loaded during this operation.
+ """
+ def _preiter(first, iterable):
+ yield first
+ for item in iterable:
+ yield item
+
+ loaded = set()
+ for key in iter(item for ilist in iter(ilist if isinstance(ilist, (tuple, list)) else [ilist] for ilist in args)
+ for item in ilist):
+ if os.path.isfile(key):
+ _key, keys = PGPKey.from_file(key)
+
+ else:
+ _key, keys = PGPKey.from_blob(key)
+
+ for ik in _preiter(_key, keys.values()):
+ self._add_key(ik)
+ loaded |= {ik.fingerprint} | {isk.fingerprint for isk in ik.subkeys.values()}
+
+ return list(loaded)
+
+ @contextlib.contextmanager
+ def key(self, identifier):
+ """
+ A context-manager method. Yields the first :py:obj:`PGPKey` object that matches the provided identifier.
+
+ :param identifier: The identifier to use to select a loaded key.
+ :type identifier: :py:exc:`PGPMessage`, :py:exc:`PGPSignature`, ``str``
+ :raises: :py:exc:`KeyError` if there is no loaded key that satisfies the identifier.
+ """
+ if isinstance(identifier, PGPMessage):
+ for issuer in identifier.issuers:
+ if issuer in self:
+ identifier = issuer
+ break
+
+ if isinstance(identifier, PGPSignature):
+ identifier = identifier.signer
+
+ yield self._get_key(identifier)
+
+ def fingerprints(self, keyhalf='any', keytype='any'):
+ """
+ List loaded fingerprints with some optional filtering.
+
+ :param str keyhalf: Can be 'any', 'public', or 'private'. If 'public', or 'private', the fingerprints of keys of the
+ the other type will not be included in the results.
+ :param str keytype: Can be 'any', 'primary', or 'sub'. If 'primary' or 'sub', the fingerprints of keys of the
+ the other type will not be included in the results.
+ :returns: a ``set`` of fingerprints of keys matching the filters specified.
+ """
+ return {pk.fingerprint for pk in self._keys.values()
+ if pk.is_primary in [True if keytype in ['primary', 'any'] else None,
+ False if keytype in ['sub', 'any'] else None]
+ if pk.is_public in [True if keyhalf in ['public', 'any'] else None,
+ False if keyhalf in ['private', 'any'] else None]}
+
+ def unload(self, key):
+ """
+ Unload a loaded key and its subkeys.
+
+ The easiest way to do this is to select a key using :py:meth:`PGPKeyring.key` first::
+
+ with keyring.key("DSA von TestKey") as key:
+ keyring.unload(key)
+
+ :param key: The key to unload.
+ :type key: :py:obj:`PGPKey`
+ """
+ assert isinstance(key, PGPKey)
+ pkid = id(key)
+ if pkid in self._keys:
+ # remove references
+ [ kd.remove(pkid) for kd in [self._pubkeys, self._privkeys] if pkid in kd ]
+ # remove the key
+ self._keys.pop(pkid)
+
+ # remove aliases
+ for m, a in [ (m, a) for m in self._aliases for a, p in m.items() if p == pkid ]:
+ m.pop(a)
+ # do a re-sort of this alias if it was not unique
+ if a in self:
+ self._sort_alias(a)
+
+ # if key is a primary key, unload its subkeys as well
+ if key.is_primary:
+ [ self.unload(sk) for sk in key.subkeys.values() ]