path: root/src/leap/mx/vendor/pgpy/packet/
diff options
authorKali Kaneko <>2017-06-26 15:06:41 +0200
committerKali Kaneko <>2017-06-27 10:33:48 +0200
commit59504c7ddf7aab71614d691e705d386f58b5100d (patch)
treeea6afce37890a23cd088eac50587c3293035e78c /src/leap/mx/vendor/pgpy/packet/
parente82b8143d3e9b2f62650e06798eee262885036c2 (diff)
[pkg] vendor pgpy 0.4.1
Diffstat (limited to 'src/leap/mx/vendor/pgpy/packet/')
1 files changed, 1514 insertions, 0 deletions
diff --git a/src/leap/mx/vendor/pgpy/packet/ b/src/leap/mx/vendor/pgpy/packet/
new file mode 100644
index 0000000..b5b5f2e
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/packet/
@@ -0,0 +1,1514 @@
+from __future__ import absolute_import, division
+import abc
+import binascii
+import collections
+import copy
+import hashlib
+import itertools
+import math
+import os
+from pyasn1.codec.der import decoder
+from pyasn1.codec.der import encoder
+from pyasn1.type.univ import Integer
+from pyasn1.type.univ import Sequence
+from cryptography.exceptions import InvalidSignature
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.primitives.asymmetric import dsa
+from cryptography.hazmat.primitives.asymmetric import ec
+from cryptography.hazmat.primitives.asymmetric import padding
+from cryptography.hazmat.primitives.kdf.concatkdf import ConcatKDFHash
+from cryptography.hazmat.primitives.keywrap import aes_key_wrap
+from cryptography.hazmat.primitives.keywrap import aes_key_unwrap
+from cryptography.hazmat.primitives.padding import PKCS7
+from .subpackets import Signature as SignatureSP
+from .subpackets import UserAttribute
+from .subpackets import signature
+from .subpackets import userattribute
+from .types import MPI
+from .types import MPIs
+from ..constants import EllipticCurveOID
+from ..constants import HashAlgorithm
+from ..constants import PubKeyAlgorithm
+from ..constants import String2KeyType
+from ..constants import SymmetricKeyAlgorithm
+from ..decorators import sdproperty
+from ..errors import PGPDecryptionError
+from ..errors import PGPError
+from ..symenc import _decrypt
+from ..symenc import _encrypt
+from ..types import Field
+__all__ = ['SubPackets',
+ 'UserAttributeSubPackets',
+ 'Signature',
+ 'RSASignature',
+ 'DSASignature',
+ 'ECDSASignature',
+ 'PubKey',
+ 'OpaquePubKey',
+ 'RSAPub',
+ 'DSAPub',
+ 'ElGPub',
+ 'ECDSAPub',
+ 'ECDHPub',
+ 'String2Key',
+ 'ECKDF',
+ 'PrivKey',
+ 'OpaquePrivKey',
+ 'RSAPriv',
+ 'DSAPriv',
+ 'ElGPriv',
+ 'ECDSAPriv',
+ 'ECDHPriv',
+ 'CipherText',
+ 'RSACipherText',
+ 'ElGCipherText',
+ 'ECDHCipherText', ]
+class SubPackets(collections.MutableMapping, Field):
+ _spmodule = signature
+ def __init__(self):
+ super(SubPackets, self).__init__()
+ self._hashed_sp = collections.OrderedDict()
+ self._unhashed_sp = collections.OrderedDict()
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes += self.__hashbytearray__()
+ _bytes += self.__unhashbytearray__()
+ return _bytes
+ def __hashbytearray__(self):
+ _bytes = bytearray()
+ _bytes += self.int_to_bytes(sum(len(sp) for sp in self._hashed_sp.values()), 2)
+ for hsp in self._hashed_sp.values():
+ _bytes += hsp.__bytearray__()
+ return _bytes
+ def __unhashbytearray__(self):
+ _bytes = bytearray()
+ _bytes += self.int_to_bytes(sum(len(sp) for sp in self._unhashed_sp.values()), 2)
+ for uhsp in self._unhashed_sp.values():
+ _bytes += uhsp.__bytearray__()
+ return _bytes
+ def __len__(self): # pragma: no cover
+ return sum(sp.header.length for sp in itertools.chain(self._hashed_sp.values(), self._unhashed_sp.values())) + 4
+ def __iter__(self):
+ for sp in itertools.chain(self._hashed_sp.values(), self._unhashed_sp.values()):
+ yield sp
+ def __setitem__(self, key, val):
+ # the key provided should always be the classname for the subpacket
+ # but, there can be multiple subpackets of the same type
+ # so, it should be stored in the format: [h_]<key>_<seqid>
+ # where:
+ # - <key> is the classname of val
+ # - <seqid> is a sequence id, starting at 0, for a given classname
+ i = 0
+ if isinstance(key, tuple): # pragma: no cover
+ key, i = key
+ d = self._unhashed_sp
+ if key.startswith('h_'):
+ d, key = self._hashed_sp, key[2:]
+ while (key, i) in d:
+ i += 1
+ d[(key, i)] = val
+ def __getitem__(self, key):
+ if isinstance(key, tuple): # pragma: no cover
+ return self._hashed_sp.get(key, self._unhashed_sp.get(key))
+ if key.startswith('h_'):
+ return [v for k, v in self._hashed_sp.items() if key[2:] == k[0]]
+ else:
+ return [v for k, v in itertools.chain(self._hashed_sp.items(), self._unhashed_sp.items()) if key == k[0]]
+ def __delitem__(self, key):
+ ##TODO: this
+ raise NotImplementedError
+ def __contains__(self, key):
+ return key in set(k for k, _ in itertools.chain(self._hashed_sp, self._unhashed_sp))
+ def __copy__(self):
+ sp = SubPackets()
+ sp._hashed_sp = self._hashed_sp.copy()
+ sp._unhashed_sp = self._unhashed_sp.copy()
+ return sp
+ def addnew(self, spname, hashed=False, **kwargs):
+ nsp = getattr(self._spmodule, spname)()
+ for p, v in kwargs.items():
+ if hasattr(nsp, p):
+ setattr(nsp, p, v)
+ nsp.update_hlen()
+ if hashed:
+ self['h_' + spname] = nsp
+ else:
+ self[spname] = nsp
+ def update_hlen(self):
+ for sp in self:
+ sp.update_hlen()
+ def parse(self, packet):
+ hl = self.bytes_to_int(packet[:2])
+ del packet[:2]
+ # we do it this way because we can't ensure that subpacket headers are sized appropriately
+ # for their contents, but we can at least output that correctly
+ # so instead of tracking how many bytes we can now output, we track how many bytes have we parsed so far
+ plen = len(packet)
+ while plen - len(packet) < hl:
+ sp = SignatureSP(packet)
+ self['h_' + sp.__class__.__name__] = sp
+ uhl = self.bytes_to_int(packet[:2])
+ del packet[:2]
+ plen = len(packet)
+ while plen - len(packet) < uhl:
+ sp = SignatureSP(packet)
+ self[sp.__class__.__name__] = sp
+class UserAttributeSubPackets(SubPackets):
+ """
+ This is nearly the same as just the unhashed subpackets from above,
+ except that there isn't a length specifier. So, parse will only parse one packet,
+ appending that one packet to self.__unhashed_sp.
+ """
+ _spmodule = userattribute
+ def __bytearray__(self):
+ _bytes = bytearray()
+ for uhsp in self._unhashed_sp.values():
+ _bytes += uhsp.__bytearray__()
+ return _bytes
+ def __len__(self): # pragma: no cover
+ return sum(len(sp) for sp in self._unhashed_sp.values())
+ def parse(self, packet):
+ # parse just one packet and add it to the unhashed subpacket ordereddict
+ # I actually have yet to come across a User Attribute packet with more than one subpacket
+ # which makes sense, given that there is only one defined subpacket
+ sp = UserAttribute(packet)
+ self[sp.__class__.__name__] = sp
+class Signature(MPIs):
+ def __init__(self):
+ for i in self.__mpis__:
+ setattr(self, i, MPI(0))
+ def __bytearray__(self):
+ _bytes = bytearray()
+ for i in self:
+ _bytes += i.to_mpibytes()
+ return _bytes
+ @abc.abstractproperty
+ def __sig__(self):
+ """return the signature bytes in a format that can be understood by the signature verifier"""
+ @abc.abstractmethod
+ def from_signer(self, sig):
+ """create and parse a concrete Signature class instance"""
+class RSASignature(Signature):
+ __mpis__ = ('md_mod_n', )
+ def __sig__(self):
+ return self.md_mod_n.to_mpibytes()[2:]
+ def parse(self, packet):
+ self.md_mod_n = MPI(packet)
+ def from_signer(self, sig):
+ self.md_mod_n = MPI(self.bytes_to_int(sig))
+class DSASignature(Signature):
+ __mpis__ = ('r', 's')
+ def __sig__(self):
+ # return the signature data into an ASN.1 sequence of integers in DER format
+ seq = Sequence()
+ for i in self:
+ seq.setComponentByPosition(len(seq), Integer(i))
+ return encoder.encode(seq)
+ def from_signer(self, sig):
+ ##TODO: just use pyasn1 for this
+ def _der_intf(_asn):
+ if _asn[0] != 0x02: # pragma: no cover
+ raise ValueError("Expected: Integer (0x02). Got: 0x{:02X}".format(_asn[0]))
+ del _asn[0]
+ if _asn[0] & 0x80: # pragma: no cover
+ llen = _asn[0] & 0x7F
+ del _asn[0]
+ flen = self.bytes_to_int(_asn[:llen])
+ del _asn[:llen]
+ else:
+ flen = _asn[0] & 0x7F
+ del _asn[0]
+ i = self.bytes_to_int(_asn[:flen])
+ del _asn[:flen]
+ return i
+ if isinstance(sig, bytes):
+ sig = bytearray(sig)
+ # this is a very limited asn1 decoder - it is only intended to decode a DER encoded sequence of integers
+ if not sig[0] == 0x30:
+ raise NotImplementedError("Expected: Sequence (0x30). Got: 0x{:02X}".format(sig[0]))
+ del sig[0]
+ # skip the sequence length field
+ if sig[0] & 0x80: # pragma: no cover
+ llen = sig[0] & 0x7F
+ del sig[:llen + 1]
+ else:
+ del sig[0]
+ self.r = MPI(_der_intf(sig))
+ self.s = MPI(_der_intf(sig))
+ def parse(self, packet):
+ self.r = MPI(packet)
+ self.s = MPI(packet)
+class ECDSASignature(DSASignature):
+ def from_signer(self, sig):
+ seq, _ = decoder.decode(sig)
+ self.r = MPI(seq[0])
+ self.s = MPI(seq[1])
+class PubKey(MPIs):
+ __pubfields__ = ()
+ @property
+ def __mpis__(self):
+ for i in self.__pubfields__:
+ yield i
+ def __init__(self):
+ super(PubKey, self).__init__()
+ for field in self.__pubfields__:
+ if isinstance(field, tuple): # pragma: no cover
+ field, val = field
+ else:
+ val = MPI(0)
+ setattr(self, field, val)
+ @abc.abstractmethod
+ def __pubkey__(self):
+ """return the requisite *PublicKey class from the cryptography library"""
+ def __len__(self):
+ return sum(len(getattr(self, i)) for i in self.__pubfields__)
+ def __bytearray__(self):
+ _bytes = bytearray()
+ for field in self.__pubfields__:
+ _bytes += getattr(self, field).to_mpibytes()
+ return _bytes
+ def publen(self):
+ return len(self)
+ def verify(self, subj, sigbytes, hash_alg):
+ return NotImplemented # pragma: no cover
+class OpaquePubKey(PubKey): # pragma: no cover
+ def __init__(self):
+ super(OpaquePubKey, self).__init__()
+ = bytearray()
+ def __iter__(self):
+ yield
+ def __pubkey__(self):
+ return NotImplemented
+ def __bytearray__(self):
+ return
+ def parse(self, packet):
+ ##TODO: this needs to be length-bounded to the end of the packet
+ = packet
+class RSAPub(PubKey):
+ __pubfields__ = ('n', 'e')
+ def __pubkey__(self):
+ return rsa.RSAPublicNumbers(self.e, self.n).public_key(default_backend())
+ def verify(self, subj, sigbytes, hash_alg):
+ # zero-pad sigbytes if necessary
+ sigbytes = (b'\x00' * (self.n.byte_length() - len(sigbytes))) + sigbytes
+ verifier = self.__pubkey__().verifier(sigbytes, padding.PKCS1v15(), hash_alg)
+ verifier.update(subj)
+ try:
+ verifier.verify()
+ except InvalidSignature:
+ return False
+ return True
+ def parse(self, packet):
+ self.n = MPI(packet)
+ self.e = MPI(packet)
+class DSAPub(PubKey):
+ __pubfields__ = ('p', 'q', 'g', 'y')
+ def __pubkey__(self):
+ params = dsa.DSAParameterNumbers(self.p, self.q, self.g)
+ return dsa.DSAPublicNumbers(self.y, params).public_key(default_backend())
+ def verify(self, subj, sigbytes, hash_alg):
+ verifier = self.__pubkey__().verifier(sigbytes, hash_alg)
+ verifier.update(subj)
+ try:
+ verifier.verify()
+ except InvalidSignature:
+ return False
+ return True
+ def parse(self, packet):
+ self.p = MPI(packet)
+ self.q = MPI(packet)
+ self.g = MPI(packet)
+ self.y = MPI(packet)
+class ElGPub(PubKey):
+ __pubfields__ = ('p', 'g', 'y')
+ def __pubkey__(self):
+ raise NotImplementedError()
+ def parse(self, packet):
+ self.p = MPI(packet)
+ self.g = MPI(packet)
+ self.y = MPI(packet)
+class ECDSAPub(PubKey):
+ __pubfields__ = ('x', 'y')
+ def __init__(self):
+ super(ECDSAPub, self).__init__()
+ self.oid = None
+ def __len__(self):
+ return sum([len(getattr(self, i)) - 2 for i in self.__pubfields__] +
+ [3, len(encoder.encode(self.oid.value)) - 1])
+ def __pubkey__(self):
+ return ec.EllipticCurvePublicNumbers(self.x, self.y, self.oid.curve()).public_key(default_backend())
+ def __bytearray__(self):
+ _b = bytearray()
+ _b += encoder.encode(self.oid.value)[1:]
+ # 0x04 || x || y
+ # where x and y are the same length
+ _xy = b'\x04' + self.x.to_mpibytes()[2:] + self.y.to_mpibytes()[2:]
+ _b += MPI(self.bytes_to_int(_xy, 'big')).to_mpibytes()
+ return _b
+ def __copy__(self):
+ pkt = super(ECDSAPub, self).__copy__()
+ pkt.oid = self.oid
+ return pkt
+ def verify(self, subj, sigbytes, hash_alg):
+ verifier = self.__pubkey__().verifier(sigbytes, ec.ECDSA(hash_alg))
+ verifier.update(subj)
+ try:
+ verifier.verify()
+ except InvalidSignature:
+ return False
+ return True
+ def parse(self, packet):
+ oidlen = packet[0]
+ del packet[0]
+ _oid = bytearray(b'\x06')
+ _oid.append(oidlen)
+ _oid += bytearray(packet[:oidlen])
+ # try:
+ oid, _ = decoder.decode(bytes(_oid))
+ # except:
+ # raise PGPError("Bad OID octet stream: b'{:s}'".format(''.join(['\\x{:02X}'.format(c) for c in _oid])))
+ self.oid = EllipticCurveOID(oid)
+ del packet[:oidlen]
+ # flen = (self.oid.bit_length // 8)
+ xy = bytearray(MPI(packet).to_mpibytes()[2:])
+ # xy = bytearray(MPI(packet).to_bytes(flen, 'big'))
+ # the first byte is just \x04
+ del xy[:1]
+ # now xy needs to be separated into x, y
+ xylen = len(xy)
+ x, y = xy[:xylen // 2], xy[xylen // 2:]
+ self.x = MPI(self.bytes_to_int(x))
+ self.y = MPI(self.bytes_to_int(y))
+class ECDHPub(PubKey):
+ __pubfields__ = ('x', 'y')
+ def __init__(self):
+ super(ECDHPub, self).__init__()
+ self.oid = None
+ self.kdf = ECKDF()
+ def __len__(self):
+ return sum([len(getattr(self, i)) - 2 for i in self.__pubfields__] +
+ [3,
+ len(self.kdf),
+ len(encoder.encode(self.oid.value)) - 1])
+ def __pubkey__(self):
+ return ec.EllipticCurvePublicNumbers(self.x, self.y, self.oid.curve()).public_key(default_backend())
+ def __bytearray__(self):
+ _b = bytearray()
+ _b += encoder.encode(self.oid.value)[1:]
+ # 0x04 || x || y
+ # where x and y are the same length
+ _xy = b'\x04' + self.x.to_mpibytes()[2:] + self.y.to_mpibytes()[2:]
+ _b += MPI(self.bytes_to_int(_xy, 'big')).to_mpibytes()
+ _b += self.kdf.__bytearray__()
+ return _b
+ def __copy__(self):
+ pkt = super(ECDHPub, self).__copy__()
+ pkt.oid = self.oid
+ pkt.kdf = copy.copy(self.kdf)
+ return pkt
+ def parse(self, packet):
+ """
+ Algorithm-Specific Fields for ECDH keys:
+ o a variable-length field containing a curve OID, formatted
+ as follows:
+ - a one-octet size of the following field; values 0 and
+ 0xFF are reserved for future extensions
+ - the octets representing a curve OID, defined in
+ Section 11
+ - MPI of an EC point representing a public key
+ o a variable-length field containing KDF parameters,
+ formatted as follows:
+ - a one-octet size of the following fields; values 0 and
+ 0xff are reserved for future extensions
+ - a one-octet value 01, reserved for future extensions
+ - a one-octet hash function ID used with a KDF
+ - a one-octet algorithm ID for the symmetric algorithm
+ used to wrap the symmetric key used for the message
+ encryption; see Section 8 for details
+ """
+ oidlen = packet[0]
+ del packet[0]
+ _oid = bytearray(b'\x06')
+ _oid.append(oidlen)
+ _oid += bytearray(packet[:oidlen])
+ # try:
+ oid, _ = decoder.decode(bytes(_oid))
+ # except:
+ # raise PGPError("Bad OID octet stream: b'{:s}'".format(''.join(['\\x{:02X}'.format(c) for c in _oid])))
+ self.oid = EllipticCurveOID(oid)
+ del packet[:oidlen]
+ # flen = (self.oid.bit_length // 8)
+ xy = bytearray(MPI(packet).to_mpibytes()[2:])
+ # xy = bytearray(MPI(packet).to_bytes(flen, 'big'))
+ # the first byte is just \x04
+ del xy[:1]
+ # now xy needs to be separated into x, y
+ xylen = len(xy)
+ x, y = xy[:xylen // 2], xy[xylen // 2:]
+ self.x = MPI(self.bytes_to_int(x))
+ self.y = MPI(self.bytes_to_int(y))
+ self.kdf.parse(packet)
+class String2Key(Field):
+ """
+ 3.7. String-to-Key (S2K) Specifiers
+ String-to-key (S2K) specifiers are used to convert passphrase strings
+ into symmetric-key encryption/decryption keys. They are used in two
+ places, currently: to encrypt the secret part of private keys in the
+ private keyring, and to convert passphrases to encryption keys for
+ symmetrically encrypted messages.
+ 3.7.1. String-to-Key (S2K) Specifier Types
+ There are three types of S2K specifiers currently supported, and
+ some reserved values:
+ ID S2K Type
+ -- --------
+ 0 Simple S2K
+ 1 Salted S2K
+ 2 Reserved value
+ 3 Iterated and Salted S2K
+ 100 to 110 Private/Experimental S2K
+ These are described in Sections -
+ Simple S2K
+ This directly hashes the string to produce the key data. See below
+ for how this hashing is done.
+ Octet 0: 0x00
+ Octet 1: hash algorithm
+ Simple S2K hashes the passphrase to produce the session key. The
+ manner in which this is done depends on the size of the session key
+ (which will depend on the cipher used) and the size of the hash
+ algorithm's output. If the hash size is greater than the session key
+ size, the high-order (leftmost) octets of the hash are used as the
+ key.
+ If the hash size is less than the key size, multiple instances of the
+ hash context are created -- enough to produce the required key data.
+ These instances are preloaded with 0, 1, 2, ... octets of zeros (that
+ is to say, the first instance has no preloading, the second gets
+ preloaded with 1 octet of zero, the third is preloaded with two
+ octets of zeros, and so forth).
+ As the data is hashed, it is given independently to each hash
+ context. Since the contexts have been initialized differently, they
+ will each produce different hash output. Once the passphrase is
+ hashed, the output data from the multiple hashes is concatenated,
+ first hash leftmost, to produce the key data, with any excess octets
+ on the right discarded.
+ Salted S2K
+ This includes a "salt" value in the S2K specifier -- some arbitrary
+ data -- that gets hashed along with the passphrase string, to help
+ prevent dictionary attacks.
+ Octet 0: 0x01
+ Octet 1: hash algorithm
+ Octets 2-9: 8-octet salt value
+ Salted S2K is exactly like Simple S2K, except that the input to the
+ hash function(s) consists of the 8 octets of salt from the S2K
+ specifier, followed by the passphrase.
+ Iterated and Salted S2K
+ This includes both a salt and an octet count. The salt is combined
+ with the passphrase and the resulting value is hashed repeatedly.
+ This further increases the amount of work an attacker must do to try
+ dictionary attacks.
+ Octet 0: 0x03
+ Octet 1: hash algorithm
+ Octets 2-9: 8-octet salt value
+ Octet 10: count, a one-octet, coded value
+ The count is coded into a one-octet number using the following
+ formula:
+ #define EXPBIAS 6
+ count = ((Int32)16 + (c & 15)) << ((c >> 4) + EXPBIAS);
+ The above formula is in C, where "Int32" is a type for a 32-bit
+ integer, and the variable "c" is the coded count, Octet 10.
+ Iterated-Salted S2K hashes the passphrase and salt data multiple
+ times. The total number of octets to be hashed is specified in the
+ encoded count in the S2K specifier. Note that the resulting count
+ value is an octet count of how many octets will be hashed, not an
+ iteration count.
+ Initially, one or more hash contexts are set up as with the other S2K
+ algorithms, depending on how many octets of key data are needed.
+ Then the salt, followed by the passphrase data, is repeatedly hashed
+ until the number of octets specified by the octet count has been
+ hashed. The one exception is that if the octet count is less than
+ the size of the salt plus passphrase, the full salt plus passphrase
+ will be hashed even though that is greater than the octet count.
+ After the hashing is done, the data is unloaded from the hash
+ context(s) as with the other S2K algorithms.
+ """
+ @sdproperty
+ def encalg(self):
+ return self._encalg
+ @encalg.register(int)
+ @encalg.register(SymmetricKeyAlgorithm)
+ def encalg_int(self, val):
+ self._encalg = SymmetricKeyAlgorithm(val)
+ @sdproperty
+ def specifier(self):
+ return self._specifier
+ @specifier.register(int)
+ @specifier.register(String2KeyType)
+ def specifier_int(self, val):
+ self._specifier = String2KeyType(val)
+ @sdproperty
+ def halg(self):
+ return self._halg
+ @halg.register(int)
+ @halg.register(HashAlgorithm)
+ def halg_int(self, val):
+ self._halg = HashAlgorithm(val)
+ @sdproperty
+ def count(self):
+ return (16 + (self._count & 15)) << ((self._count >> 4) + 6)
+ @count.register(int)
+ def count_int(self, val):
+ if val < 0 or val > 255: # pragma: no cover
+ raise ValueError("count must be between 0 and 256")
+ self._count = val
+ def __init__(self):
+ super(String2Key, self).__init__()
+ self.usage = 0
+ self.encalg = 0
+ self.specifier = 0
+ self.iv = None
+ # specifier-specific fields
+ # simple, salted, iterated
+ self.halg = 0
+ # salted, iterated
+ self.salt = bytearray()
+ # iterated
+ self.count = 0
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes.append(self.usage)
+ if bool(self):
+ _bytes.append(self.encalg)
+ _bytes.append(self.specifier)
+ if self.specifier >= String2KeyType.Simple:
+ _bytes.append(self.halg)
+ if self.specifier >= String2KeyType.Salted:
+ _bytes += self.salt
+ if self.specifier == String2KeyType.Iterated:
+ _bytes.append(self._count)
+ if self.iv is not None:
+ _bytes += self.iv
+ return _bytes
+ def __len__(self):
+ return len(self.__bytearray__())
+ def __bool__(self):
+ return self.usage in [254, 255]
+ def __nonzero__(self):
+ return self.__bool__()
+ def __copy__(self):
+ s2k = String2Key()
+ s2k.usage = self.usage
+ s2k.encalg = self.encalg
+ s2k.specifier = self.specifier
+ s2k.iv = self.iv
+ s2k.halg = self.halg
+ s2k.salt = copy.copy(self.salt)
+ s2k.count = self._count
+ return s2k
+ def parse(self, packet, iv=True):
+ self.usage = packet[0]
+ del packet[0]
+ if bool(self):
+ self.encalg = packet[0]
+ del packet[0]
+ self.specifier = packet[0]
+ del packet[0]
+ if self.specifier >= String2KeyType.Simple:
+ # this will always be true
+ self.halg = packet[0]
+ del packet[0]
+ if self.specifier >= String2KeyType.Salted:
+ self.salt = packet[:8]
+ del packet[:8]
+ if self.specifier == String2KeyType.Iterated:
+ self.count = packet[0]
+ del packet[0]
+ if iv:
+ self.iv = packet[:(self.encalg.block_size // 8)]
+ del packet[:(self.encalg.block_size // 8)]
+ def derive_key(self, passphrase):
+ ##TODO: raise an exception if self.usage is not 254 or 255
+ keylen = self.encalg.key_size
+ hashlen = self.halg.digest_size * 8
+ ctx = int(math.ceil((keylen / hashlen)))
+ # Simple S2K - always done
+ hsalt = b''
+ hpass = passphrase.encode('latin-1')
+ # salted, iterated S2K
+ if self.specifier >= String2KeyType.Salted:
+ hsalt = bytes(self.salt)
+ count = len(hsalt + hpass)
+ if self.specifier == String2KeyType.Iterated and self.count > len(hsalt + hpass):
+ count = self.count
+ hcount = (count // len(hsalt + hpass))
+ hleft = count - (hcount * len(hsalt + hpass))
+ hashdata = ((hsalt + hpass) * hcount) + (hsalt + hpass)[:hleft]
+ h = []
+ for i in range(0, ctx):
+ _h = self.halg.hasher
+ _h.update(b'\x00' * i)
+ _h.update(hashdata)
+ h.append(_h)
+ # GC some stuff
+ del hsalt
+ del hpass
+ del hashdata
+ # and return the key!
+ return b''.join(hc.digest() for hc in h)[:(keylen // 8)]
+class ECKDF(Field):
+ """
+ o a variable-length field containing KDF parameters,
+ formatted as follows:
+ - a one-octet size of the following fields; values 0 and
+ 0xff are reserved for future extensions
+ - a one-octet value 01, reserved for future extensions
+ - a one-octet hash function ID used with a KDF
+ - a one-octet algorithm ID for the symmetric algorithm
+ used to wrap the symmetric key used for the message
+ encryption; see Section 8 for details
+ """
+ @sdproperty
+ def halg(self):
+ return self._halg
+ @halg.register(int)
+ @halg.register(HashAlgorithm)
+ def halg_int(self, val):
+ self._halg = HashAlgorithm(val)
+ @sdproperty
+ def encalg(self):
+ return self._encalg
+ @encalg.register(int)
+ @encalg.register(SymmetricKeyAlgorithm)
+ def encalg_int(self, val):
+ self._encalg = SymmetricKeyAlgorithm(val)
+ def __init__(self):
+ super(ECKDF, self).__init__()
+ self.halg = 0
+ self.encalg = 0
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes.append(len(self) - 1)
+ _bytes.append(0x01)
+ _bytes.append(self.halg)
+ _bytes.append(self.encalg)
+ return _bytes
+ def __len__(self):
+ return 4
+ def parse(self, packet):
+ # packet[0] should always be 3
+ # packet[1] should always be 1
+ # TODO: this assert is likely not necessary, but we should raise some kind of exception
+ # if parsing fails due to these fields being incorrect
+ assert packet[:2] == b'\x03\x01'
+ del packet[:2]
+ self.halg = packet[0]
+ del packet[0]
+ self.encalg = packet[0]
+ del packet[0]
+ def derive_key(self, s, curve, pkalg, fingerprint):
+ # wrapper around the Concatenation KDF method provided by cryptography
+ # assemble the additional data as defined in RFC 6637:
+ # Param = curve_OID_len || curve_OID || public_key_alg_ID || 03 || 01 || KDF_hash_ID || KEK_alg_ID for AESKeyWrap || "Anonymous
+ data = bytearray()
+ data += encoder.encode(curve.value)[1:]
+ data.append(pkalg)
+ data += b'\x03\x01'
+ data.append(self.halg)
+ data.append(self.encalg)
+ data += b'Anonymous Sender '
+ data += binascii.unhexlify(fingerprint.replace(' ', ''))
+ ckdf = ConcatKDFHash(algorithm=getattr(hashes,, length=self.encalg.key_size // 8, otherinfo=bytes(data), backend=default_backend())
+ return ckdf.derive(s)
+class PrivKey(PubKey):
+ __privfields__ = ()
+ @property
+ def __mpis__(self):
+ for i in super(PrivKey, self).__mpis__:
+ yield i
+ for i in self.__privfields__:
+ yield i
+ def __init__(self):
+ super(PrivKey, self).__init__()
+ self.s2k = String2Key()
+ self.encbytes = bytearray()
+ self.chksum = bytearray()
+ for field in self.__privfields__:
+ setattr(self, field, MPI(0))
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes += super(PrivKey, self).__bytearray__()
+ _bytes += self.s2k.__bytearray__()
+ if self.s2k:
+ _bytes += self.encbytes
+ else:
+ for field in self.__privfields__:
+ _bytes += getattr(self, field).to_mpibytes()
+ if self.s2k.usage == 0:
+ _bytes += self.chksum
+ return _bytes
+ def __len__(self):
+ l = super(PrivKey, self).__len__() + len(self.s2k) + len(self.chksum)
+ if self.s2k:
+ l += len(self.encbytes)
+ else:
+ l += sum(len(getattr(self, i)) for i in self.__privfields__)
+ return l
+ def __copy__(self):
+ pk = super(PrivKey, self).__copy__()
+ pk.s2k = copy.copy(self.s2k)
+ pk.encbytes = copy.copy(self.encbytes)
+ pk.chksum = copy.copy(self.chksum)
+ return pk
+ @abc.abstractmethod
+ def __privkey__(self):
+ """return the requisite *PrivateKey class from the cryptography library"""
+ @abc.abstractmethod
+ def _generate(self, key_size):
+ """Generate a new PrivKey"""
+ def _compute_chksum(self):
+ chs = sum(sum(bytearray(c.to_mpibytes())) for c in self) % 65536
+ self.chksum = bytearray(self.int_to_bytes(chs, 2))
+ def publen(self):
+ return super(PrivKey, self).__len__()
+ def encrypt_keyblob(self, passphrase, enc_alg, hash_alg):
+ # PGPy will only ever use iterated and salted S2k mode
+ self.s2k.usage = 254
+ self.s2k.encalg = enc_alg
+ self.s2k.specifier = String2KeyType.Iterated
+ self.s2k.iv = enc_alg.gen_iv()
+ self.s2k.halg = hash_alg
+ self.s2k.salt = bytearray(os.urandom(8))
+ self.s2k.count = hash_alg.tuned_count
+ # now that String-to-Key is ready to go, derive sessionkey from passphrase
+ # and then unreference passphrase
+ sessionkey = self.s2k.derive_key(passphrase)
+ del passphrase
+ pt = bytearray()
+ for pf in self.__privfields__:
+ pt += getattr(self, pf).to_mpibytes()
+ # append a SHA-1 hash of the plaintext so far to the plaintext
+ pt +='sha1', pt).digest()
+ # encrypt
+ self.encbytes = bytearray(_encrypt(bytes(pt), bytes(sessionkey), enc_alg, bytes(self.s2k.iv)))
+ # delete pt and clear self
+ del pt
+ self.clear()
+ @abc.abstractmethod
+ def decrypt_keyblob(self, passphrase):
+ if not self.s2k: # pragma: no cover
+ # not encrypted
+ return
+ # Encryption/decryption of the secret data is done in CFB mode using
+ # the key created from the passphrase and the Initial Vector from the
+ # packet. A different mode is used with V3 keys (which are only RSA)
+ # than with other key formats. (...)
+ #
+ # With V4 keys, a simpler method is used. All secret MPI values are
+ # encrypted in CFB mode, including the MPI bitcount prefix.
+ # derive the session key from our passphrase, and then unreference passphrase
+ sessionkey = self.s2k.derive_key(passphrase)
+ del passphrase
+ # attempt to decrypt this key
+ pt = _decrypt(bytes(self.encbytes), bytes(sessionkey), self.s2k.encalg, bytes(self.s2k.iv))
+ # check the hash to see if we decrypted successfully or not
+ if self.s2k.usage == 254 and not pt[-20:] =='sha1', pt[:-20]).digest():
+ # if the usage byte is 254, key material is followed by a 20-octet sha-1 hash of the rest
+ # of the key material block
+ raise PGPDecryptionError("Passphrase was incorrect!")
+ if self.s2k.usage == 255 and not self.bytes_to_int(pt[-2:]) == (sum(bytearray(pt[:-2])) % 65536): # pragma: no cover
+ # if the usage byte is 255, key material is followed by a 2-octet checksum of the rest
+ # of the key material block
+ raise PGPDecryptionError("Passphrase was incorrect!")
+ return bytearray(pt)
+ def sign(self, sigdata, hash_alg):
+ return NotImplemented # pragma: no cover
+ def clear(self):
+ """delete and re-initialize all private components to zero"""
+ for field in self.__privfields__:
+ delattr(self, field)
+ setattr(self, field, MPI(0))
+class OpaquePrivKey(PrivKey, OpaquePubKey): # pragma: no cover
+ def __privkey__(self):
+ return NotImplemented
+ def _generate(self, key_size):
+ # return NotImplemented
+ raise NotImplementedError()
+ def decrypt_keyblob(self, passphrase):
+ return NotImplemented
+class RSAPriv(PrivKey, RSAPub):
+ __privfields__ = ('d', 'p', 'q', 'u')
+ def __privkey__(self):
+ return rsa.RSAPrivateNumbers(self.p, self.q, self.d,
+ rsa.rsa_crt_dmp1(self.d, self.p),
+ rsa.rsa_crt_dmq1(self.d, self.q),
+ rsa.rsa_crt_iqmp(self.p, self.q),
+ rsa.RSAPublicNumbers(self.e, self.n)).private_key(default_backend())
+ def _generate(self, key_size):
+ if any(c != 0 for c in self): # pragma: no cover
+ raise PGPError("key is already populated")
+ # generate some big numbers!
+ pk = rsa.generate_private_key(65537, key_size, default_backend())
+ pkn = pk.private_numbers()
+ self.n = MPI(pkn.public_numbers.n)
+ self.e = MPI(pkn.public_numbers.e)
+ self.d = MPI(pkn.d)
+ self.p = MPI(pkn.p)
+ self.q = MPI(pkn.q)
+ # from the RFC:
+ # "- MPI of u, the multiplicative inverse of p, mod q."
+ # or, simply, p^-1 mod p
+ # rsa.rsa_crt_iqmp(p, q) normally computes q^-1 mod p,
+ # so if we swap the values around we get the answer we want
+ self.u = MPI(rsa.rsa_crt_iqmp(pkn.q, pkn.p))
+ del pkn
+ del pk
+ self._compute_chksum()
+ def parse(self, packet):
+ super(RSAPriv, self).parse(packet)
+ self.s2k.parse(packet)
+ if not self.s2k:
+ self.d = MPI(packet)
+ self.p = MPI(packet)
+ self.q = MPI(packet)
+ self.u = MPI(packet)
+ if self.s2k.usage == 0:
+ self.chksum = packet[:2]
+ del packet[:2]
+ else:
+ ##TODO: this needs to be bounded to the length of the encrypted key material
+ self.encbytes = packet
+ def decrypt_keyblob(self, passphrase):
+ kb = super(RSAPriv, self).decrypt_keyblob(passphrase)
+ del passphrase
+ self.d = MPI(kb)
+ self.p = MPI(kb)
+ self.q = MPI(kb)
+ self.u = MPI(kb)
+ if self.s2k.usage in [254, 255]:
+ self.chksum = kb
+ del kb
+ def sign(self, sigdata, hash_alg):
+ signer = self.__privkey__().signer(padding.PKCS1v15(), hash_alg)
+ signer.update(sigdata)
+ return signer.finalize()
+class DSAPriv(PrivKey, DSAPub):
+ __privfields__ = ('x',)
+ def __privkey__(self):
+ params = dsa.DSAParameterNumbers(self.p, self.q, self.g)
+ pn = dsa.DSAPublicNumbers(self.y, params)
+ return dsa.DSAPrivateNumbers(self.x, pn).private_key(default_backend())
+ def _generate(self, key_size):
+ if any(c != 0 for c in self): # pragma: no cover
+ raise PGPError("key is already populated")
+ # generate some big numbers!
+ pk = dsa.generate_private_key(key_size, default_backend())
+ pkn = pk.private_numbers()
+ self.p = MPI(pkn.public_numbers.parameter_numbers.p)
+ self.q = MPI(pkn.public_numbers.parameter_numbers.q)
+ self.g = MPI(pkn.public_numbers.parameter_numbers.g)
+ self.y = MPI(pkn.public_numbers.y)
+ self.x = MPI(pkn.x)
+ del pkn
+ del pk
+ self._compute_chksum()
+ def parse(self, packet):
+ super(DSAPriv, self).parse(packet)
+ self.s2k.parse(packet)
+ if not self.s2k:
+ self.x = MPI(packet)
+ else:
+ self.encbytes = packet
+ if self.s2k.usage in [0, 255]:
+ self.chksum = packet[:2]
+ del packet[:2]
+ def decrypt_keyblob(self, passphrase):
+ kb = super(DSAPriv, self).decrypt_keyblob(passphrase)
+ del passphrase
+ self.x = MPI(kb)
+ if self.s2k.usage in [254, 255]:
+ self.chksum = kb
+ del kb
+ def sign(self, sigdata, hash_alg):
+ signer = self.__privkey__().signer(hash_alg)
+ signer.update(sigdata)
+ return signer.finalize()
+class ElGPriv(PrivKey, ElGPub):
+ __privfields__ = ('x', )
+ def __privkey__(self):
+ raise NotImplementedError()
+ def _generate(self, key_size):
+ raise NotImplementedError(PubKeyAlgorithm.ElGamal)
+ def parse(self, packet):
+ super(ElGPriv, self).parse(packet)
+ self.s2k.parse(packet)
+ if not self.s2k:
+ self.x = MPI(packet)
+ else:
+ self.encbytes = packet
+ if self.s2k.usage in [0, 255]:
+ self.chksum = packet[:2]
+ del packet[:2]
+ def decrypt_keyblob(self, passphrase):
+ kb = super(ElGPriv, self).decrypt_keyblob(passphrase)
+ del passphrase
+ self.x = MPI(kb)
+ if self.s2k.usage in [254, 255]:
+ self.chksum = kb
+ del kb
+class ECDSAPriv(PrivKey, ECDSAPub):
+ __privfields__ = ('s', )
+ def __privkey__(self):
+ ecp = ec.EllipticCurvePublicNumbers(self.x, self.y, self.oid.curve())
+ return ec.EllipticCurvePrivateNumbers(self.s, ecp).private_key(default_backend())
+ def _generate(self, oid):
+ if any(c != 0 for c in self): # pragma: no cover
+ raise PGPError("Key is already populated!")
+ self.oid = EllipticCurveOID(oid)
+ if not self.oid.can_gen:
+ raise ValueError("Curve not currently supported: {}".format(
+ pk = ec.generate_private_key(self.oid.curve(), default_backend())
+ pubn = pk.public_key().public_numbers()
+ self.x = MPI(pubn.x)
+ self.y = MPI(pubn.y)
+ self.s = MPI(pk.private_numbers().private_value)
+ def parse(self, packet):
+ super(ECDSAPriv, self).parse(packet)
+ self.s2k.parse(packet)
+ if not self.s2k:
+ self.s = MPI(packet)
+ if self.s2k.usage == 0:
+ self.chksum = packet[:2]
+ del packet[:2]
+ else:
+ ##TODO: this needs to be bounded to the length of the encrypted key material
+ self.encbytes = packet
+ def decrypt_keyblob(self, passphrase):
+ kb = super(ECDSAPriv, self).decrypt_keyblob(passphrase)
+ del passphrase
+ self.s = MPI(kb)
+ def sign(self, sigdata, hash_alg):
+ signer = self.__privkey__().signer(ec.ECDSA(hash_alg))
+ signer.update(sigdata)
+ return signer.finalize()
+class ECDHPriv(ECDSAPriv, ECDHPub):
+ def __bytearray__(self):
+ _b = ECDHPub.__bytearray__(self)
+ _b += self.s2k.__bytearray__()
+ if not self.s2k:
+ _b += self.s.to_mpibytes()
+ if self.s2k.usage == 0:
+ _b += self.chksum
+ else:
+ _b += self.encbytes
+ return _b
+ def __len__(self):
+ # because of the inheritance used for this, ECDSAPub.__len__ is called instead of ECDHPub.__len__
+ # the only real difference is self.kdf, so we can just add that
+ return super(ECDHPriv, self).__len__() + len(self.kdf)
+ def _generate(self, oid):
+ ECDSAPriv._generate(self, oid)
+ self.kdf.halg = self.oid.kdf_halg
+ self.kdf.encalg = self.oid.kek_alg
+ def publen(self):
+ return ECDHPub.__len__(self)
+ def parse(self, packet):
+ ECDHPub.parse(self, packet)
+ self.s2k.parse(packet)
+ if not self.s2k:
+ self.s = MPI(packet)
+ if self.s2k.usage == 0:
+ self.chksum = packet[:2]
+ del packet[:2]
+ else:
+ ##TODO: this needs to be bounded to the length of the encrypted key material
+ self.encbytes = packet
+class CipherText(MPIs):
+ def __init__(self):
+ super(CipherText, self).__init__()
+ for i in self.__mpis__:
+ setattr(self, i, MPI(0))
+ @classmethod
+ @abc.abstractmethod
+ def encrypt(cls, encfn, *args):
+ """create and populate a concrete CipherText class instance"""
+ @abc.abstractmethod
+ def decrypt(self, decfn, *args):
+ """decrypt the ciphertext contained in this CipherText instance"""
+ def __bytearray__(self):
+ _bytes = bytearray()
+ for i in self:
+ _bytes += i.to_mpibytes()
+ return _bytes
+class RSACipherText(CipherText):
+ __mpis__ = ('me_mod_n', )
+ @classmethod
+ def encrypt(cls, encfn, *args):
+ ct = cls()
+ ct.me_mod_n = MPI(cls.bytes_to_int(encfn(*args)))
+ return ct
+ def decrypt(self, decfn, *args):
+ return decfn(*args)
+ def parse(self, packet):
+ self.me_mod_n = MPI(packet)
+class ElGCipherText(CipherText):
+ __mpis__ = ('gk_mod_p', 'myk_mod_p')
+ @classmethod
+ def encrypt(cls, encfn, *args):
+ raise NotImplementedError()
+ def decrypt(self, decfn, *args):
+ raise NotImplementedError()
+ def parse(self, packet):
+ self.gk_mod_p = MPI(packet)
+ self.myk_mod_p = MPI(packet)
+class ECDHCipherText(CipherText):
+ __mpis__ = ('vX', 'vY')
+ @classmethod
+ def encrypt(cls, pk, *args):
+ """
+ For convenience, the synopsis of the encoding method is given below;
+ however, this section, [NIST-SP800-56A], and [RFC3394] are the
+ normative sources of the definition.
+ Obtain the authenticated recipient public key R
+ Generate an ephemeral key pair {v, V=vG}
+ Compute the shared point S = vR;
+ m = symm_alg_ID || session key || checksum || pkcs5_padding;
+ curve_OID_len = (byte)len(curve_OID);
+ Param = curve_OID_len || curve_OID || public_key_alg_ID || 03
+ || 01 || KDF_hash_ID || KEK_alg_ID for AESKeyWrap || "Anonymous
+ Sender " || recipient_fingerprint;
+ Z_len = the key size for the KEK_alg_ID used with AESKeyWrap
+ Compute Z = KDF( S, Z_len, Param );
+ Compute C = AESKeyWrap( Z, m ) as per [RFC3394]
+ VB = convert point V to the octet string
+ Output (MPI(VB) || len(C) || C).
+ The decryption is the inverse of the method given. Note that the
+ recipient obtains the shared secret by calculating
+ """
+ # *args should be:
+ # - m
+ #
+ _m, = args
+ # m may need to be PKCS5-padded
+ padder = PKCS7(64).padder()
+ m = padder.update(_m) + padder.finalize()
+ km = pk.keymaterial
+ ct = cls()
+ # generate ephemeral key pair, then store it in ct
+ v = ec.generate_private_key(km.oid.curve(), default_backend())
+ ct.vX = MPI(v.public_key().public_numbers().x)
+ ct.vY = MPI(v.public_key().public_numbers().y)
+ # compute the shared point S
+ s =, km.__pubkey__())
+ # derive the wrapping key
+ z = km.kdf.derive_key(s, km.oid, PubKeyAlgorithm.ECDH, pk.fingerprint)
+ # compute C
+ ct.c = aes_key_wrap(z, m, default_backend())
+ return ct
+ def decrypt(self, pk, *args):
+ km = pk.keymaterial
+ # assemble the public component of ephemeral key v
+ v = ec.EllipticCurvePublicNumbers(self.vX, self.vY, km.oid.curve()).public_key(default_backend())
+ # compute s using the inverse of how it was derived during encryption
+ s = km.__privkey__().exchange(ec.ECDH(), v)
+ # derive the wrapping key
+ z = km.kdf.derive_key(s, km.oid, PubKeyAlgorithm.ECDH, pk.fingerprint)
+ # unwrap and unpad m
+ _m = aes_key_unwrap(z, self.c, default_backend())
+ padder = PKCS7(64).unpadder()
+ return padder.update(_m) + padder.finalize()
+ def __init__(self):
+ super(ECDHCipherText, self).__init__()
+ self.c = bytearray(0)
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _xy = b'\x04' + self.vX.to_mpibytes()[2:] + self.vY.to_mpibytes()[2:]
+ _bytes += MPI(self.bytes_to_int(_xy, 'big')).to_mpibytes()
+ _bytes.append(len(self.c))
+ _bytes += self.c
+ return _bytes
+ def parse(self, packet):
+ # self.v = MPI(packet)
+ xy = bytearray(MPI(packet).to_mpibytes()[2:])
+ del xy[:1]
+ xylen = len(xy)
+ x, y = xy[:xylen // 2], xy[xylen // 2:]
+ self.vX = MPI(self.bytes_to_int(x))
+ self.vY = MPI(self.bytes_to_int(y))
+ clen = packet[0]
+ del packet[0]
+ self.c += packet[:clen]
+ del packet[:clen]