diff options
author | Kali Kaneko <kali@leap.se> | 2017-06-26 15:06:41 +0200 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2017-06-27 10:33:48 +0200 |
commit | 59504c7ddf7aab71614d691e705d386f58b5100d (patch) | |
tree | ea6afce37890a23cd088eac50587c3293035e78c /src | |
parent | e82b8143d3e9b2f62650e06798eee262885036c2 (diff) |
[pkg] vendor pgpy 0.4.1
Diffstat (limited to 'src')
22 files changed, 8780 insertions, 3 deletions
diff --git a/src/leap/mx/mail_receiver.py b/src/leap/mx/mail_receiver.py index e376554..276ae13 100644 --- a/src/leap/mx/mail_receiver.py +++ b/src/leap/mx/mail_receiver.py @@ -42,8 +42,6 @@ import email.utils from datetime import datetime, timedelta from email import message_from_string -from pgpy import PGPKey, PGPMessage -from pgpy.errors import PGPEncryptionError from twisted.application.service import Service, IService from twisted.internet import inotify, defer, task, reactor @@ -59,6 +57,9 @@ from leap.soledad.common.document import ServerDocument from leap.mx.bounce import bounce_message from leap.mx.bounce import InvalidReturnPathError +from leap.mx.vendor.pgpy import PGPKey, PGPMessage +from leap.mx.vendor.pgpy.errors import PGPEncryptionError + class MailReceiver(Service): """ diff --git a/src/leap/mx/tests/test_mail_receiver.py b/src/leap/mx/tests/test_mail_receiver.py index abe5a71..0d03806 100644 --- a/src/leap/mx/tests/test_mail_receiver.py +++ b/src/leap/mx/tests/test_mail_receiver.py @@ -26,11 +26,11 @@ import shutil import tempfile from email.message import Message -from pgpy import PGPKey, PGPMessage from twisted.internet import defer, reactor from twisted.trial import unittest from leap.mx.mail_receiver import MailReceiver +from leap.mx.vendor.pgpy import PGPKey, PGPMessage BOUNCE_ADDRESS = "bounce@leap.se" diff --git a/src/leap/mx/vendor/__init__.py b/src/leap/mx/vendor/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/leap/mx/vendor/__init__.py diff --git a/src/leap/mx/vendor/pgpy/README b/src/leap/mx/vendor/pgpy/README new file mode 100644 index 0000000..275a496 --- /dev/null +++ b/src/leap/mx/vendor/pgpy/README @@ -0,0 +1,30 @@ +This submodule is a copy of PGPy 0.4.1 +We're vendoring it here because a package for python2 is not readily available. +If you're thinking about packaging it, you probably could port leap.mx to py3 instead. + +pgpy is Copyright (c) 2014 Michael Greene - All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of the {organization} nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/src/leap/mx/vendor/pgpy/__init__.py b/src/leap/mx/vendor/pgpy/__init__.py new file mode 100644 index 0000000..b80740d --- /dev/null +++ b/src/leap/mx/vendor/pgpy/__init__.py @@ -0,0 +1,21 @@ +""" PGPy :: Pretty Good Privacy for Python +""" +from ._author import * + +from .pgp import PGPKey +from .pgp import PGPKeyring +from .pgp import PGPMessage +from .pgp import PGPSignature +from .pgp import PGPUID + +__all__ = ['__author__', + '__copyright__', + '__license__', + '__version__', + 'constants', + 'errors', + 'PGPKey', + 'PGPKeyring', + 'PGPMessage', + 'PGPSignature', + 'PGPUID', ] diff --git a/src/leap/mx/vendor/pgpy/_author.py b/src/leap/mx/vendor/pgpy/_author.py new file mode 100644 index 0000000..ae17b45 --- /dev/null +++ b/src/leap/mx/vendor/pgpy/_author.py @@ -0,0 +1,18 @@ +"""_author.py + +Canonical location for authorship information +__version__ is a PEP-386 compliant version string, +making use of distutils.version.LooseVersion +""" + +from distutils.version import LooseVersion + +__all__ = ['__author__', + '__copyright__', + '__license__', + '__version__'] + +__author__ = "Michael Greene" +__copyright__ = "Copyright (c) 2014 Michael Greene" +__license__ = "BSD" +__version__ = str(LooseVersion("0.4.1")) diff --git a/src/leap/mx/vendor/pgpy/_curves.py b/src/leap/mx/vendor/pgpy/_curves.py new file mode 100644 index 0000000..9503075 --- /dev/null +++ b/src/leap/mx/vendor/pgpy/_curves.py @@ -0,0 +1,54 @@ +""" _curves.py +specify some additional curves that OpenSSL provides but cryptography doesn't explicitly expose +""" + +from cryptography import utils + +from cryptography.hazmat.primitives.asymmetric import ec + +from cryptography.hazmat.bindings.openssl.binding import Binding + +__all__ = tuple() + +# TODO: investigate defining additional curves using EC_GROUP_new_curve +# https://wiki.openssl.org/index.php/Elliptic_Curve_Cryptography#Defining_Curves + + +def _openssl_get_supported_curves(): + if hasattr(_openssl_get_supported_curves, '_curves'): + return _openssl_get_supported_curves._curves + + # use cryptography's cffi bindings to get an array of curve names + b = Binding() + cn = b.lib.EC_get_builtin_curves(b.ffi.NULL, 0) + cs = b.ffi.new('EC_builtin_curve[]', cn) + b.lib.EC_get_builtin_curves(cs, cn) + + # store the result so we don't have to do all of this every time + curves = { b.ffi.string(b.lib.OBJ_nid2sn(c.nid)).decode('utf-8') for c in cs } + _openssl_get_supported_curves._curves = curves + return curves + + +@utils.register_interface(ec.EllipticCurve) +class BrainpoolP256R1(object): + name = 'brainpoolP256r1' + key_size = 256 + + +@utils.register_interface(ec.EllipticCurve) +class BrainpoolP384R1(object): + name = 'brainpoolP384r1' + key_size = 384 + + +@utils.register_interface(ec.EllipticCurve) +class BrainpoolP512R1(object): + name = 'brainpoolP512r1' + key_size = 512 + + +# add these curves to the _CURVE_TYPES list +for curve in [BrainpoolP256R1, BrainpoolP384R1, BrainpoolP512R1]: + if curve.name not in ec._CURVE_TYPES and curve.name in _openssl_get_supported_curves(): + ec._CURVE_TYPES[curve.name] = curve diff --git a/src/leap/mx/vendor/pgpy/constants.py b/src/leap/mx/vendor/pgpy/constants.py new file mode 100644 index 0000000..271b0d0 --- /dev/null +++ b/src/leap/mx/vendor/pgpy/constants.py @@ -0,0 +1,484 @@ +""" constants.py +""" +import bz2 +import hashlib +import imghdr +import os +import time +import zlib + +from collections import namedtuple +from enum import Enum +from enum import IntEnum +from pyasn1.type.univ import ObjectIdentifier + +import six + +from cryptography.hazmat.backends import openssl +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.ciphers import algorithms + +from .decorators import classproperty +from .types import FlagEnum +from ._curves import BrainpoolP256R1, BrainpoolP384R1, BrainpoolP512R1 + +__all__ = ['Backend', + 'EllipticCurveOID', + 'PacketTag', + 'SymmetricKeyAlgorithm', + 'PubKeyAlgorithm', + 'CompressionAlgorithm', + 'HashAlgorithm', + 'RevocationReason', + 'ImageEncoding', + 'SignatureType', + 'KeyServerPreferences', + 'String2KeyType', + 'TrustLevel', + 'KeyFlags', + 'Features', + 'RevocationKeyClass', + 'NotationDataFlags', + 'TrustFlags'] + + +# this is 50 KiB +_hashtunedata = bytearray([10, 11, 12, 13, 14, 15, 16, 17] * 128 * 50) + + +class Backend(Enum): + OpenSSL = openssl.backend + + +class EllipticCurveOID(Enum): + # these are specified as: + # id = (oid, curve) + Invalid = ('', ) + #: DJB's fast elliptic curve + #: + #: .. warning:: + #: This curve is not currently usable by PGPy + Curve25519 = ('1.3.6.1.4.1.3029.1.5.1', ) + #: Twisted Edwards variant of Curve25519 + #: + #: .. warning:: + #: This curve is not currently usable by PGPy + Ed25519 = ('1.3.6.1.4.1.11591.15.1', ) + #: NIST P-256, also known as SECG curve secp256r1 + NIST_P256 = ('1.2.840.10045.3.1.7', ec.SECP256R1) + #: NIST P-384, also known as SECG curve secp384r1 + NIST_P384 = ('1.3.132.0.34', ec.SECP384R1) + #: NIST P-521, also known as SECG curve secp521r1 + NIST_P521 = ('1.3.132.0.35', ec.SECP521R1) + #: Brainpool Standard Curve, 256-bit + #: + #: .. note:: + #: Requires OpenSSL >= 1.0.2 + Brainpool_P256 = ('1.3.36.3.3.2.8.1.1.7', BrainpoolP256R1) + #: Brainpool Standard Curve, 384-bit + #: + #: .. note:: + #: Requires OpenSSL >= 1.0.2 + Brainpool_P384 = ('1.3.36.3.3.2.8.1.1.11', BrainpoolP384R1) + #: Brainpool Standard Curve, 512-bit + #: + #: .. note:: + #: Requires OpenSSL >= 1.0.2 + Brainpool_P512 = ('1.3.36.3.3.2.8.1.1.13', BrainpoolP512R1) + #: SECG curve secp256k1 + SECP256K1 = ('1.3.132.0.10', ec.SECP256K1) + + def __new__(cls, oid, curve=None): + # preprocessing stage for enum members: + # - set enum_member.value to ObjectIdentifier(oid) + # - if curve is not None and curve.name is in ec._CURVE_TYPES, set enum_member.curve to curve + # - otherwise, set enum_member.curve to None + obj = object.__new__(cls) + obj._value_ = ObjectIdentifier(oid) + obj.curve = None + + if curve is not None and curve.name in ec._CURVE_TYPES: + obj.curve = curve + + return obj + + @property + def can_gen(self): + return self.curve is not None + + @property + def key_size(self): + if self.curve is not None: + return self.curve.key_size + + @property + def kdf_halg(self): + # return the hash algorithm to specify in the KDF fields when generating a key + algs = {256: HashAlgorithm.SHA256, + 384: HashAlgorithm.SHA384, + 512: HashAlgorithm.SHA512, + 521: HashAlgorithm.SHA512} + + return algs.get(self.key_size, None) + + @property + def kek_alg(self): + # return the AES algorithm to specify in the KDF fields when generating a key + algs = {256: SymmetricKeyAlgorithm.AES128, + 384: SymmetricKeyAlgorithm.AES192, + 512: SymmetricKeyAlgorithm.AES256, + 521: SymmetricKeyAlgorithm.AES256} + + return algs.get(self.key_size, None) + + +class PacketTag(IntEnum): + Invalid = 0 + PublicKeyEncryptedSessionKey = 1 + Signature = 2 + SymmetricKeyEncryptedSessionKey = 3 + OnePassSignature = 4 + SecretKey = 5 + PublicKey = 6 + SecretSubKey = 7 + CompressedData = 8 + SymmetricallyEncryptedData = 9 + Marker = 10 + LiteralData = 11 + Trust = 12 + UserID = 13 + PublicSubKey = 14 + UserAttribute = 17 + SymmetricallyEncryptedIntegrityProtectedData = 18 + ModificationDetectionCode = 19 + + +class SymmetricKeyAlgorithm(IntEnum): + """Supported symmetric key algorithms.""" + Plaintext = 0x00 + #: .. warning:: + #: IDEA is insecure. PGPy only allows it to be used for decryption, not encryption! + IDEA = 0x01 + #: Triple-DES with 168-bit key derived from 192 + TripleDES = 0x02 + #: CAST5 (or CAST-128) with 128-bit key + CAST5 = 0x03 + #: Blowfish with 128-bit key and 16 rounds + Blowfish = 0x04 + #: AES with 128-bit key + AES128 = 0x07 + #: AES with 192-bit key + AES192 = 0x08 + #: AES with 256-bit key + AES256 = 0x09 + # Twofish with 256-bit key - not currently supported + Twofish256 = 0x0A + #: Camellia with 128-bit key + Camellia128 = 0x0B + #: Camellia with 192-bit key + Camellia192 = 0x0C + #: Camellia with 256-bit key + Camellia256 = 0x0D + + @property + def cipher(self): + bs = {SymmetricKeyAlgorithm.IDEA: algorithms.IDEA, + SymmetricKeyAlgorithm.TripleDES: algorithms.TripleDES, + SymmetricKeyAlgorithm.CAST5: algorithms.CAST5, + SymmetricKeyAlgorithm.Blowfish: algorithms.Blowfish, + SymmetricKeyAlgorithm.AES128: algorithms.AES, + SymmetricKeyAlgorithm.AES192: algorithms.AES, + SymmetricKeyAlgorithm.AES256: algorithms.AES, + SymmetricKeyAlgorithm.Twofish256: namedtuple('Twofish256', ['block_size'])(block_size=128), + SymmetricKeyAlgorithm.Camellia128: algorithms.Camellia, + SymmetricKeyAlgorithm.Camellia192: algorithms.Camellia, + SymmetricKeyAlgorithm.Camellia256: algorithms.Camellia} + + if self in bs: + return bs[self] + + raise NotImplementedError(repr(self)) + + @property + def is_insecure(self): + insecure_ciphers = {SymmetricKeyAlgorithm.IDEA} + return self in insecure_ciphers + + @property + def block_size(self): + return self.cipher.block_size + + @property + def key_size(self): + ks = {SymmetricKeyAlgorithm.IDEA: 128, + SymmetricKeyAlgorithm.TripleDES: 192, + SymmetricKeyAlgorithm.CAST5: 128, + SymmetricKeyAlgorithm.Blowfish: 128, + SymmetricKeyAlgorithm.AES128: 128, + SymmetricKeyAlgorithm.AES192: 192, + SymmetricKeyAlgorithm.AES256: 256, + SymmetricKeyAlgorithm.Twofish256: 256, + SymmetricKeyAlgorithm.Camellia128: 128, + SymmetricKeyAlgorithm.Camellia192: 192, + SymmetricKeyAlgorithm.Camellia256: 256} + + if self in ks: + return ks[self] + + raise NotImplementedError(repr(self)) + + def gen_iv(self): + return os.urandom(self.block_size // 8) + + def gen_key(self): + return os.urandom(self.key_size // 8) + + +class PubKeyAlgorithm(IntEnum): + Invalid = 0x00 + #: Signifies that a key is an RSA key. + RSAEncryptOrSign = 0x01 + RSAEncrypt = 0x02 # deprecated + RSASign = 0x03 # deprecated + #: Signifies that a key is an ElGamal key. + ElGamal = 0x10 + #: Signifies that a key is a DSA key. + DSA = 0x11 + #: Signifies that a key is an ECDH key. + ECDH = 0x12 + #: Signifies that a key is an ECDSA key. + ECDSA = 0x13 + FormerlyElGamalEncryptOrSign = 0x14 # deprecated - do not generate + # DiffieHellman = 0x15 # X9.42 + + @property + def can_gen(self): + return self in {PubKeyAlgorithm.RSAEncryptOrSign, + PubKeyAlgorithm.DSA, + PubKeyAlgorithm.ECDSA, + PubKeyAlgorithm.ECDH} + + @property + def can_encrypt(self): # pragma: no cover + return self in {PubKeyAlgorithm.RSAEncryptOrSign, PubKeyAlgorithm.ElGamal, PubKeyAlgorithm.ECDH} + + @property + def can_sign(self): + return self in {PubKeyAlgorithm.RSAEncryptOrSign, PubKeyAlgorithm.DSA, PubKeyAlgorithm.ECDSA} + + @property + def deprecated(self): + return self in {PubKeyAlgorithm.RSAEncrypt, + PubKeyAlgorithm.RSASign, + PubKeyAlgorithm.FormerlyElGamalEncryptOrSign} + + +class CompressionAlgorithm(IntEnum): + #: No compression + Uncompressed = 0x00 + #: ZIP DEFLATE + ZIP = 0x01 + #: ZIP DEFLATE with zlib headers + ZLIB = 0x02 + #: Bzip2 + BZ2 = 0x03 + + def compress(self, data): + if self is CompressionAlgorithm.Uncompressed: + return data + + if self is CompressionAlgorithm.ZIP: + return zlib.compress(data)[2:-4] + + if self is CompressionAlgorithm.ZLIB: + return zlib.compress(data) + + if self is CompressionAlgorithm.BZ2: + return bz2.compress(data) + + raise NotImplementedError(self) + + def decompress(self, data): + if six.PY2: + data = bytes(data) + + if self is CompressionAlgorithm.Uncompressed: + return data + + if self is CompressionAlgorithm.ZIP: + return zlib.decompress(data, -15) + + if self is CompressionAlgorithm.ZLIB: + return zlib.decompress(data) + + if self is CompressionAlgorithm.BZ2: + return bz2.decompress(data) + + raise NotImplementedError(self) + + +class HashAlgorithm(IntEnum): + Invalid = 0x00 + MD5 = 0x01 + SHA1 = 0x02 + RIPEMD160 = 0x03 + _reserved_1 = 0x04 + _reserved_2 = 0x05 + _reserved_3 = 0x06 + _reserved_4 = 0x07 + SHA256 = 0x08 + SHA384 = 0x09 + SHA512 = 0x0A + SHA224 = 0x0B + + def __init__(self, *args): + super(self.__class__, self).__init__() + self._tuned_count = 0 + + @property + def hasher(self): + return hashlib.new(self.name) + + @property + def digest_size(self): + return self.hasher.digest_size + + @property + def tuned_count(self): + if self._tuned_count == 0: + self.tune_count() + + return self._tuned_count + + def tune_count(self): + start = end = 0 + htd = _hashtunedata[:] + + while start == end: + # potentially do this multiple times in case the resolution of time.time is low enough that + # hashing 100 KiB isn't enough time to produce a measurable difference + # (e.g. if the timer for time.time doesn't have enough precision) + htd = htd + htd + h = self.hasher + + start = time.time() + h.update(htd) + end = time.time() + + # now calculate how many bytes need to be hashed to reach our expected time period + # GnuPG tunes for about 100ms, so we'll do that as well + _TIME = 0.100 + ct = int(len(htd) * (_TIME / (end - start))) + c1 = ((ct >> (ct.bit_length() - 5)) - 16) + c2 = (ct.bit_length() - 11) + c = ((c2 << 4) + c1) + + # constrain self._tuned_count to be between 0 and 255 + self._tuned_count = max(min(c, 255), 0) + + +class RevocationReason(IntEnum): + #: No reason was specified. This is the default reason. + NotSpecified = 0x00 + #: The key was superseded by a new key. Only meaningful when revoking a key. + Superseded = 0x01 + #: Key material has been compromised. Only meaningful when revoking a key. + Compromised = 0x02 + #: Key is retired and no longer used. Only meaningful when revoking a key. + Retired = 0x03 + #: User ID information is no longer valid. Only meaningful when revoking a certification of a user id. + UserID = 0x20 + + +class ImageEncoding(IntEnum): + Unknown = 0x00 + JPEG = 0x01 + + @classmethod + def encodingof(cls, imagebytes): + type = imghdr.what(None, h=imagebytes) + if type == 'jpeg': + return ImageEncoding.JPEG + return ImageEncoding.Unknown # pragma: no cover + + +class SignatureType(IntEnum): + BinaryDocument = 0x00 + CanonicalDocument = 0x01 + Standalone = 0x02 + Generic_Cert = 0x10 + Persona_Cert = 0x11 + Casual_Cert = 0x12 + Positive_Cert = 0x13 + Subkey_Binding = 0x18 + PrimaryKey_Binding = 0x19 + DirectlyOnKey = 0x1F + KeyRevocation = 0x20 + SubkeyRevocation = 0x28 + CertRevocation = 0x30 + Timestamp = 0x40 + ThirdParty_Confirmation = 0x50 + + +class KeyServerPreferences(IntEnum): + Unknown = 0x00 + NoModify = 0x80 + + +class String2KeyType(IntEnum): + Simple = 0 + Salted = 1 + Reserved = 2 + Iterated = 3 + + +class TrustLevel(IntEnum): + Unknown = 0 + Expired = 1 + Undefined = 2 + Never = 3 + Marginal = 4 + Fully = 5 + Ultimate = 6 + + +class KeyFlags(FlagEnum): + #: Signifies that a key may be used to certify keys and user ids. Primary keys always have this, even if it is not specified. + Certify = 0x01 + #: Signifies that a key may be used to sign messages and documents. + Sign = 0x02 + #: Signifies that a key may be used to encrypt messages. + EncryptCommunications = 0x04 + #: Signifies that a key may be used to encrypt storage. Currently equivalent to :py:obj:`~pgpy.constants.EncryptCommunications`. + EncryptStorage = 0x08 + #: Signifies that the private component of a given key may have been split by a secret-sharing mechanism. Split + #: keys are not currently supported by PGPy. + Split = 0x10 + #: Signifies that a key may be used for authentication. + Authentication = 0x20 + #: Signifies that the private component of a key may be in the possession of more than one person. + MultiPerson = 0x80 + + +class Features(FlagEnum): + ModificationDetection = 0x01 + + @classproperty + def pgpy_features(cls): + return Features.ModificationDetection + + +class RevocationKeyClass(FlagEnum): + Sensitive = 0x40 + Normal = 0x80 + + +class NotationDataFlags(FlagEnum): + HumanReadable = 0x80 + + +class TrustFlags(FlagEnum): + Revoked = 0x20 + SubRevoked = 0x40 + Disabled = 0x80 + PendingCheck = 0x100 diff --git a/src/leap/mx/vendor/pgpy/decorators.py b/src/leap/mx/vendor/pgpy/decorators.py new file mode 100644 index 0000000..d2b9926 --- /dev/null +++ b/src/leap/mx/vendor/pgpy/decorators.py @@ -0,0 +1,131 @@ +""" decorators.py +""" +import contextlib +import functools +import six +import warnings + +try: + from singledispatch import singledispatch + +except ImportError: # pragma: no cover + from functools import singledispatch + + +from .errors import PGPError + +__all__ = ['classproperty', + 'sdmethod', + 'sdproperty', + 'KeyAction'] + + +def classproperty(fget): + class ClassProperty(object): + def __init__(self, fget): + self.fget = fget + self.__doc__ = fget.__doc__ + + def __get__(self, cls, owner): + return self.fget(owner) + + def __set__(self, obj, value): # pragma: no cover + raise AttributeError("Read-only attribute") + + def __delete__(self, obj): # pragma: no cover + raise AttributeError("Read-only attribute") + + return ClassProperty(fget) + + +def sdmethod(meth): + """ + This is a hack to monkey patch sdproperty to work as expected with instance methods. + """ + sd = singledispatch(meth) + + def wrapper(obj, *args, **kwargs): + return sd.dispatch(args[0].__class__)(obj, *args, **kwargs) + + wrapper.register = sd.register + wrapper.dispatch = sd.dispatch + wrapper.registry = sd.registry + wrapper._clear_cache = sd._clear_cache + functools.update_wrapper(wrapper, meth) + return wrapper + + +def sdproperty(fget): + def defset(obj, val): # pragma: no cover + raise TypeError(str(val.__class__)) + + class SDProperty(property): + def register(self, cls=None, fset=None): + return self.fset.register(cls, fset) + + def setter(self, fset): + self.register(object, fset) + return type(self)(self.fget, self.fset, self.fdel, self.__doc__) + + return SDProperty(fget, sdmethod(defset)) + + +class KeyAction(object): + def __init__(self, *usage, **conditions): + super(KeyAction, self).__init__() + self.flags = set(usage) + self.conditions = conditions + + @contextlib.contextmanager + def usage(self, key, user): + def _preiter(first, iterable): + yield first + for item in iterable: + yield item + + em = {} + em['keyid'] = key.fingerprint.keyid + em['flags'] = ', '.join(flag.name for flag in self.flags) + + if len(self.flags): + for _key in _preiter(key, key.subkeys.values()): + if self.flags & set(_key._get_key_flags(user)): + break + + else: # pragma: no cover + raise PGPError("Key {keyid:s} does not have the required usage flag {flags:s}".format(**em)) + + else: + _key = key + + if _key is not key: + em['subkeyid'] = _key.fingerprint.keyid + warnings.warn("Key {keyid:s} does not have the required usage flag {flags:s}; using subkey {subkeyid:s}" + "".format(**em), stacklevel=4) + + yield _key + + def check_attributes(self, key): + for attr, expected in self.conditions.items(): + if getattr(key, attr) != expected: + raise PGPError("Expected: {attr:s} == {eval:s}. Got: {got:s}" + "".format(attr=attr, eval=str(expected), got=str(getattr(key, attr)))) + + def __call__(self, action): + # @functools.wraps(action) + @six.wraps(action) + def _action(key, *args, **kwargs): + if key._key is None: + raise PGPError("No key!") + + # if a key is in the process of being created, it needs to be allowed to certify its own user id + if len(key._uids) == 0 and key.is_primary and action is not key.certify.__wrapped__: + raise PGPError("Key is not complete - please add a User ID!") + + with self.usage(key, kwargs.get('user', None)) as _key: + self.check_attributes(key) + + # do the thing + return action(_key, *args, **kwargs) + + return _action diff --git a/src/leap/mx/vendor/pgpy/errors.py b/src/leap/mx/vendor/pgpy/errors.py new file mode 100644 index 0000000..6af0612 --- /dev/null +++ b/src/leap/mx/vendor/pgpy/errors.py @@ -0,0 +1,39 @@ +""" errors.py +""" + +__all__ = ('PGPError', + 'PGPEncryptionError', + 'PGPDecryptionError', + 'PGPOpenSSLCipherNotSupported', + 'PGPInsecureCipher', + 'WontImplementError',) + + +class PGPError(Exception): + """Raised as a general error in PGPy""" + pass + + +class PGPEncryptionError(Exception): + """Raised when encryption fails""" + pass + + +class PGPDecryptionError(Exception): + """Raised when decryption fails""" + pass + + +class PGPOpenSSLCipherNotSupported(Exception): + """Raised when OpenSSL does not support the requested cipher""" + pass + + +class PGPInsecureCipher(Exception): + """Raised when a cipher known to be insecure is attempted to be used to encrypt data""" + pass + + +class WontImplementError(NotImplementedError): + """Raised when something that is not implemented, will not be implemented""" + pass diff --git a/src/leap/mx/vendor/pgpy/memoryview.py b/src/leap/mx/vendor/pgpy/memoryview.py new file mode 100644 index 0000000..35f3801 --- /dev/null +++ b/src/leap/mx/vendor/pgpy/memoryview.py @@ -0,0 +1,128 @@ +""" util.py +""" +import six + +__all__ = ('memoryview', ) + +memoryview = memoryview + +if six.PY2: + # because Python2's memoryview can't be released directly, nor can it be used as a context manager + # this wrapper object should hopefully make the behavior more uniform to python 3's + import __builtin__ + import functools + + # this decorator will raise a ValueError if the wrapped memoryview object has been "released" + def notreleased(meth): + @functools.wraps(meth) + def _inner(self, *args, **kwargs): + if self._mem is None: + raise ValueError("operation forbidden on released memoryview object") + return meth(self, *args, **kwargs) + + return _inner + + class memoryview(object): # flake8: noqa + @property + @notreleased + def obj(self): + """The underlying object of the memoryview.""" + return self._obj + + @property + @notreleased + def nbytes(self): + # nbytes == product(shape) * itemsize == len(m.tobytes()) + nb = 1 + for dim in self.shape: + nb *= dim + return nb * self.itemsize + + # TODO: c_contiguous -> (self.ndim == 0 or ???) + # TODO: f_contiguous -> (self.ndim == 0 or ???) + # TODO: contiguous -> return self.c_contiguous or self.f_contiguous + + def __new__(cls, obj, parent=None): + memview = object.__new__(cls) + memview._obj = obj if parent is None else parent.obj + return memview + + def __init__(self, obj): + if not hasattr(self, '_mem'): + if not isinstance(obj, __builtin__.memoryview): + obj = __builtin__.memoryview(obj) + self._mem = obj + + def __dir__(self): + # so dir(...) looks like a memoryview object, and also + # contains our additional methods and properties, but not our instance members + return sorted(set(self.__class__.__dict__) | set(dir(self._mem))) + + @notreleased + def __getitem__(self, item): + # if this is a slice, it'll return another real memoryview object + # we'll need to wrap that subview in another memoryview wrapper + if isinstance(item, slice): + return memoryview(self._mem.__getitem__(item)) + + return self._mem.__getitem__(item) + + @notreleased + def __setitem__(self, key, value): + self._mem.__setitem__(key, value) + + @notreleased + def __delitem__(self, key): + raise TypeError("cannot delete memory") + + def __getattribute__(self, item): + try: + return object.__getattribute__(self, item) + + except AttributeError: + if object.__getattribute__(self, '_mem') is None: + raise ValueError("operation forbidden on released memoryview object") + + return object.__getattribute__(self, '_mem').__getattribute__(item) + + def __setattr__(self, key, value): + if key not in self.__dict__ and hasattr(__builtin__.memoryview, key): + # there are no writable attributes on memoryview objects + # changing indexed values is handled by __setitem__ + raise AttributeError("attribute '{}' of 'memoryview' objects is not writable".format(key)) + + else: + object.__setattr__(self, key, value) + + @notreleased + def __len__(self): + return len(self._mem) + + def __eq__(self, other): + if isinstance(other, memoryview): + return self._mem == other._mem + + return self._mem == other + + @notreleased + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.release() + + def __repr__(self): + return '<{}memory at 0x{:02X}>'.format('' if self._mem else 'released ', id(self)) + + def release(self): + """Release the underlying buffer exposed by the memoryview object""" + # this should effectively do the same job as memoryview.release() in Python 3 + self._mem = None + self._obj = None + + @notreleased + def hex(self): + """Return the data in the buffer as a string of hexadecimal numbers.""" + return ''.join(('{:02X}'.format(ord(c)) for c in self._mem)) + + # TODO: cast diff --git a/src/leap/mx/vendor/pgpy/packet/__init__.py b/src/leap/mx/vendor/pgpy/packet/__init__.py new file mode 100644 index 0000000..36120dd --- /dev/null +++ b/src/leap/mx/vendor/pgpy/packet/__init__.py @@ -0,0 +1,11 @@ +from .types import Key +from .types import Opaque +from .types import Packet +from .types import Primary +from .types import Private +from .types import Public +from .types import Sub + +from .packets import * # NOQA + +__all__ = ['Key', 'Opaque', 'Packet', 'Primary', 'Private', 'Public', 'Sub'] diff --git a/src/leap/mx/vendor/pgpy/packet/fields.py b/src/leap/mx/vendor/pgpy/packet/fields.py new file mode 100644 index 0000000..b5b5f2e --- /dev/null +++ b/src/leap/mx/vendor/pgpy/packet/fields.py @@ -0,0 +1,1514 @@ +""" fields.py +""" +from __future__ import absolute_import, division + +import abc +import binascii +import collections +import copy +import hashlib +import itertools +import math +import os + +from pyasn1.codec.der import decoder +from pyasn1.codec.der import encoder +from pyasn1.type.univ import Integer +from pyasn1.type.univ import Sequence + +from cryptography.exceptions import InvalidSignature + +from cryptography.hazmat.backends import default_backend + +from cryptography.hazmat.primitives import hashes + +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives.asymmetric import dsa +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric import padding + +from cryptography.hazmat.primitives.kdf.concatkdf import ConcatKDFHash + +from cryptography.hazmat.primitives.keywrap import aes_key_wrap +from cryptography.hazmat.primitives.keywrap import aes_key_unwrap + +from cryptography.hazmat.primitives.padding import PKCS7 + +from .subpackets import Signature as SignatureSP +from .subpackets import UserAttribute +from .subpackets import signature +from .subpackets import userattribute + +from .types import MPI +from .types import MPIs + +from ..constants import EllipticCurveOID +from ..constants import HashAlgorithm +from ..constants import PubKeyAlgorithm +from ..constants import String2KeyType +from ..constants import SymmetricKeyAlgorithm + +from ..decorators import sdproperty + +from ..errors import PGPDecryptionError +from ..errors import PGPError + +from ..symenc import _decrypt +from ..symenc import _encrypt + +from ..types import Field + +__all__ = ['SubPackets', + 'UserAttributeSubPackets', + 'Signature', + 'RSASignature', + 'DSASignature', + 'ECDSASignature', + 'PubKey', + 'OpaquePubKey', + 'RSAPub', + 'DSAPub', + 'ElGPub', + 'ECDSAPub', + 'ECDHPub', + 'String2Key', + 'ECKDF', + 'PrivKey', + 'OpaquePrivKey', + 'RSAPriv', + 'DSAPriv', + 'ElGPriv', + 'ECDSAPriv', + 'ECDHPriv', + 'CipherText', + 'RSACipherText', + 'ElGCipherText', + 'ECDHCipherText', ] + + +class SubPackets(collections.MutableMapping, Field): + _spmodule = signature + + def __init__(self): + super(SubPackets, self).__init__() + self._hashed_sp = collections.OrderedDict() + self._unhashed_sp = collections.OrderedDict() + + def __bytearray__(self): + _bytes = bytearray() + _bytes += self.__hashbytearray__() + _bytes += self.__unhashbytearray__() + return _bytes + + def __hashbytearray__(self): + _bytes = bytearray() + _bytes += self.int_to_bytes(sum(len(sp) for sp in self._hashed_sp.values()), 2) + for hsp in self._hashed_sp.values(): + _bytes += hsp.__bytearray__() + return _bytes + + def __unhashbytearray__(self): + _bytes = bytearray() + _bytes += self.int_to_bytes(sum(len(sp) for sp in self._unhashed_sp.values()), 2) + for uhsp in self._unhashed_sp.values(): + _bytes += uhsp.__bytearray__() + return _bytes + + def __len__(self): # pragma: no cover + return sum(sp.header.length for sp in itertools.chain(self._hashed_sp.values(), self._unhashed_sp.values())) + 4 + + def __iter__(self): + for sp in itertools.chain(self._hashed_sp.values(), self._unhashed_sp.values()): + yield sp + + def __setitem__(self, key, val): + # the key provided should always be the classname for the subpacket + # but, there can be multiple subpackets of the same type + # so, it should be stored in the format: [h_]<key>_<seqid> + # where: + # - <key> is the classname of val + # - <seqid> is a sequence id, starting at 0, for a given classname + + i = 0 + if isinstance(key, tuple): # pragma: no cover + key, i = key + + d = self._unhashed_sp + if key.startswith('h_'): + d, key = self._hashed_sp, key[2:] + + while (key, i) in d: + i += 1 + + d[(key, i)] = val + + def __getitem__(self, key): + if isinstance(key, tuple): # pragma: no cover + return self._hashed_sp.get(key, self._unhashed_sp.get(key)) + + if key.startswith('h_'): + return [v for k, v in self._hashed_sp.items() if key[2:] == k[0]] + + else: + return [v for k, v in itertools.chain(self._hashed_sp.items(), self._unhashed_sp.items()) if key == k[0]] + + def __delitem__(self, key): + ##TODO: this + raise NotImplementedError + + def __contains__(self, key): + return key in set(k for k, _ in itertools.chain(self._hashed_sp, self._unhashed_sp)) + + def __copy__(self): + sp = SubPackets() + sp._hashed_sp = self._hashed_sp.copy() + sp._unhashed_sp = self._unhashed_sp.copy() + + return sp + + def addnew(self, spname, hashed=False, **kwargs): + nsp = getattr(self._spmodule, spname)() + for p, v in kwargs.items(): + if hasattr(nsp, p): + setattr(nsp, p, v) + nsp.update_hlen() + if hashed: + self['h_' + spname] = nsp + + else: + self[spname] = nsp + + def update_hlen(self): + for sp in self: + sp.update_hlen() + + def parse(self, packet): + hl = self.bytes_to_int(packet[:2]) + del packet[:2] + + # we do it this way because we can't ensure that subpacket headers are sized appropriately + # for their contents, but we can at least output that correctly + # so instead of tracking how many bytes we can now output, we track how many bytes have we parsed so far + plen = len(packet) + while plen - len(packet) < hl: + sp = SignatureSP(packet) + self['h_' + sp.__class__.__name__] = sp + + uhl = self.bytes_to_int(packet[:2]) + del packet[:2] + + plen = len(packet) + while plen - len(packet) < uhl: + sp = SignatureSP(packet) + self[sp.__class__.__name__] = sp + + +class UserAttributeSubPackets(SubPackets): + """ + This is nearly the same as just the unhashed subpackets from above, + except that there isn't a length specifier. So, parse will only parse one packet, + appending that one packet to self.__unhashed_sp. + """ + _spmodule = userattribute + + def __bytearray__(self): + _bytes = bytearray() + for uhsp in self._unhashed_sp.values(): + _bytes += uhsp.__bytearray__() + return _bytes + + def __len__(self): # pragma: no cover + return sum(len(sp) for sp in self._unhashed_sp.values()) + + def parse(self, packet): + # parse just one packet and add it to the unhashed subpacket ordereddict + # I actually have yet to come across a User Attribute packet with more than one subpacket + # which makes sense, given that there is only one defined subpacket + sp = UserAttribute(packet) + self[sp.__class__.__name__] = sp + + +class Signature(MPIs): + def __init__(self): + for i in self.__mpis__: + setattr(self, i, MPI(0)) + + def __bytearray__(self): + _bytes = bytearray() + for i in self: + _bytes += i.to_mpibytes() + return _bytes + + @abc.abstractproperty + def __sig__(self): + """return the signature bytes in a format that can be understood by the signature verifier""" + + @abc.abstractmethod + def from_signer(self, sig): + """create and parse a concrete Signature class instance""" + + +class RSASignature(Signature): + __mpis__ = ('md_mod_n', ) + + def __sig__(self): + return self.md_mod_n.to_mpibytes()[2:] + + def parse(self, packet): + self.md_mod_n = MPI(packet) + + def from_signer(self, sig): + self.md_mod_n = MPI(self.bytes_to_int(sig)) + + +class DSASignature(Signature): + __mpis__ = ('r', 's') + + def __sig__(self): + # return the signature data into an ASN.1 sequence of integers in DER format + seq = Sequence() + for i in self: + seq.setComponentByPosition(len(seq), Integer(i)) + + return encoder.encode(seq) + + def from_signer(self, sig): + ##TODO: just use pyasn1 for this + def _der_intf(_asn): + if _asn[0] != 0x02: # pragma: no cover + raise ValueError("Expected: Integer (0x02). Got: 0x{:02X}".format(_asn[0])) + del _asn[0] + + if _asn[0] & 0x80: # pragma: no cover + llen = _asn[0] & 0x7F + del _asn[0] + + flen = self.bytes_to_int(_asn[:llen]) + del _asn[:llen] + + else: + flen = _asn[0] & 0x7F + del _asn[0] + + i = self.bytes_to_int(_asn[:flen]) + del _asn[:flen] + return i + + if isinstance(sig, bytes): + sig = bytearray(sig) + + # this is a very limited asn1 decoder - it is only intended to decode a DER encoded sequence of integers + if not sig[0] == 0x30: + raise NotImplementedError("Expected: Sequence (0x30). Got: 0x{:02X}".format(sig[0])) + del sig[0] + + # skip the sequence length field + if sig[0] & 0x80: # pragma: no cover + llen = sig[0] & 0x7F + del sig[:llen + 1] + + else: + del sig[0] + + self.r = MPI(_der_intf(sig)) + self.s = MPI(_der_intf(sig)) + + def parse(self, packet): + self.r = MPI(packet) + self.s = MPI(packet) + + +class ECDSASignature(DSASignature): + def from_signer(self, sig): + seq, _ = decoder.decode(sig) + self.r = MPI(seq[0]) + self.s = MPI(seq[1]) + + +class PubKey(MPIs): + __pubfields__ = () + + @property + def __mpis__(self): + for i in self.__pubfields__: + yield i + + def __init__(self): + super(PubKey, self).__init__() + for field in self.__pubfields__: + if isinstance(field, tuple): # pragma: no cover + field, val = field + + else: + val = MPI(0) + + setattr(self, field, val) + + @abc.abstractmethod + def __pubkey__(self): + """return the requisite *PublicKey class from the cryptography library""" + + def __len__(self): + return sum(len(getattr(self, i)) for i in self.__pubfields__) + + def __bytearray__(self): + _bytes = bytearray() + for field in self.__pubfields__: + _bytes += getattr(self, field).to_mpibytes() + + return _bytes + + def publen(self): + return len(self) + + def verify(self, subj, sigbytes, hash_alg): + return NotImplemented # pragma: no cover + + +class OpaquePubKey(PubKey): # pragma: no cover + def __init__(self): + super(OpaquePubKey, self).__init__() + self.data = bytearray() + + def __iter__(self): + yield self.data + + def __pubkey__(self): + return NotImplemented + + def __bytearray__(self): + return self.data + + def parse(self, packet): + ##TODO: this needs to be length-bounded to the end of the packet + self.data = packet + + +class RSAPub(PubKey): + __pubfields__ = ('n', 'e') + + def __pubkey__(self): + return rsa.RSAPublicNumbers(self.e, self.n).public_key(default_backend()) + + def verify(self, subj, sigbytes, hash_alg): + # zero-pad sigbytes if necessary + sigbytes = (b'\x00' * (self.n.byte_length() - len(sigbytes))) + sigbytes + verifier = self.__pubkey__().verifier(sigbytes, padding.PKCS1v15(), hash_alg) + verifier.update(subj) + + try: + verifier.verify() + + except InvalidSignature: + return False + + return True + + def parse(self, packet): + self.n = MPI(packet) + self.e = MPI(packet) + + +class DSAPub(PubKey): + __pubfields__ = ('p', 'q', 'g', 'y') + + def __pubkey__(self): + params = dsa.DSAParameterNumbers(self.p, self.q, self.g) + return dsa.DSAPublicNumbers(self.y, params).public_key(default_backend()) + + def verify(self, subj, sigbytes, hash_alg): + verifier = self.__pubkey__().verifier(sigbytes, hash_alg) + verifier.update(subj) + + try: + verifier.verify() + + except InvalidSignature: + return False + + return True + + def parse(self, packet): + self.p = MPI(packet) + self.q = MPI(packet) + self.g = MPI(packet) + self.y = MPI(packet) + + +class ElGPub(PubKey): + __pubfields__ = ('p', 'g', 'y') + + def __pubkey__(self): + raise NotImplementedError() + + def parse(self, packet): + self.p = MPI(packet) + self.g = MPI(packet) + self.y = MPI(packet) + + +class ECDSAPub(PubKey): + __pubfields__ = ('x', 'y') + + def __init__(self): + super(ECDSAPub, self).__init__() + self.oid = None + + def __len__(self): + return sum([len(getattr(self, i)) - 2 for i in self.__pubfields__] + + [3, len(encoder.encode(self.oid.value)) - 1]) + + def __pubkey__(self): + return ec.EllipticCurvePublicNumbers(self.x, self.y, self.oid.curve()).public_key(default_backend()) + + def __bytearray__(self): + _b = bytearray() + _b += encoder.encode(self.oid.value)[1:] + # 0x04 || x || y + # where x and y are the same length + _xy = b'\x04' + self.x.to_mpibytes()[2:] + self.y.to_mpibytes()[2:] + _b += MPI(self.bytes_to_int(_xy, 'big')).to_mpibytes() + + return _b + + def __copy__(self): + pkt = super(ECDSAPub, self).__copy__() + pkt.oid = self.oid + return pkt + + def verify(self, subj, sigbytes, hash_alg): + verifier = self.__pubkey__().verifier(sigbytes, ec.ECDSA(hash_alg)) + verifier.update(subj) + + try: + verifier.verify() + + except InvalidSignature: + return False + + return True + + def parse(self, packet): + oidlen = packet[0] + del packet[0] + _oid = bytearray(b'\x06') + _oid.append(oidlen) + _oid += bytearray(packet[:oidlen]) + # try: + oid, _ = decoder.decode(bytes(_oid)) + + # except: + # raise PGPError("Bad OID octet stream: b'{:s}'".format(''.join(['\\x{:02X}'.format(c) for c in _oid]))) + self.oid = EllipticCurveOID(oid) + del packet[:oidlen] + + # flen = (self.oid.bit_length // 8) + xy = bytearray(MPI(packet).to_mpibytes()[2:]) + # xy = bytearray(MPI(packet).to_bytes(flen, 'big')) + # the first byte is just \x04 + del xy[:1] + # now xy needs to be separated into x, y + xylen = len(xy) + x, y = xy[:xylen // 2], xy[xylen // 2:] + self.x = MPI(self.bytes_to_int(x)) + self.y = MPI(self.bytes_to_int(y)) + + +class ECDHPub(PubKey): + __pubfields__ = ('x', 'y') + + def __init__(self): + super(ECDHPub, self).__init__() + self.oid = None + self.kdf = ECKDF() + + def __len__(self): + return sum([len(getattr(self, i)) - 2 for i in self.__pubfields__] + + [3, + len(self.kdf), + len(encoder.encode(self.oid.value)) - 1]) + + def __pubkey__(self): + return ec.EllipticCurvePublicNumbers(self.x, self.y, self.oid.curve()).public_key(default_backend()) + + def __bytearray__(self): + _b = bytearray() + _b += encoder.encode(self.oid.value)[1:] + # 0x04 || x || y + # where x and y are the same length + _xy = b'\x04' + self.x.to_mpibytes()[2:] + self.y.to_mpibytes()[2:] + _b += MPI(self.bytes_to_int(_xy, 'big')).to_mpibytes() + _b += self.kdf.__bytearray__() + + return _b + + def __copy__(self): + pkt = super(ECDHPub, self).__copy__() + pkt.oid = self.oid + pkt.kdf = copy.copy(self.kdf) + return pkt + + def parse(self, packet): + """ + Algorithm-Specific Fields for ECDH keys: + + o a variable-length field containing a curve OID, formatted + as follows: + + - a one-octet size of the following field; values 0 and + 0xFF are reserved for future extensions + + - the octets representing a curve OID, defined in + Section 11 + + - MPI of an EC point representing a public key + + o a variable-length field containing KDF parameters, + formatted as follows: + + - a one-octet size of the following fields; values 0 and + 0xff are reserved for future extensions + + - a one-octet value 01, reserved for future extensions + + - a one-octet hash function ID used with a KDF + + - a one-octet algorithm ID for the symmetric algorithm + used to wrap the symmetric key used for the message + encryption; see Section 8 for details + """ + oidlen = packet[0] + del packet[0] + _oid = bytearray(b'\x06') + _oid.append(oidlen) + _oid += bytearray(packet[:oidlen]) + # try: + oid, _ = decoder.decode(bytes(_oid)) + + # except: + # raise PGPError("Bad OID octet stream: b'{:s}'".format(''.join(['\\x{:02X}'.format(c) for c in _oid]))) + self.oid = EllipticCurveOID(oid) + del packet[:oidlen] + + # flen = (self.oid.bit_length // 8) + xy = bytearray(MPI(packet).to_mpibytes()[2:]) + # xy = bytearray(MPI(packet).to_bytes(flen, 'big')) + # the first byte is just \x04 + del xy[:1] + # now xy needs to be separated into x, y + xylen = len(xy) + x, y = xy[:xylen // 2], xy[xylen // 2:] + self.x = MPI(self.bytes_to_int(x)) + self.y = MPI(self.bytes_to_int(y)) + + self.kdf.parse(packet) + + +class String2Key(Field): + """ + 3.7. String-to-Key (S2K) Specifiers + + String-to-key (S2K) specifiers are used to convert passphrase strings + into symmetric-key encryption/decryption keys. They are used in two + places, currently: to encrypt the secret part of private keys in the + private keyring, and to convert passphrases to encryption keys for + symmetrically encrypted messages. + + 3.7.1. String-to-Key (S2K) Specifier Types + + There are three types of S2K specifiers currently supported, and + some reserved values: + + ID S2K Type + -- -------- + 0 Simple S2K + 1 Salted S2K + 2 Reserved value + 3 Iterated and Salted S2K + 100 to 110 Private/Experimental S2K + + These are described in Sections 3.7.1.1 - 3.7.1.3. + + 3.7.1.1. Simple S2K + + This directly hashes the string to produce the key data. See below + for how this hashing is done. + + Octet 0: 0x00 + Octet 1: hash algorithm + + Simple S2K hashes the passphrase to produce the session key. The + manner in which this is done depends on the size of the session key + (which will depend on the cipher used) and the size of the hash + algorithm's output. If the hash size is greater than the session key + size, the high-order (leftmost) octets of the hash are used as the + key. + + If the hash size is less than the key size, multiple instances of the + hash context are created -- enough to produce the required key data. + These instances are preloaded with 0, 1, 2, ... octets of zeros (that + is to say, the first instance has no preloading, the second gets + preloaded with 1 octet of zero, the third is preloaded with two + octets of zeros, and so forth). + + As the data is hashed, it is given independently to each hash + context. Since the contexts have been initialized differently, they + will each produce different hash output. Once the passphrase is + hashed, the output data from the multiple hashes is concatenated, + first hash leftmost, to produce the key data, with any excess octets + on the right discarded. + + 3.7.1.2. Salted S2K + + This includes a "salt" value in the S2K specifier -- some arbitrary + data -- that gets hashed along with the passphrase string, to help + prevent dictionary attacks. + + Octet 0: 0x01 + Octet 1: hash algorithm + Octets 2-9: 8-octet salt value + + Salted S2K is exactly like Simple S2K, except that the input to the + hash function(s) consists of the 8 octets of salt from the S2K + specifier, followed by the passphrase. + + 3.7.1.3. Iterated and Salted S2K + + This includes both a salt and an octet count. The salt is combined + with the passphrase and the resulting value is hashed repeatedly. + This further increases the amount of work an attacker must do to try + dictionary attacks. + + Octet 0: 0x03 + Octet 1: hash algorithm + Octets 2-9: 8-octet salt value + Octet 10: count, a one-octet, coded value + + The count is coded into a one-octet number using the following + formula: + + #define EXPBIAS 6 + count = ((Int32)16 + (c & 15)) << ((c >> 4) + EXPBIAS); + + The above formula is in C, where "Int32" is a type for a 32-bit + integer, and the variable "c" is the coded count, Octet 10. + + Iterated-Salted S2K hashes the passphrase and salt data multiple + times. The total number of octets to be hashed is specified in the + encoded count in the S2K specifier. Note that the resulting count + value is an octet count of how many octets will be hashed, not an + iteration count. + + Initially, one or more hash contexts are set up as with the other S2K + algorithms, depending on how many octets of key data are needed. + Then the salt, followed by the passphrase data, is repeatedly hashed + until the number of octets specified by the octet count has been + hashed. The one exception is that if the octet count is less than + the size of the salt plus passphrase, the full salt plus passphrase + will be hashed even though that is greater than the octet count. + After the hashing is done, the data is unloaded from the hash + context(s) as with the other S2K algorithms. + """ + @sdproperty + def encalg(self): + return self._encalg + + @encalg.register(int) + @encalg.register(SymmetricKeyAlgorithm) + def encalg_int(self, val): + self._encalg = SymmetricKeyAlgorithm(val) + + @sdproperty + def specifier(self): + return self._specifier + + @specifier.register(int) + @specifier.register(String2KeyType) + def specifier_int(self, val): + self._specifier = String2KeyType(val) + + @sdproperty + def halg(self): + return self._halg + + @halg.register(int) + @halg.register(HashAlgorithm) + def halg_int(self, val): + self._halg = HashAlgorithm(val) + + @sdproperty + def count(self): + return (16 + (self._count & 15)) << ((self._count >> 4) + 6) + + @count.register(int) + def count_int(self, val): + if val < 0 or val > 255: # pragma: no cover + raise ValueError("count must be between 0 and 256") + self._count = val + + def __init__(self): + super(String2Key, self).__init__() + self.usage = 0 + self.encalg = 0 + self.specifier = 0 + self.iv = None + + # specifier-specific fields + # simple, salted, iterated + self.halg = 0 + + # salted, iterated + self.salt = bytearray() + + # iterated + self.count = 0 + + def __bytearray__(self): + _bytes = bytearray() + _bytes.append(self.usage) + if bool(self): + _bytes.append(self.encalg) + _bytes.append(self.specifier) + if self.specifier >= String2KeyType.Simple: + _bytes.append(self.halg) + if self.specifier >= String2KeyType.Salted: + _bytes += self.salt + if self.specifier == String2KeyType.Iterated: + _bytes.append(self._count) + if self.iv is not None: + _bytes += self.iv + return _bytes + + def __len__(self): + return len(self.__bytearray__()) + + def __bool__(self): + return self.usage in [254, 255] + + def __nonzero__(self): + return self.__bool__() + + def __copy__(self): + s2k = String2Key() + s2k.usage = self.usage + s2k.encalg = self.encalg + s2k.specifier = self.specifier + s2k.iv = self.iv + s2k.halg = self.halg + s2k.salt = copy.copy(self.salt) + s2k.count = self._count + return s2k + + def parse(self, packet, iv=True): + self.usage = packet[0] + del packet[0] + + if bool(self): + self.encalg = packet[0] + del packet[0] + + self.specifier = packet[0] + del packet[0] + + if self.specifier >= String2KeyType.Simple: + # this will always be true + self.halg = packet[0] + del packet[0] + + if self.specifier >= String2KeyType.Salted: + self.salt = packet[:8] + del packet[:8] + + if self.specifier == String2KeyType.Iterated: + self.count = packet[0] + del packet[0] + + if iv: + self.iv = packet[:(self.encalg.block_size // 8)] + del packet[:(self.encalg.block_size // 8)] + + def derive_key(self, passphrase): + ##TODO: raise an exception if self.usage is not 254 or 255 + keylen = self.encalg.key_size + hashlen = self.halg.digest_size * 8 + + ctx = int(math.ceil((keylen / hashlen))) + + # Simple S2K - always done + hsalt = b'' + hpass = passphrase.encode('latin-1') + + # salted, iterated S2K + if self.specifier >= String2KeyType.Salted: + hsalt = bytes(self.salt) + + count = len(hsalt + hpass) + if self.specifier == String2KeyType.Iterated and self.count > len(hsalt + hpass): + count = self.count + + hcount = (count // len(hsalt + hpass)) + hleft = count - (hcount * len(hsalt + hpass)) + + hashdata = ((hsalt + hpass) * hcount) + (hsalt + hpass)[:hleft] + + h = [] + for i in range(0, ctx): + _h = self.halg.hasher + _h.update(b'\x00' * i) + _h.update(hashdata) + h.append(_h) + + # GC some stuff + del hsalt + del hpass + del hashdata + + # and return the key! + return b''.join(hc.digest() for hc in h)[:(keylen // 8)] + + +class ECKDF(Field): + """ + o a variable-length field containing KDF parameters, + formatted as follows: + + - a one-octet size of the following fields; values 0 and + 0xff are reserved for future extensions + + - a one-octet value 01, reserved for future extensions + + - a one-octet hash function ID used with a KDF + + - a one-octet algorithm ID for the symmetric algorithm + used to wrap the symmetric key used for the message + encryption; see Section 8 for details + """ + @sdproperty + def halg(self): + return self._halg + + @halg.register(int) + @halg.register(HashAlgorithm) + def halg_int(self, val): + self._halg = HashAlgorithm(val) + + @sdproperty + def encalg(self): + return self._encalg + + @encalg.register(int) + @encalg.register(SymmetricKeyAlgorithm) + def encalg_int(self, val): + self._encalg = SymmetricKeyAlgorithm(val) + + def __init__(self): + super(ECKDF, self).__init__() + self.halg = 0 + self.encalg = 0 + + def __bytearray__(self): + _bytes = bytearray() + _bytes.append(len(self) - 1) + _bytes.append(0x01) + _bytes.append(self.halg) + _bytes.append(self.encalg) + return _bytes + + def __len__(self): + return 4 + + def parse(self, packet): + # packet[0] should always be 3 + # packet[1] should always be 1 + # TODO: this assert is likely not necessary, but we should raise some kind of exception + # if parsing fails due to these fields being incorrect + assert packet[:2] == b'\x03\x01' + del packet[:2] + + self.halg = packet[0] + del packet[0] + + self.encalg = packet[0] + del packet[0] + + def derive_key(self, s, curve, pkalg, fingerprint): + # wrapper around the Concatenation KDF method provided by cryptography + # assemble the additional data as defined in RFC 6637: + # Param = curve_OID_len || curve_OID || public_key_alg_ID || 03 || 01 || KDF_hash_ID || KEK_alg_ID for AESKeyWrap || "Anonymous + data = bytearray() + data += encoder.encode(curve.value)[1:] + data.append(pkalg) + data += b'\x03\x01' + data.append(self.halg) + data.append(self.encalg) + data += b'Anonymous Sender ' + data += binascii.unhexlify(fingerprint.replace(' ', '')) + + ckdf = ConcatKDFHash(algorithm=getattr(hashes, self.halg.name)(), length=self.encalg.key_size // 8, otherinfo=bytes(data), backend=default_backend()) + return ckdf.derive(s) + + +class PrivKey(PubKey): + __privfields__ = () + + @property + def __mpis__(self): + for i in super(PrivKey, self).__mpis__: + yield i + + for i in self.__privfields__: + yield i + + def __init__(self): + super(PrivKey, self).__init__() + + self.s2k = String2Key() + self.encbytes = bytearray() + self.chksum = bytearray() + + for field in self.__privfields__: + setattr(self, field, MPI(0)) + + def __bytearray__(self): + _bytes = bytearray() + _bytes += super(PrivKey, self).__bytearray__() + + _bytes += self.s2k.__bytearray__() + if self.s2k: + _bytes += self.encbytes + + else: + for field in self.__privfields__: + _bytes += getattr(self, field).to_mpibytes() + + if self.s2k.usage == 0: + _bytes += self.chksum + + return _bytes + + def __len__(self): + l = super(PrivKey, self).__len__() + len(self.s2k) + len(self.chksum) + if self.s2k: + l += len(self.encbytes) + + else: + l += sum(len(getattr(self, i)) for i in self.__privfields__) + + return l + + def __copy__(self): + pk = super(PrivKey, self).__copy__() + pk.s2k = copy.copy(self.s2k) + pk.encbytes = copy.copy(self.encbytes) + pk.chksum = copy.copy(self.chksum) + return pk + + @abc.abstractmethod + def __privkey__(self): + """return the requisite *PrivateKey class from the cryptography library""" + + @abc.abstractmethod + def _generate(self, key_size): + """Generate a new PrivKey""" + + def _compute_chksum(self): + chs = sum(sum(bytearray(c.to_mpibytes())) for c in self) % 65536 + self.chksum = bytearray(self.int_to_bytes(chs, 2)) + + def publen(self): + return super(PrivKey, self).__len__() + + def encrypt_keyblob(self, passphrase, enc_alg, hash_alg): + # PGPy will only ever use iterated and salted S2k mode + self.s2k.usage = 254 + self.s2k.encalg = enc_alg + self.s2k.specifier = String2KeyType.Iterated + self.s2k.iv = enc_alg.gen_iv() + self.s2k.halg = hash_alg + self.s2k.salt = bytearray(os.urandom(8)) + self.s2k.count = hash_alg.tuned_count + + # now that String-to-Key is ready to go, derive sessionkey from passphrase + # and then unreference passphrase + sessionkey = self.s2k.derive_key(passphrase) + del passphrase + + pt = bytearray() + for pf in self.__privfields__: + pt += getattr(self, pf).to_mpibytes() + + # append a SHA-1 hash of the plaintext so far to the plaintext + pt += hashlib.new('sha1', pt).digest() + + # encrypt + self.encbytes = bytearray(_encrypt(bytes(pt), bytes(sessionkey), enc_alg, bytes(self.s2k.iv))) + + # delete pt and clear self + del pt + self.clear() + + @abc.abstractmethod + def decrypt_keyblob(self, passphrase): + if not self.s2k: # pragma: no cover + # not encrypted + return + + # Encryption/decryption of the secret data is done in CFB mode using + # the key created from the passphrase and the Initial Vector from the + # packet. A different mode is used with V3 keys (which are only RSA) + # than with other key formats. (...) + # + # With V4 keys, a simpler method is used. All secret MPI values are + # encrypted in CFB mode, including the MPI bitcount prefix. + + # derive the session key from our passphrase, and then unreference passphrase + sessionkey = self.s2k.derive_key(passphrase) + del passphrase + + # attempt to decrypt this key + pt = _decrypt(bytes(self.encbytes), bytes(sessionkey), self.s2k.encalg, bytes(self.s2k.iv)) + + # check the hash to see if we decrypted successfully or not + if self.s2k.usage == 254 and not pt[-20:] == hashlib.new('sha1', pt[:-20]).digest(): + # if the usage byte is 254, key material is followed by a 20-octet sha-1 hash of the rest + # of the key material block + raise PGPDecryptionError("Passphrase was incorrect!") + + if self.s2k.usage == 255 and not self.bytes_to_int(pt[-2:]) == (sum(bytearray(pt[:-2])) % 65536): # pragma: no cover + # if the usage byte is 255, key material is followed by a 2-octet checksum of the rest + # of the key material block + raise PGPDecryptionError("Passphrase was incorrect!") + + return bytearray(pt) + + def sign(self, sigdata, hash_alg): + return NotImplemented # pragma: no cover + + def clear(self): + """delete and re-initialize all private components to zero""" + for field in self.__privfields__: + delattr(self, field) + setattr(self, field, MPI(0)) + + +class OpaquePrivKey(PrivKey, OpaquePubKey): # pragma: no cover + def __privkey__(self): + return NotImplemented + + def _generate(self, key_size): + # return NotImplemented + raise NotImplementedError() + + def decrypt_keyblob(self, passphrase): + return NotImplemented + + +class RSAPriv(PrivKey, RSAPub): + __privfields__ = ('d', 'p', 'q', 'u') + + def __privkey__(self): + return rsa.RSAPrivateNumbers(self.p, self.q, self.d, + rsa.rsa_crt_dmp1(self.d, self.p), + rsa.rsa_crt_dmq1(self.d, self.q), + rsa.rsa_crt_iqmp(self.p, self.q), + rsa.RSAPublicNumbers(self.e, self.n)).private_key(default_backend()) + + def _generate(self, key_size): + if any(c != 0 for c in self): # pragma: no cover + raise PGPError("key is already populated") + + # generate some big numbers! + pk = rsa.generate_private_key(65537, key_size, default_backend()) + pkn = pk.private_numbers() + + self.n = MPI(pkn.public_numbers.n) + self.e = MPI(pkn.public_numbers.e) + self.d = MPI(pkn.d) + self.p = MPI(pkn.p) + self.q = MPI(pkn.q) + # from the RFC: + # "- MPI of u, the multiplicative inverse of p, mod q." + # or, simply, p^-1 mod p + # rsa.rsa_crt_iqmp(p, q) normally computes q^-1 mod p, + # so if we swap the values around we get the answer we want + self.u = MPI(rsa.rsa_crt_iqmp(pkn.q, pkn.p)) + + del pkn + del pk + + self._compute_chksum() + + def parse(self, packet): + super(RSAPriv, self).parse(packet) + self.s2k.parse(packet) + + if not self.s2k: + self.d = MPI(packet) + self.p = MPI(packet) + self.q = MPI(packet) + self.u = MPI(packet) + + if self.s2k.usage == 0: + self.chksum = packet[:2] + del packet[:2] + + else: + ##TODO: this needs to be bounded to the length of the encrypted key material + self.encbytes = packet + + def decrypt_keyblob(self, passphrase): + kb = super(RSAPriv, self).decrypt_keyblob(passphrase) + del passphrase + + self.d = MPI(kb) + self.p = MPI(kb) + self.q = MPI(kb) + self.u = MPI(kb) + + if self.s2k.usage in [254, 255]: + self.chksum = kb + del kb + + def sign(self, sigdata, hash_alg): + signer = self.__privkey__().signer(padding.PKCS1v15(), hash_alg) + signer.update(sigdata) + return signer.finalize() + + +class DSAPriv(PrivKey, DSAPub): + __privfields__ = ('x',) + + def __privkey__(self): + params = dsa.DSAParameterNumbers(self.p, self.q, self.g) + pn = dsa.DSAPublicNumbers(self.y, params) + return dsa.DSAPrivateNumbers(self.x, pn).private_key(default_backend()) + + def _generate(self, key_size): + if any(c != 0 for c in self): # pragma: no cover + raise PGPError("key is already populated") + + # generate some big numbers! + pk = dsa.generate_private_key(key_size, default_backend()) + pkn = pk.private_numbers() + + self.p = MPI(pkn.public_numbers.parameter_numbers.p) + self.q = MPI(pkn.public_numbers.parameter_numbers.q) + self.g = MPI(pkn.public_numbers.parameter_numbers.g) + self.y = MPI(pkn.public_numbers.y) + self.x = MPI(pkn.x) + + del pkn + del pk + + self._compute_chksum() + + def parse(self, packet): + super(DSAPriv, self).parse(packet) + self.s2k.parse(packet) + + if not self.s2k: + self.x = MPI(packet) + + else: + self.encbytes = packet + + if self.s2k.usage in [0, 255]: + self.chksum = packet[:2] + del packet[:2] + + def decrypt_keyblob(self, passphrase): + kb = super(DSAPriv, self).decrypt_keyblob(passphrase) + del passphrase + + self.x = MPI(kb) + + if self.s2k.usage in [254, 255]: + self.chksum = kb + del kb + + def sign(self, sigdata, hash_alg): + signer = self.__privkey__().signer(hash_alg) + signer.update(sigdata) + return signer.finalize() + + +class ElGPriv(PrivKey, ElGPub): + __privfields__ = ('x', ) + + def __privkey__(self): + raise NotImplementedError() + + def _generate(self, key_size): + raise NotImplementedError(PubKeyAlgorithm.ElGamal) + + def parse(self, packet): + super(ElGPriv, self).parse(packet) + self.s2k.parse(packet) + + if not self.s2k: + self.x = MPI(packet) + + else: + self.encbytes = packet + + if self.s2k.usage in [0, 255]: + self.chksum = packet[:2] + del packet[:2] + + def decrypt_keyblob(self, passphrase): + kb = super(ElGPriv, self).decrypt_keyblob(passphrase) + del passphrase + + self.x = MPI(kb) + + if self.s2k.usage in [254, 255]: + self.chksum = kb + del kb + + +class ECDSAPriv(PrivKey, ECDSAPub): + __privfields__ = ('s', ) + + def __privkey__(self): + ecp = ec.EllipticCurvePublicNumbers(self.x, self.y, self.oid.curve()) + return ec.EllipticCurvePrivateNumbers(self.s, ecp).private_key(default_backend()) + + def _generate(self, oid): + if any(c != 0 for c in self): # pragma: no cover + raise PGPError("Key is already populated!") + + self.oid = EllipticCurveOID(oid) + + if not self.oid.can_gen: + raise ValueError("Curve not currently supported: {}".format(oid.name)) + + pk = ec.generate_private_key(self.oid.curve(), default_backend()) + pubn = pk.public_key().public_numbers() + self.x = MPI(pubn.x) + self.y = MPI(pubn.y) + self.s = MPI(pk.private_numbers().private_value) + + def parse(self, packet): + super(ECDSAPriv, self).parse(packet) + self.s2k.parse(packet) + + if not self.s2k: + self.s = MPI(packet) + + if self.s2k.usage == 0: + self.chksum = packet[:2] + del packet[:2] + + else: + ##TODO: this needs to be bounded to the length of the encrypted key material + self.encbytes = packet + + def decrypt_keyblob(self, passphrase): + kb = super(ECDSAPriv, self).decrypt_keyblob(passphrase) + del passphrase + + self.s = MPI(kb) + + def sign(self, sigdata, hash_alg): + signer = self.__privkey__().signer(ec.ECDSA(hash_alg)) + signer.update(sigdata) + return signer.finalize() + + +class ECDHPriv(ECDSAPriv, ECDHPub): + def __bytearray__(self): + _b = ECDHPub.__bytearray__(self) + _b += self.s2k.__bytearray__() + if not self.s2k: + _b += self.s.to_mpibytes() + + if self.s2k.usage == 0: + _b += self.chksum + + else: + _b += self.encbytes + + return _b + + def __len__(self): + # because of the inheritance used for this, ECDSAPub.__len__ is called instead of ECDHPub.__len__ + # the only real difference is self.kdf, so we can just add that + return super(ECDHPriv, self).__len__() + len(self.kdf) + + def _generate(self, oid): + ECDSAPriv._generate(self, oid) + self.kdf.halg = self.oid.kdf_halg + self.kdf.encalg = self.oid.kek_alg + + def publen(self): + return ECDHPub.__len__(self) + + def parse(self, packet): + ECDHPub.parse(self, packet) + self.s2k.parse(packet) + + if not self.s2k: + self.s = MPI(packet) + + if self.s2k.usage == 0: + self.chksum = packet[:2] + del packet[:2] + + else: + ##TODO: this needs to be bounded to the length of the encrypted key material + self.encbytes = packet + + +class CipherText(MPIs): + def __init__(self): + super(CipherText, self).__init__() + for i in self.__mpis__: + setattr(self, i, MPI(0)) + + @classmethod + @abc.abstractmethod + def encrypt(cls, encfn, *args): + """create and populate a concrete CipherText class instance""" + + @abc.abstractmethod + def decrypt(self, decfn, *args): + """decrypt the ciphertext contained in this CipherText instance""" + + def __bytearray__(self): + _bytes = bytearray() + for i in self: + _bytes += i.to_mpibytes() + return _bytes + + +class RSACipherText(CipherText): + __mpis__ = ('me_mod_n', ) + + @classmethod + def encrypt(cls, encfn, *args): + ct = cls() + ct.me_mod_n = MPI(cls.bytes_to_int(encfn(*args))) + return ct + + def decrypt(self, decfn, *args): + return decfn(*args) + + def parse(self, packet): + self.me_mod_n = MPI(packet) + + +class ElGCipherText(CipherText): + __mpis__ = ('gk_mod_p', 'myk_mod_p') + + @classmethod + def encrypt(cls, encfn, *args): + raise NotImplementedError() + + def decrypt(self, decfn, *args): + raise NotImplementedError() + + def parse(self, packet): + self.gk_mod_p = MPI(packet) + self.myk_mod_p = MPI(packet) + + +class ECDHCipherText(CipherText): + __mpis__ = ('vX', 'vY') + + @classmethod + def encrypt(cls, pk, *args): + """ + For convenience, the synopsis of the encoding method is given below; + however, this section, [NIST-SP800-56A], and [RFC3394] are the + normative sources of the definition. + + Obtain the authenticated recipient public key R + Generate an ephemeral key pair {v, V=vG} + Compute the shared point S = vR; + m = symm_alg_ID || session key || checksum || pkcs5_padding; + curve_OID_len = (byte)len(curve_OID); + Param = curve_OID_len || curve_OID || public_key_alg_ID || 03 + || 01 || KDF_hash_ID || KEK_alg_ID for AESKeyWrap || "Anonymous + Sender " || recipient_fingerprint; + Z_len = the key size for the KEK_alg_ID used with AESKeyWrap + Compute Z = KDF( S, Z_len, Param ); + Compute C = AESKeyWrap( Z, m ) as per [RFC3394] + VB = convert point V to the octet string + Output (MPI(VB) || len(C) || C). + + The decryption is the inverse of the method given. Note that the + recipient obtains the shared secret by calculating + """ + # *args should be: + # - m + # + _m, = args + + # m may need to be PKCS5-padded + padder = PKCS7(64).padder() + m = padder.update(_m) + padder.finalize() + + km = pk.keymaterial + + ct = cls() + + # generate ephemeral key pair, then store it in ct + v = ec.generate_private_key(km.oid.curve(), default_backend()) + ct.vX = MPI(v.public_key().public_numbers().x) + ct.vY = MPI(v.public_key().public_numbers().y) + + # compute the shared point S + s = v.exchange(ec.ECDH(), km.__pubkey__()) + + # derive the wrapping key + z = km.kdf.derive_key(s, km.oid, PubKeyAlgorithm.ECDH, pk.fingerprint) + + # compute C + ct.c = aes_key_wrap(z, m, default_backend()) + + return ct + + def decrypt(self, pk, *args): + km = pk.keymaterial + # assemble the public component of ephemeral key v + v = ec.EllipticCurvePublicNumbers(self.vX, self.vY, km.oid.curve()).public_key(default_backend()) + + # compute s using the inverse of how it was derived during encryption + s = km.__privkey__().exchange(ec.ECDH(), v) + + # derive the wrapping key + z = km.kdf.derive_key(s, km.oid, PubKeyAlgorithm.ECDH, pk.fingerprint) + + # unwrap and unpad m + _m = aes_key_unwrap(z, self.c, default_backend()) + + padder = PKCS7(64).unpadder() + return padder.update(_m) + padder.finalize() + + def __init__(self): + super(ECDHCipherText, self).__init__() + self.c = bytearray(0) + + def __bytearray__(self): + _bytes = bytearray() + _xy = b'\x04' + self.vX.to_mpibytes()[2:] + self.vY.to_mpibytes()[2:] + _bytes += MPI(self.bytes_to_int(_xy, 'big')).to_mpibytes() + _bytes.append(len(self.c)) + _bytes += self.c + + return _bytes + + def parse(self, packet): + # self.v = MPI(packet) + xy = bytearray(MPI(packet).to_mpibytes()[2:]) + del xy[:1] + xylen = len(xy) + x, y = xy[:xylen // 2], xy[xylen // 2:] + self.vX = MPI(self.bytes_to_int(x)) + self.vY = MPI(self.bytes_to_int(y)) + + clen = packet[0] + del packet[0] + + self.c += packet[:clen] + del packet[:clen] diff --git a/src/leap/mx/vendor/pgpy/packet/packets.py b/src/leap/mx/vendor/pgpy/packet/packets.py new file mode 100644 index 0000000..38a7200 --- /dev/null +++ b/src/leap/mx/vendor/pgpy/packet/packets.py @@ -0,0 +1,1605 @@ +""" packet.py +""" +import abc +import binascii +import calendar +import copy +import hashlib +import os +import re + +from datetime import datetime + +import six + +from cryptography.hazmat.primitives import constant_time +from cryptography.hazmat.primitives.asymmetric import padding + +from .fields import DSAPriv, DSAPub, DSASignature +from .fields import ECDSAPub, ECDSAPriv, ECDSASignature +from .fields import ECDHPub, ECDHPriv, ECDHCipherText +from .fields import ElGCipherText, ElGPriv, ElGPub +from .fields import OpaquePubKey +from .fields import OpaquePrivKey +from .fields import RSACipherText, RSAPriv, RSAPub, RSASignature +from .fields import String2Key +from .fields import SubPackets +from .fields import UserAttributeSubPackets + +from .types import Packet +from .types import Primary +from .types import Private +from .types import Public +from .types import Sub +from .types import VersionedPacket + +from ..constants import CompressionAlgorithm +from ..constants import HashAlgorithm +from ..constants import PubKeyAlgorithm +from ..constants import SignatureType +from ..constants import SymmetricKeyAlgorithm +from ..constants import TrustFlags +from ..constants import TrustLevel + +from ..decorators import sdproperty + +from ..errors import PGPDecryptionError + +from ..symenc import _decrypt +from ..symenc import _encrypt + +from ..types import Fingerprint + +__all__ = ['PKESessionKey', + 'PKESessionKeyV3', + 'Signature', + 'SignatureV4', + 'SKESessionKey', + 'SKESessionKeyV4', + 'OnePassSignature', + 'OnePassSignatureV3', + 'PrivKey', + 'PubKey', + 'PubKeyV4', + 'PrivKeyV4', + 'PrivSubKey', + 'PrivSubKeyV4', + 'CompressedData', + 'SKEData', + 'Marker', + 'LiteralData', + 'Trust', + 'UserID', + 'PubSubKey', + 'PubSubKeyV4', + 'UserAttribute', + 'IntegrityProtectedSKEData', + 'IntegrityProtectedSKEDataV1', + 'MDC'] + + +class PKESessionKey(VersionedPacket): + __typeid__ = 0x01 + __ver__ = 0 + + @abc.abstractmethod + def decrypt_sk(self, pk): + raise NotImplementedError() + + @abc.abstractmethod + def encrypt_sk(self, pk, symalg, symkey): + raise NotImplementedError() + + +class PKESessionKeyV3(PKESessionKey): + """ + 5.1. Public-Key Encrypted Session Key Packets (Tag 1) + + A Public-Key Encrypted Session Key packet holds the session key used + to encrypt a message. Zero or more Public-Key Encrypted Session Key + packets and/or Symmetric-Key Encrypted Session Key packets may + precede a Symmetrically Encrypted Data Packet, which holds an + encrypted message. The message is encrypted with the session key, + and the session key is itself encrypted and stored in the Encrypted + Session Key packet(s). The Symmetrically Encrypted Data Packet is + preceded by one Public-Key Encrypted Session Key packet for each + OpenPGP key to which the message is encrypted. The recipient of the + message finds a session key that is encrypted to their public key, + decrypts the session key, and then uses the session key to decrypt + the message. + + The body of this packet consists of: + + - A one-octet number giving the version number of the packet type. + The currently defined value for packet version is 3. + + - An eight-octet number that gives the Key ID of the public key to + which the session key is encrypted. If the session key is + encrypted to a subkey, then the Key ID of this subkey is used + here instead of the Key ID of the primary key. + + - A one-octet number giving the public-key algorithm used. + + - A string of octets that is the encrypted session key. This + string takes up the remainder of the packet, and its contents are + dependent on the public-key algorithm used. + + Algorithm Specific Fields for RSA encryption + + - multiprecision integer (MPI) of RSA encrypted value m**e mod n. + + Algorithm Specific Fields for Elgamal encryption: + + - MPI of Elgamal (Diffie-Hellman) value g**k mod p. + + - MPI of Elgamal (Diffie-Hellman) value m * y**k mod p. + + The value "m" in the above formulas is derived from the session key + as follows. First, the session key is prefixed with a one-octet + algorithm identifier that specifies the symmetric encryption + algorithm used to encrypt the following Symmetrically Encrypted Data + Packet. Then a two-octet checksum is appended, which is equal to the + sum of the preceding session key octets, not including the algorithm + identifier, modulo 65536. This value is then encoded as described in + PKCS#1 block encoding EME-PKCS1-v1_5 in Section 7.2.1 of [RFC3447] to + form the "m" value used in the formulas above. See Section 13.1 of + this document for notes on OpenPGP's use of PKCS#1. + + Note that when an implementation forms several PKESKs with one + session key, forming a message that can be decrypted by several keys, + the implementation MUST make a new PKCS#1 encoding for each key. + + An implementation MAY accept or use a Key ID of zero as a "wild card" + or "speculative" Key ID. In this case, the receiving implementation + would try all available private keys, checking for a valid decrypted + session key. This format helps reduce traffic analysis of messages. + """ + __ver__ = 3 + + @sdproperty + def encrypter(self): + return self._encrypter + + @encrypter.register(bytearray) + def encrypter_bin(self, val): + self._encrypter = binascii.hexlify(val).upper().decode('latin-1') + + @sdproperty + def pkalg(self): + return self._pkalg + + @pkalg.register(int) + @pkalg.register(PubKeyAlgorithm) + def pkalg_int(self, val): + self._pkalg = PubKeyAlgorithm(val) + + _c = {PubKeyAlgorithm.RSAEncryptOrSign: RSACipherText, + PubKeyAlgorithm.RSAEncrypt: RSACipherText, + PubKeyAlgorithm.ElGamal: ElGCipherText, + PubKeyAlgorithm.FormerlyElGamalEncryptOrSign: ElGCipherText, + PubKeyAlgorithm.ECDH: ECDHCipherText} + + ct = _c.get(self._pkalg, None) + self.ct = ct() if ct is not None else ct + + def __init__(self): + super(PKESessionKeyV3, self).__init__() + self.encrypter = bytearray(8) + self.pkalg = 0 + self.ct = None + + def __bytearray__(self): + _bytes = bytearray() + _bytes += super(PKESessionKeyV3, self).__bytearray__() + _bytes += binascii.unhexlify(self.encrypter.encode()) + _bytes += bytearray([self.pkalg]) + _bytes += self.ct.__bytearray__() if self.ct is not None else b'\x00' * (self.header.length - 10) + return _bytes + + def __copy__(self): + sk = self.__class__() + sk.header = copy.copy(self.header) + sk._encrypter = self._encrypter + sk.pkalg = self.pkalg + if self.ct is not None: + sk.ct = copy.copy(self.ct) + + return sk + + def decrypt_sk(self, pk): + if self.pkalg == PubKeyAlgorithm.RSAEncryptOrSign: + # pad up ct with null bytes if necessary + ct = self.ct.me_mod_n.to_mpibytes()[2:] + ct = b'\x00' * ((pk.keymaterial.__privkey__().key_size // 8) - len(ct)) + ct + + decrypter = pk.keymaterial.__privkey__().decrypt + decargs = (ct, padding.PKCS1v15(),) + + elif self.pkalg == PubKeyAlgorithm.ECDH: + decrypter = pk + decargs = () + + else: + raise NotImplementedError(self.pkalg) + + m = bytearray(self.ct.decrypt(decrypter, *decargs)) + + """ + The value "m" in the above formulas is derived from the session key + as follows. First, the session key is prefixed with a one-octet + algorithm identifier that specifies the symmetric encryption + algorithm used to encrypt the following Symmetrically Encrypted Data + Packet. Then a two-octet checksum is appended, which is equal to the + sum of the preceding session key octets, not including the algorithm + identifier, modulo 65536. This value is then encoded as described in + PKCS#1 block encoding EME-PKCS1-v1_5 in Section 7.2.1 of [RFC3447] to + form the "m" value used in the formulas above. See Section 13.1 of + this document for notes on OpenPGP's use of PKCS#1. + """ + + symalg = SymmetricKeyAlgorithm(m[0]) + del m[0] + + symkey = m[:symalg.key_size // 8] + del m[:symalg.key_size // 8] + + checksum = self.bytes_to_int(m[:2]) + del m[:2] + + if not sum(symkey) % 65536 == checksum: # pragma: no cover + raise PGPDecryptionError("{:s} decryption failed".format(self.pkalg.name)) + + return (symalg, symkey) + + def encrypt_sk(self, pk, symalg, symkey): + m = bytearray(self.int_to_bytes(symalg) + symkey) + m += self.int_to_bytes(sum(bytearray(symkey)) % 65536, 2) + + if self.pkalg == PubKeyAlgorithm.RSAEncryptOrSign: + encrypter = pk.keymaterial.__pubkey__().encrypt + encargs = (bytes(m), padding.PKCS1v15(),) + + elif self.pkalg == PubKeyAlgorithm.ECDH: + encrypter = pk + encargs = (bytes(m),) + + else: + raise NotImplementedError(self.pkalg) + + self.ct = self.ct.encrypt(encrypter, *encargs) + self.update_hlen() + + def parse(self, packet): + super(PKESessionKeyV3, self).parse(packet) + self.encrypter = packet[:8] + del packet[:8] + + self.pkalg = packet[0] + del packet[0] + + if self.ct is not None: + self.ct.parse(packet) + + else: # pragma: no cover + del packet[:(self.header.length - 18)] + + +class Signature(VersionedPacket): + __typeid__ = 0x02 + __ver__ = 0 + + +class SignatureV4(Signature): + """ + 5.2.3. Version 4 Signature Packet Format + + The body of a version 4 Signature packet contains: + + - One-octet version number (4). + + - One-octet signature type. + + - One-octet public-key algorithm. + + - One-octet hash algorithm. + + - Two-octet scalar octet count for following hashed subpacket data. + Note that this is the length in octets of all of the hashed + subpackets; a pointer incremented by this number will skip over + the hashed subpackets. + + - Hashed subpacket data set (zero or more subpackets). + + - Two-octet scalar octet count for the following unhashed subpacket + data. Note that this is the length in octets of all of the + unhashed subpackets; a pointer incremented by this number will + skip over the unhashed subpackets. + + - Unhashed subpacket data set (zero or more subpackets). + + - Two-octet field holding the left 16 bits of the signed hash + value. + + - One or more multiprecision integers comprising the signature. + This portion is algorithm specific, as described above. + + The concatenation of the data being signed and the signature data + from the version number through the hashed subpacket data (inclusive) + is hashed. The resulting hash value is what is signed. The left 16 + bits of the hash are included in the Signature packet to provide a + quick test to reject some invalid signatures. + + There are two fields consisting of Signature subpackets. The first + field is hashed with the rest of the signature data, while the second + is unhashed. The second set of subpackets is not cryptographically + protected by the signature and should include only advisory + information. + + The algorithms for converting the hash function result to a signature + are described in a section below. + """ + __ver__ = 4 + + @sdproperty + def sigtype(self): + return self._sigtype + + @sigtype.register(int) + @sigtype.register(SignatureType) + def sigtype_int(self, val): + self._sigtype = SignatureType(val) + + @sdproperty + def pubalg(self): + return self._pubalg + + @pubalg.register(int) + @pubalg.register(PubKeyAlgorithm) + def pubalg_int(self, val): + self._pubalg = PubKeyAlgorithm(val) + + sigs = {PubKeyAlgorithm.RSAEncryptOrSign: RSASignature, + PubKeyAlgorithm.RSAEncrypt: RSASignature, + PubKeyAlgorithm.RSASign: RSASignature, + PubKeyAlgorithm.DSA: DSASignature, + PubKeyAlgorithm.ECDSA: ECDSASignature, } + + if self.pubalg in sigs: + self.signature = sigs[self.pubalg]() + + @sdproperty + def halg(self): + return self._halg + + @halg.register(int) + @halg.register(HashAlgorithm) + def halg_int(self, val): + try: + self._halg = HashAlgorithm(val) + + except ValueError: # pragma: no cover + self._halg = val + + @property + def signature(self): + return self._signature + + @signature.setter + def signature(self, val): + self._signature = val + + @property + def signer(self): + return self.subpackets['Issuer'][-1].issuer + + def __init__(self): + super(Signature, self).__init__() + self._sigtype = None + self._pubalg = None + self._halg = None + self.subpackets = SubPackets() + self.hash2 = bytearray(2) + self.signature = None + + def __bytearray__(self): + _bytes = bytearray() + _bytes += super(Signature, self).__bytearray__() + _bytes += self.int_to_bytes(self.sigtype) + _bytes += self.int_to_bytes(self.pubalg) + _bytes += self.int_to_bytes(self.halg) + _bytes += self.subpackets.__bytearray__() + _bytes += self.hash2 + _bytes += self.signature.__bytearray__() + + return _bytes + + def __copy__(self): + spkt = SignatureV4() + spkt.header = copy.copy(self.header) + spkt._sigtype = self._sigtype + spkt._pubalg = self._pubalg + spkt._halg = self._halg + + spkt.subpackets = copy.copy(self.subpackets) + spkt.hash2 = copy.copy(self.hash2) + spkt.signature = copy.copy(self.signature) + + return spkt + + def update_hlen(self): + self.subpackets.update_hlen() + super(SignatureV4, self).update_hlen() + + def parse(self, packet): + super(Signature, self).parse(packet) + self.sigtype = packet[0] + del packet[0] + + self.pubalg = packet[0] + del packet[0] + + self.halg = packet[0] + del packet[0] + + self.subpackets.parse(packet) + + self.hash2 = packet[:2] + del packet[:2] + + self.signature.parse(packet) + + +class SKESessionKey(VersionedPacket): + __typeid__ = 0x03 + __ver__ = 0 + + @abc.abstractmethod + def decrypt_sk(self, passphrase): + raise NotImplementedError() + + @abc.abstractmethod + def encrypt_sk(self, passphrase, sk): + raise NotImplementedError() + + +class SKESessionKeyV4(SKESessionKey): + """ + 5.3. Symmetric-Key Encrypted Session Key Packets (Tag 3) + + The Symmetric-Key Encrypted Session Key packet holds the + symmetric-key encryption of a session key used to encrypt a message. + Zero or more Public-Key Encrypted Session Key packets and/or + Symmetric-Key Encrypted Session Key packets may precede a + Symmetrically Encrypted Data packet that holds an encrypted message. + The message is encrypted with a session key, and the session key is + itself encrypted and stored in the Encrypted Session Key packet or + the Symmetric-Key Encrypted Session Key packet. + + If the Symmetrically Encrypted Data packet is preceded by one or + more Symmetric-Key Encrypted Session Key packets, each specifies a + passphrase that may be used to decrypt the message. This allows a + message to be encrypted to a number of public keys, and also to one + or more passphrases. This packet type is new and is not generated + by PGP 2.x or PGP 5.0. + + The body of this packet consists of: + + - A one-octet version number. The only currently defined version + is 4. + + - A one-octet number describing the symmetric algorithm used. + + - A string-to-key (S2K) specifier, length as defined above. + + - Optionally, the encrypted session key itself, which is decrypted + with the string-to-key object. + + If the encrypted session key is not present (which can be detected + on the basis of packet length and S2K specifier size), then the S2K + algorithm applied to the passphrase produces the session key for + decrypting the file, using the symmetric cipher algorithm from the + Symmetric-Key Encrypted Session Key packet. + + If the encrypted session key is present, the result of applying the + S2K algorithm to the passphrase is used to decrypt just that + encrypted session key field, using CFB mode with an IV of all zeros. + The decryption result consists of a one-octet algorithm identifier + that specifies the symmetric-key encryption algorithm used to + encrypt the following Symmetrically Encrypted Data packet, followed + by the session key octets themselves. + + Note: because an all-zero IV is used for this decryption, the S2K + specifier MUST use a salt value, either a Salted S2K or an + Iterated-Salted S2K. The salt value will ensure that the decryption + key is not repeated even if the passphrase is reused. + """ + __ver__ = 4 + + @property + def symalg(self): + return self.s2k.encalg + + def __init__(self): + super(SKESessionKeyV4, self).__init__() + self.s2k = String2Key() + self.ct = bytearray() + + def __bytearray__(self): + _bytes = bytearray() + _bytes += super(SKESessionKeyV4, self).__bytearray__() + _bytes += self.s2k.__bytearray__()[1:] + _bytes += self.ct + return _bytes + + def __copy__(self): + sk = self.__class__() + sk.header = copy.copy(self.header) + sk.s2k = copy.copy(self.s2k) + sk.ct = self.ct[:] + + return sk + + def parse(self, packet): + super(SKESessionKeyV4, self).parse(packet) + # prepend a valid usage identifier so this parses correctly + packet.insert(0, 255) + self.s2k.parse(packet, iv=False) + + ctend = self.header.length - len(self.s2k) + self.ct = packet[:ctend] + del packet[:ctend] + + def decrypt_sk(self, passphrase): + # derive the first session key from our passphrase + sk = self.s2k.derive_key(passphrase) + del passphrase + + # if there is no ciphertext, then the first session key is the session key being used + if len(self.ct) == 0: + return self.symalg, sk + + # otherwise, we now need to decrypt the encrypted session key + m = bytearray(_decrypt(bytes(self.ct), sk, self.symalg)) + del sk + + symalg = SymmetricKeyAlgorithm(m[0]) + del m[0] + + return symalg, bytes(m) + + def encrypt_sk(self, passphrase, sk): + # generate the salt and derive the key to encrypt sk with from it + self.s2k.salt = bytearray(os.urandom(8)) + esk = self.s2k.derive_key(passphrase) + del passphrase + + self.ct = _encrypt(self.int_to_bytes(self.symalg) + sk, esk, self.symalg) + + # update header length and return sk + self.update_hlen() + + +class OnePassSignature(VersionedPacket): + __typeid__ = 0x04 + __ver__ = 0 + + +class OnePassSignatureV3(OnePassSignature): + """ + 5.4. One-Pass Signature Packets (Tag 4) + + The One-Pass Signature packet precedes the signed data and contains + enough information to allow the receiver to begin calculating any + hashes needed to verify the signature. It allows the Signature + packet to be placed at the end of the message, so that the signer + can compute the entire signed message in one pass. + + A One-Pass Signature does not interoperate with PGP 2.6.x or + earlier. + + The body of this packet consists of: + + - A one-octet version number. The current version is 3. + + - A one-octet signature type. Signature types are described in + Section 5.2.1. + + - A one-octet number describing the hash algorithm used. + + - A one-octet number describing the public-key algorithm used. + + - An eight-octet number holding the Key ID of the signing key. + + - A one-octet number holding a flag showing whether the signature + is nested. A zero value indicates that the next packet is + another One-Pass Signature packet that describes another + signature to be applied to the same message data. + + Note that if a message contains more than one one-pass signature, + then the Signature packets bracket the message; that is, the first + Signature packet after the message corresponds to the last one-pass + packet and the final Signature packet corresponds to the first + one-pass packet. + """ + __ver__ = 3 + + @sdproperty + def sigtype(self): + return self._sigtype + + @sigtype.register(int) + @sigtype.register(SignatureType) + def sigtype_int(self, val): + self._sigtype = SignatureType(val) + + @sdproperty + def pubalg(self): + return self._pubalg + + @pubalg.register(int) + @pubalg.register(PubKeyAlgorithm) + def pubalg_int(self, val): + self._pubalg = PubKeyAlgorithm(val) + if self._pubalg in [PubKeyAlgorithm.RSAEncryptOrSign, PubKeyAlgorithm.RSAEncrypt, PubKeyAlgorithm.RSASign]: + self.signature = RSASignature() + + elif self._pubalg == PubKeyAlgorithm.DSA: + self.signature = DSASignature() + + @sdproperty + def halg(self): + return self._halg + + @halg.register(int) + @halg.register(HashAlgorithm) + def halg_int(self, val): + try: + self._halg = HashAlgorithm(val) + + except ValueError: # pragma: no cover + self._halg = val + + @sdproperty + def signer(self): + return self._signer + + @signer.register(str) + @signer.register(six.text_type) + def signer_str(self, val): + self._signer = val + + @signer.register(bytearray) + def signer_bin(self, val): + self._signer = binascii.hexlify(val).upper().decode('latin-1') + + def __init__(self): + super(OnePassSignatureV3, self).__init__() + self._sigtype = None + self._halg = None + self._pubalg = None + self._signer = b'\x00' * 8 + self.nested = False + + def __bytearray__(self): + _bytes = bytearray() + _bytes += super(OnePassSignatureV3, self).__bytearray__() + _bytes += bytearray([self.sigtype]) + _bytes += bytearray([self.halg]) + _bytes += bytearray([self.pubalg]) + _bytes += binascii.unhexlify(six.b(self.signer)) + _bytes += bytearray([int(self.nested)]) + return _bytes + + def parse(self, packet): + super(OnePassSignatureV3, self).parse(packet) + self.sigtype = packet[0] + del packet[0] + + self.halg = packet[0] + del packet[0] + + self.pubalg = packet[0] + del packet[0] + + self.signer = packet[:8] + del packet[:8] + + self.nested = (packet[0] == 1) + del packet[0] + + +class PrivKey(VersionedPacket, Primary, Private): + __typeid__ = 0x05 + __ver__ = 0 + + +class PubKey(VersionedPacket, Primary, Public): + __typeid__ = 0x06 + __ver__ = 0 + + @abc.abstractproperty + def fingerprint(self): + """compute and return the fingerprint of the key""" + + +class PubKeyV4(PubKey): + __ver__ = 4 + + @sdproperty + def created(self): + return self._created + + @created.register(datetime) + def created_datetime(self, val): + self._created = val + + @created.register(int) + def created_int(self, val): + self.created = datetime.utcfromtimestamp(val) + + @created.register(bytes) + @created.register(bytearray) + def created_bin(self, val): + self.created = self.bytes_to_int(val) + + @sdproperty + def pkalg(self): + return self._pkalg + + @pkalg.register(int) + @pkalg.register(PubKeyAlgorithm) + def pkalg_int(self, val): + self._pkalg = PubKeyAlgorithm(val) + + _c = { + # True means public + (True, PubKeyAlgorithm.RSAEncryptOrSign): RSAPub, + (True, PubKeyAlgorithm.RSAEncrypt): RSAPub, + (True, PubKeyAlgorithm.RSASign): RSAPub, + (True, PubKeyAlgorithm.DSA): DSAPub, + (True, PubKeyAlgorithm.ElGamal): ElGPub, + (True, PubKeyAlgorithm.FormerlyElGamalEncryptOrSign): ElGPub, + (True, PubKeyAlgorithm.ECDSA): ECDSAPub, + (True, PubKeyAlgorithm.ECDH): ECDHPub, + # False means private + (False, PubKeyAlgorithm.RSAEncryptOrSign): RSAPriv, + (False, PubKeyAlgorithm.RSAEncrypt): RSAPriv, + (False, PubKeyAlgorithm.RSASign): RSAPriv, + (False, PubKeyAlgorithm.DSA): DSAPriv, + (False, PubKeyAlgorithm.ElGamal): ElGPriv, + (False, PubKeyAlgorithm.FormerlyElGamalEncryptOrSign): ElGPriv, + (False, PubKeyAlgorithm.ECDSA): ECDSAPriv, + (False, PubKeyAlgorithm.ECDH): ECDHPriv, + } + + k = (self.public, self.pkalg) + km = _c.get(k, None) + + self.keymaterial = (km or (OpaquePubKey if self.public else OpaquePrivKey))() + + # km = _c.get(k, None) + # self.keymaterial = km() if km is not None else km + + @property + def public(self): + return isinstance(self, PubKey) and not isinstance(self, PrivKey) + + @property + def fingerprint(self): + # A V4 fingerprint is the 160-bit SHA-1 hash of the octet 0x99, followed by the two-octet packet length, + # followed by the entire Public-Key packet starting with the version field. The Key ID is the + # low-order 64 bits of the fingerprint. + fp = hashlib.new('sha1') + + plen = self.keymaterial.publen() + bcde_len = self.int_to_bytes(6 + plen, 2) + + # a.1) 0x99 (1 octet) + # a.2) high-order length octet + # a.3) low-order length octet + fp.update(b'\x99' + bcde_len[:1] + bcde_len[-1:]) + # b) version number = 4 (1 octet); + fp.update(b'\x04') + # c) timestamp of key creation (4 octets); + fp.update(self.int_to_bytes(calendar.timegm(self.created.timetuple()), 4)) + # d) algorithm (1 octet): 17 = DSA (example); + fp.update(self.int_to_bytes(self.pkalg)) + # e) Algorithm-specific fields. + fp.update(self.keymaterial.__bytearray__()[:plen]) + + # and return the digest + return Fingerprint(fp.hexdigest().upper()) + + def __init__(self): + super(PubKeyV4, self).__init__() + self.created = datetime.utcnow() + self.pkalg = 0 + self.keymaterial = None + + def __bytearray__(self): + _bytes = bytearray() + _bytes += super(PubKeyV4, self).__bytearray__() + _bytes += self.int_to_bytes(calendar.timegm(self.created.timetuple()), 4) + _bytes += self.int_to_bytes(self.pkalg) + _bytes += self.keymaterial.__bytearray__() + return _bytes + + def __copy__(self): + pk = self.__class__() + pk.header = copy.copy(self.header) + pk.created = self.created + pk.pkalg = self.pkalg + pk.keymaterial = copy.copy(self.keymaterial) + + return pk + + def verify(self, subj, sigbytes, hash_alg): + return self.keymaterial.verify(subj, sigbytes, hash_alg) + + def parse(self, packet): + super(PubKeyV4, self).parse(packet) + + self.created = packet[:4] + del packet[:4] + + self.pkalg = packet[0] + del packet[0] + + # bound keymaterial to the remaining length of the packet + pend = self.header.length - 6 + self.keymaterial.parse(packet[:pend]) + del packet[:pend] + + +class PrivKeyV4(PrivKey, PubKeyV4): + __ver__ = 4 + + @classmethod + def new(cls, key_algorithm, key_size): + # build a key packet + pk = PrivKeyV4() + pk.pkalg = key_algorithm + if pk.keymaterial is None: + raise NotImplementedError(key_algorithm) + pk.keymaterial._generate(key_size) + pk.update_hlen() + return pk + + def pubkey(self): + # return a copy of ourselves, but just the public half + pk = PubKeyV4() if not isinstance(self, PrivSubKeyV4) else PubSubKeyV4() + pk.created = self.created + pk.pkalg = self.pkalg + + # copy over MPIs + for pm in self.keymaterial.__pubfields__: + setattr(pk.keymaterial, pm, copy.copy(getattr(self.keymaterial, pm))) + + if self.pkalg == PubKeyAlgorithm.ECDSA: + pk.keymaterial.oid = self.keymaterial.oid + + if self.pkalg == PubKeyAlgorithm.ECDH: + pk.keymaterial.oid = self.keymaterial.oid + pk.keymaterial.kdf = copy.copy(self.keymaterial.kdf) + + pk.update_hlen() + return pk + + @property + def protected(self): + return bool(self.keymaterial.s2k) + + @property + def unlocked(self): + if self.protected: + return 0 not in list(self.keymaterial) + return True # pragma: no cover + + def protect(self, passphrase, enc_alg, hash_alg): + self.keymaterial.encrypt_keyblob(passphrase, enc_alg, hash_alg) + del passphrase + self.update_hlen() + + def unprotect(self, passphrase): + self.keymaterial.decrypt_keyblob(passphrase) + del passphrase + + def sign(self, sigdata, hash_alg): + return self.keymaterial.sign(sigdata, hash_alg) + + +class PrivSubKey(VersionedPacket, Sub, Private): + __typeid__ = 0x07 + __ver__ = 0 + + +class PrivSubKeyV4(PrivSubKey, PrivKeyV4): + __ver__ = 4 + + +class CompressedData(Packet): + """ + 5.6. Compressed Data Packet (Tag 8) + + The Compressed Data packet contains compressed data. Typically, this + packet is found as the contents of an encrypted packet, or following + a Signature or One-Pass Signature packet, and contains a literal data + packet. + + The body of this packet consists of: + + - One octet that gives the algorithm used to compress the packet. + + - Compressed data, which makes up the remainder of the packet. + + A Compressed Data Packet's body contains an block that compresses + some set of packets. See section "Packet Composition" for details on + how messages are formed. + + ZIP-compressed packets are compressed with raw RFC 1951 [RFC1951] + DEFLATE blocks. Note that PGP V2.6 uses 13 bits of compression. If + an implementation uses more bits of compression, PGP V2.6 cannot + decompress it. + + ZLIB-compressed packets are compressed with RFC 1950 [RFC1950] ZLIB- + style blocks. + + BZip2-compressed packets are compressed using the BZip2 [BZ2] + algorithm. + """ + __typeid__ = 0x08 + + @sdproperty + def calg(self): + return self._calg + + @calg.register(int) + @calg.register(CompressionAlgorithm) + def calg_int(self, val): + self._calg = CompressionAlgorithm(val) + + def __init__(self): + super(CompressedData, self).__init__() + self._calg = None + self.packets = [] + + def __bytearray__(self): + _bytes = bytearray() + _bytes += super(CompressedData, self).__bytearray__() + _bytes += bytearray([self.calg]) + + _pb = bytearray() + for pkt in self.packets: + _pb += pkt.__bytearray__() + _bytes += self.calg.compress(bytes(_pb)) + + return _bytes + + def parse(self, packet): + super(CompressedData, self).parse(packet) + self.calg = packet[0] + del packet[0] + + cdata = bytearray(self.calg.decompress(packet[:self.header.length - 1])) + del packet[:self.header.length - 1] + + while len(cdata) > 0: + self.packets.append(Packet(cdata)) + + +class SKEData(Packet): + """ + 5.7. Symmetrically Encrypted Data Packet (Tag 9) + + The Symmetrically Encrypted Data packet contains data encrypted with + a symmetric-key algorithm. When it has been decrypted, it contains + other packets (usually a literal data packet or compressed data + packet, but in theory other Symmetrically Encrypted Data packets or + sequences of packets that form whole OpenPGP messages). + + The body of this packet consists of: + + - Encrypted data, the output of the selected symmetric-key cipher + operating in OpenPGP's variant of Cipher Feedback (CFB) mode. + + The symmetric cipher used may be specified in a Public-Key or + Symmetric-Key Encrypted Session Key packet that precedes the + Symmetrically Encrypted Data packet. In that case, the cipher + algorithm octet is prefixed to the session key before it is + encrypted. If no packets of these types precede the encrypted data, + the IDEA algorithm is used with the session key calculated as the MD5 + hash of the passphrase, though this use is deprecated. + + The data is encrypted in CFB mode, with a CFB shift size equal to the + cipher's block size. The Initial Vector (IV) is specified as all + zeros. Instead of using an IV, OpenPGP prefixes a string of length + equal to the block size of the cipher plus two to the data before it + is encrypted. The first block-size octets (for example, 8 octets for + a 64-bit block length) are random, and the following two octets are + copies of the last two octets of the IV. For example, in an 8-octet + block, octet 9 is a repeat of octet 7, and octet 10 is a repeat of + octet 8. In a cipher of length 16, octet 17 is a repeat of octet 15 + and octet 18 is a repeat of octet 16. As a pedantic clarification, + in both these examples, we consider the first octet to be numbered 1. + + After encrypting the first block-size-plus-two octets, the CFB state + is resynchronized. The last block-size octets of ciphertext are + passed through the cipher and the block boundary is reset. + + The repetition of 16 bits in the random data prefixed to the message + allows the receiver to immediately check whether the session key is + incorrect. See the "Security Considerations" section for hints on + the proper use of this "quick check". + """ + __typeid__ = 0x09 + + def __init__(self): + super(SKEData, self).__init__() + self.ct = bytearray() + + def __bytearray__(self): + _bytes = bytearray() + _bytes += super(SKEData, self).__bytearray__() + _bytes += self.ct + return _bytes + + def __copy__(self): + skd = self.__class__() + skd.ct = self.ct[:] + return skd + + def parse(self, packet): + super(SKEData, self).parse(packet) + self.ct = packet[:self.header.length] + del packet[:self.header.length] + + def decrypt(self, key, alg): # pragma: no cover + pt = _decrypt(bytes(self.ct), bytes(key), alg) + + iv = bytes(pt[:alg.block_size // 8]) + del pt[:alg.block_size // 8] + + ivl2 = bytes(pt[:2]) + del pt[:2] + + if not constant_time.bytes_eq(iv[-2:], ivl2): + raise PGPDecryptionError("Decryption failed") + + return pt + + +class Marker(Packet): + __typeid__ = 0x0a + + def __init__(self): + super(Marker, self).__init__() + self.data = b'PGP' + + def __bytearray__(self): + _bytes = bytearray() + _bytes += super(Marker, self).__bytearray__() + _bytes += self.data + return _bytes + + def parse(self, packet): + super(Marker, self).parse(packet) + self.data = packet[:self.header.length] + del packet[:self.header.length] + + +class LiteralData(Packet): + """ + 5.9. Literal Data Packet (Tag 11) + + A Literal Data packet contains the body of a message; data that is + not to be further interpreted. + + The body of this packet consists of: + + - A one-octet field that describes how the data is formatted. + + If it is a 'b' (0x62), then the Literal packet contains binary data. + If it is a 't' (0x74), then it contains text data, and thus may need + line ends converted to local form, or other text-mode changes. The + tag 'u' (0x75) means the same as 't', but also indicates that + implementation believes that the literal data contains UTF-8 text. + + Early versions of PGP also defined a value of 'l' as a 'local' mode + for machine-local conversions. RFC 1991 [RFC1991] incorrectly stated + this local mode flag as '1' (ASCII numeral one). Both of these local + modes are deprecated. + + - File name as a string (one-octet length, followed by a file + name). This may be a zero-length string. Commonly, if the + source of the encrypted data is a file, this will be the name of + the encrypted file. An implementation MAY consider the file name + in the Literal packet to be a more authoritative name than the + actual file name. + + If the special name "_CONSOLE" is used, the message is considered to + be "for your eyes only". This advises that the message data is + unusually sensitive, and the receiving program should process it more + carefully, perhaps avoiding storing the received data to disk, for + example. + + - A four-octet number that indicates a date associated with the + literal data. Commonly, the date might be the modification date + of a file, or the time the packet was created, or a zero that + indicates no specific time. + + - The remainder of the packet is literal data. + + Text data is stored with <CR><LF> text endings (i.e., network- + normal line endings). These should be converted to native line + endings by the receiving software. + """ + __typeid__ = 0x0B + + @sdproperty + def mtime(self): + return self._mtime + + @mtime.register(datetime) + def mtime_datetime(self, val): + self._mtime = val + + @mtime.register(int) + def mtime_int(self, val): + self.mtime = datetime.utcfromtimestamp(val) + + @mtime.register(bytes) + @mtime.register(bytearray) + def mtime_bin(self, val): + self.mtime = self.bytes_to_int(val) + + @property + def contents(self): + if self.format == 't': + return self._contents.decode('latin-1') + + if self.format == 'u': + return self._contents.decode('utf-8') + + return self._contents + + def __init__(self): + super(LiteralData, self).__init__() + self.format = 'b' + self.filename = '' + self.mtime = datetime.utcnow() + self._contents = bytearray() + + def __bytearray__(self): + _bytes = bytearray() + _bytes += super(LiteralData, self).__bytearray__() + _bytes += self.format.encode('latin-1') + _bytes += bytearray([len(self.filename)]) + _bytes += self.filename.encode('latin-1') + _bytes += self.int_to_bytes(calendar.timegm(self.mtime.timetuple()), 4) + _bytes += self._contents + return _bytes + + def __copy__(self): + pkt = LiteralData() + pkt.header = copy.copy(self.header) + pkt.format = self.format + pkt.filename = self.filename + pkt.mtime = self.mtime + pkt._contents = self._contents[:] + + return pkt + + def parse(self, packet): + super(LiteralData, self).parse(packet) + self.format = chr(packet[0]) + del packet[0] + + fnl = packet[0] + del packet[0] + + self.filename = packet[:fnl].decode() + del packet[:fnl] + + self.mtime = packet[:4] + del packet[:4] + + self._contents = packet[:self.header.length - (6 + fnl)] + del packet[:self.header.length - (6 + fnl)] + + +class Trust(Packet): + """ + 5.10. Trust Packet (Tag 12) + + The Trust packet is used only within keyrings and is not normally + exported. Trust packets contain data that record the user's + specifications of which key holders are trustworthy introducers, + along with other information that implementing software uses for + trust information. The format of Trust packets is defined by a given + implementation. + + Trust packets SHOULD NOT be emitted to output streams that are + transferred to other users, and they SHOULD be ignored on any input + other than local keyring files. + """ + __typeid__ = 0x0C + + @sdproperty + def trustlevel(self): + return self._trustlevel + + @trustlevel.register(int) + @trustlevel.register(TrustLevel) + def trustlevel_int(self, val): + self._trustlevel = TrustLevel(val & 0x0F) + + @sdproperty + def trustflags(self): + return self._trustflags + + @trustflags.register(list) + def trustflags_list(self, val): + self._trustflags = val + + @trustflags.register(int) + def trustflags_int(self, val): + self._trustflags = TrustFlags & val + + def __init__(self): + super(Trust, self).__init__() + self.trustlevel = TrustLevel.Unknown + self.trustflags = [] + + def __bytearray__(self): + _bytes = bytearray() + _bytes += super(Trust, self).__bytearray__() + _bytes += self.int_to_bytes(self.trustlevel + sum(self.trustflags), 2) + return _bytes + + def parse(self, packet): + super(Trust, self).parse(packet) + # self.trustlevel = packet[0] & 0x1f + t = self.bytes_to_int(packet[:2]) + del packet[:2] + + self.trustlevel = t + self.trustflags = t + + +class UserID(Packet): + """ + 5.11. User ID Packet (Tag 13) + + A User ID packet consists of UTF-8 text that is intended to represent + the name and email address of the key holder. By convention, it + includes an RFC 2822 [RFC2822] mail name-addr, but there are no + restrictions on its content. The packet length in the header + specifies the length of the User ID. + """ + __typeid__ = 0x0D + + def __init__(self): + super(UserID, self).__init__() + self.name = "" + self.comment = "" + self.email = "" + + def __bytearray__(self): + _bytes = bytearray() + _bytes += super(UserID, self).__bytearray__() + _bytes += self.text_to_bytes(self.name) + if self.comment: + _bytes += b' (' + self.text_to_bytes(self.comment) + b')' + + if self.email: + _bytes += b' <' + self.text_to_bytes(self.email) + b'>' + + return _bytes + + def __copy__(self): + uid = UserID() + uid.header = copy.copy(self.header) + uid.name = self.name + uid.comment = self.comment + uid.email = self.email + return uid + + def parse(self, packet): + super(UserID, self).parse(packet) + + uid_text = packet[:self.header.length].decode('latin-1') + del packet[:self.header.length] + + # came across a UID packet with no payload. If that happens, don't bother trying to parse anything! + if self.header.length > 0: + uid = re.match(r"""^ + # name should always match something + (?P<name>.+?) + # comment *optionally* matches text in parens following name + # this should never come after email and must be followed immediately by + # either the email field, or the end of the packet. + (\ \((?P<comment>.+?)\)(?=(\ <|$)))? + # email *optionally* matches text in angle brackets following name or comment + # this should never come before a comment, if comment exists, + # but can immediately follow name if comment does not exist + (\ <(?P<email>.+)>)? + $ + """, uid_text, flags=re.VERBOSE).groupdict() + + self.name = uid['name'] + self.comment = uid['comment'] or "" + self.email = uid['email'] or "" + + +class PubSubKey(VersionedPacket, Sub, Public): + __typeid__ = 0x0E + __ver__ = 0 + + +class PubSubKeyV4(PubSubKey, PubKeyV4): + __ver__ = 4 + + +class UserAttribute(Packet): + """ + 5.12. User Attribute Packet (Tag 17) + + The User Attribute packet is a variation of the User ID packet. It + is capable of storing more types of data than the User ID packet, + which is limited to text. Like the User ID packet, a User Attribute + packet may be certified by the key owner ("self-signed") or any other + key owner who cares to certify it. Except as noted, a User Attribute + packet may be used anywhere that a User ID packet may be used. + + While User Attribute packets are not a required part of the OpenPGP + standard, implementations SHOULD provide at least enough + compatibility to properly handle a certification signature on the + User Attribute packet. A simple way to do this is by treating the + User Attribute packet as a User ID packet with opaque contents, but + an implementation may use any method desired. + + The User Attribute packet is made up of one or more attribute + subpackets. Each subpacket consists of a subpacket header and a + body. The header consists of: + + - the subpacket length (1, 2, or 5 octets) + + - the subpacket type (1 octet) + + and is followed by the subpacket specific data. + + The only currently defined subpacket type is 1, signifying an image. + An implementation SHOULD ignore any subpacket of a type that it does + not recognize. Subpacket types 100 through 110 are reserved for + private or experimental use. + """ + __typeid__ = 0x11 + + @property + def image(self): + if 'Image' not in self.subpackets: + self.subpackets.addnew('Image') + return next(iter(self.subpackets['Image'])) + + def __init__(self): + super(UserAttribute, self).__init__() + self.subpackets = UserAttributeSubPackets() + + def __bytearray__(self): + _bytes = bytearray() + _bytes += super(UserAttribute, self).__bytearray__() + _bytes += self.subpackets.__bytearray__() + return _bytes + + def parse(self, packet): + super(UserAttribute, self).parse(packet) + + plen = len(packet) + while self.header.length > (plen - len(packet)): + self.subpackets.parse(packet) + + def update_hlen(self): + self.subpackets.update_hlen() + super(UserAttribute, self).update_hlen() + + +class IntegrityProtectedSKEData(VersionedPacket): + __typeid__ = 0x12 + __ver__ = 0 + + +class IntegrityProtectedSKEDataV1(IntegrityProtectedSKEData): + """ + 5.13. Sym. Encrypted Integrity Protected Data Packet (Tag 18) + + The Symmetrically Encrypted Integrity Protected Data packet is a + variant of the Symmetrically Encrypted Data packet. It is a new + feature created for OpenPGP that addresses the problem of detecting a + modification to encrypted data. It is used in combination with a + Modification Detection Code packet. + + There is a corresponding feature in the features Signature subpacket + that denotes that an implementation can properly use this packet + type. An implementation MUST support decrypting these packets and + SHOULD prefer generating them to the older Symmetrically Encrypted + Data packet when possible. Since this data packet protects against + modification attacks, this standard encourages its proliferation. + While blanket adoption of this data packet would create + interoperability problems, rapid adoption is nevertheless important. + An implementation SHOULD specifically denote support for this packet, + but it MAY infer it from other mechanisms. + + For example, an implementation might infer from the use of a cipher + such as Advanced Encryption Standard (AES) or Twofish that a user + supports this feature. It might place in the unhashed portion of + another user's key signature a Features subpacket. It might also + present a user with an opportunity to regenerate their own self- + signature with a Features subpacket. + + This packet contains data encrypted with a symmetric-key algorithm + and protected against modification by the SHA-1 hash algorithm. When + it has been decrypted, it will typically contain other packets (often + a Literal Data packet or Compressed Data packet). The last decrypted + packet in this packet's payload MUST be a Modification Detection Code + packet. + + The body of this packet consists of: + + - A one-octet version number. The only currently defined value is + 1. + + - Encrypted data, the output of the selected symmetric-key cipher + operating in Cipher Feedback mode with shift amount equal to the + block size of the cipher (CFB-n where n is the block size). + + The symmetric cipher used MUST be specified in a Public-Key or + Symmetric-Key Encrypted Session Key packet that precedes the + Symmetrically Encrypted Data packet. In either case, the cipher + algorithm octet is prefixed to the session key before it is + encrypted. + + The data is encrypted in CFB mode, with a CFB shift size equal to the + cipher's block size. The Initial Vector (IV) is specified as all + zeros. Instead of using an IV, OpenPGP prefixes an octet string to + the data before it is encrypted. The length of the octet string + equals the block size of the cipher in octets, plus two. The first + octets in the group, of length equal to the block size of the cipher, + are random; the last two octets are each copies of their 2nd + preceding octet. For example, with a cipher whose block size is 128 + bits or 16 octets, the prefix data will contain 16 random octets, + then two more octets, which are copies of the 15th and 16th octets, + respectively. Unlike the Symmetrically Encrypted Data Packet, no + special CFB resynchronization is done after encrypting this prefix + data. See "OpenPGP CFB Mode" below for more details. + + The repetition of 16 bits in the random data prefixed to the message + allows the receiver to immediately check whether the session key is + incorrect. + + The plaintext of the data to be encrypted is passed through the SHA-1 + hash function, and the result of the hash is appended to the + plaintext in a Modification Detection Code packet. The input to the + hash function includes the prefix data described above; it includes + all of the plaintext, and then also includes two octets of values + 0xD3, 0x14. These represent the encoding of a Modification Detection + Code packet tag and length field of 20 octets. + + The resulting hash value is stored in a Modification Detection Code + (MDC) packet, which MUST use the two octet encoding just given to + represent its tag and length field. The body of the MDC packet is + the 20-octet output of the SHA-1 hash. + + The Modification Detection Code packet is appended to the plaintext + and encrypted along with the plaintext using the same CFB context. + + During decryption, the plaintext data should be hashed with SHA-1, + including the prefix data as well as the packet tag and length field + of the Modification Detection Code packet. The body of the MDC + packet, upon decryption, is compared with the result of the SHA-1 + hash. + + Any failure of the MDC indicates that the message has been modified + and MUST be treated as a security problem. Failures include a + difference in the hash values, but also the absence of an MDC packet, + or an MDC packet in any position other than the end of the plaintext. + Any failure SHOULD be reported to the user. + + Note: future designs of new versions of this packet should consider + rollback attacks since it will be possible for an attacker to change + the version back to 1. + """ + __ver__ = 1 + + def __init__(self): + super(IntegrityProtectedSKEDataV1, self).__init__() + self.ct = bytearray() + + def __bytearray__(self): + _bytes = bytearray() + _bytes += super(IntegrityProtectedSKEDataV1, self).__bytearray__() + _bytes += self.ct + return _bytes + + def __copy__(self): + skd = self.__class__() + skd.ct = self.ct[:] + return skd + + def parse(self, packet): + super(IntegrityProtectedSKEDataV1, self).parse(packet) + self.ct = packet[:self.header.length - 1] + del packet[:self.header.length - 1] + + def encrypt(self, key, alg, data): + iv = alg.gen_iv() + data = iv + iv[-2:] + data + + mdc = MDC() + mdc.mdc = binascii.hexlify(hashlib.new('SHA1', data + b'\xd3\x14').digest()) + mdc.update_hlen() + + data += mdc.__bytes__() + self.ct = _encrypt(data, key, alg) + self.update_hlen() + + def decrypt(self, key, alg): + # iv, ivl2, pt = super(IntegrityProtectedSKEDataV1, self).decrypt(key, alg) + pt = _decrypt(bytes(self.ct), bytes(key), alg) + + # do the MDC checks + _expected_mdcbytes = b'\xd3\x14' + hashlib.new('SHA1', pt[:-20]).digest() + if not constant_time.bytes_eq(bytes(pt[-22:]), _expected_mdcbytes): + raise PGPDecryptionError("Decryption failed") # pragma: no cover + + iv = bytes(pt[:alg.block_size // 8]) + del pt[:alg.block_size // 8] + + ivl2 = bytes(pt[:2]) + del pt[:2] + + if not constant_time.bytes_eq(iv[-2:], ivl2): + raise PGPDecryptionError("Decryption failed") # pragma: no cover + + return pt + + +class MDC(Packet): + """ + 5.14. Modification Detection Code Packet (Tag 19) + + The Modification Detection Code packet contains a SHA-1 hash of + plaintext data, which is used to detect message modification. It is + only used with a Symmetrically Encrypted Integrity Protected Data + packet. The Modification Detection Code packet MUST be the last + packet in the plaintext data that is encrypted in the Symmetrically + Encrypted Integrity Protected Data packet, and MUST appear in no + other place. + + A Modification Detection Code packet MUST have a length of 20 octets. + The body of this packet consists of: + + - A 20-octet SHA-1 hash of the preceding plaintext data of the + Symmetrically Encrypted Integrity Protected Data packet, + including prefix data, the tag octet, and length octet of the + Modification Detection Code packet. + + Note that the Modification Detection Code packet MUST always use a + new format encoding of the packet tag, and a one-octet encoding of + the packet length. The reason for this is that the hashing rules for + modification detection include a one-octet tag and one-octet length + in the data hash. While this is a bit restrictive, it reduces + complexity. + """ + __typeid__ = 0x13 + + def __init__(self): + super(MDC, self).__init__() + self.mdc = '' + + def __bytearray__(self): + return super(MDC, self).__bytearray__() + binascii.unhexlify(self.mdc) + + def parse(self, packet): + super(MDC, self).parse(packet) + self.mdc = binascii.hexlify(packet[:20]) + del packet[:20] diff --git a/src/leap/mx/vendor/pgpy/packet/subpackets/__init__.py b/src/leap/mx/vendor/pgpy/packet/subpackets/__init__.py new file mode 100644 index 0000000..b1e90a5 --- /dev/null +++ b/src/leap/mx/vendor/pgpy/packet/subpackets/__init__.py @@ -0,0 +1,7 @@ +from .types import Signature as Signature +from .types import UserAttribute as UserAttribute + +from .signature import * # NOQA +from .userattribute import * # NOQA + +__all__ = ['Signature', 'UserAttribute'] diff --git a/src/leap/mx/vendor/pgpy/packet/subpackets/signature.py b/src/leap/mx/vendor/pgpy/packet/subpackets/signature.py new file mode 100644 index 0000000..8993009 --- /dev/null +++ b/src/leap/mx/vendor/pgpy/packet/subpackets/signature.py @@ -0,0 +1,887 @@ +""" signature.py + +Signature SubPackets +""" +import binascii +import calendar + +from datetime import datetime +from datetime import timedelta + +import six + +from .types import EmbeddedSignatureHeader +from .types import Signature + +from ...constants import CompressionAlgorithm +from ...constants import Features as _Features +from ...constants import HashAlgorithm +from ...constants import KeyFlags as _KeyFlags +from ...constants import KeyServerPreferences as _KeyServerPreferences +from ...constants import NotationDataFlags +from ...constants import PubKeyAlgorithm +from ...constants import RevocationKeyClass +from ...constants import RevocationReason +from ...constants import SymmetricKeyAlgorithm + +from ...decorators import sdproperty + +from ...types import Fingerprint + + +__all__ = ['URI', + 'FlagList', + 'ByteFlag', + 'Boolean', + 'CreationTime', + 'SignatureExpirationTime', + 'ExportableCertification', + 'TrustSignature', + 'RegularExpression', + 'Revocable', + 'KeyExpirationTime', + 'PreferredSymmetricAlgorithms', + 'RevocationKey', + 'Issuer', + 'NotationData', + 'PreferredHashAlgorithms', + 'PreferredCompressionAlgorithms', + 'KeyServerPreferences', + 'PreferredKeyServer', + 'PrimaryUserID', + 'Policy', + 'KeyFlags', + 'SignersUserID', + 'ReasonForRevocation', + 'Features', + 'EmbeddedSignature'] + + +class URI(Signature): + @sdproperty + def uri(self): + return self._uri + + @uri.register(str) + @uri.register(six.text_type) + def uri_str(self, val): + self._uri = val + + @uri.register(bytearray) + def uri_bytearray(self, val): + self.uri = val.decode('latin-1') + + def __init__(self): + super(URI, self).__init__() + self.uri = "" + + def __bytearray__(self): + _bytes = super(URI, self).__bytearray__() + _bytes += self.uri.encode() + return _bytes + + def parse(self, packet): + super(URI, self).parse(packet) + self.uri = packet[:(self.header.length - 1)] + del packet[:(self.header.length - 1)] + + +class FlagList(Signature): + __flags__ = None + + @sdproperty + def flags(self): + return self._flags + + @flags.register(list) + @flags.register(tuple) + def flags_list(self, val): + self._flags = list(val) + + @flags.register(int) + @flags.register(CompressionAlgorithm) + @flags.register(HashAlgorithm) + @flags.register(PubKeyAlgorithm) + @flags.register(SymmetricKeyAlgorithm) + def flags_int(self, val): + if self.__flags__ is None: # pragma: no cover + raise AttributeError("Error: __flags__ not set!") + + self._flags.append(self.__flags__(val)) + + @flags.register(bytearray) + def flags_bytearray(self, val): + self.flags = self.bytes_to_int(val) + + def __init__(self): + super(FlagList, self).__init__() + self.flags = [] + + def __bytearray__(self): + _bytes = super(FlagList, self).__bytearray__() + _bytes += b''.join(self.int_to_bytes(b) for b in self.flags) + return _bytes + + def parse(self, packet): + super(FlagList, self).parse(packet) + for i in range(0, self.header.length - 1): + self.flags = packet[:1] + del packet[:1] + + +class ByteFlag(Signature): + __flags__ = None + + @sdproperty + def flags(self): + return self._flags + + @flags.register(set) + @flags.register(list) + def flags_seq(self, val): + self._flags = set(val) + + @flags.register(int) + @flags.register(_KeyFlags) + @flags.register(_Features) + def flags_int(self, val): + if self.__flags__ is None: # pragma: no cover + raise AttributeError("Error: __flags__ not set!") + + self._flags |= (self.__flags__ & val) + + @flags.register(bytearray) + def flags_bytearray(self, val): + self.flags = self.bytes_to_int(val) + + def __init__(self): + super(ByteFlag, self).__init__() + self.flags = [] + + def __bytearray__(self): + _bytes = super(ByteFlag, self).__bytearray__() + _bytes += self.int_to_bytes(sum(self.flags)) + # null-pad _bytes if they are not up to the end now + if len(_bytes) < len(self): + _bytes += b'\x00' * (len(self) - len(_bytes)) + return _bytes + + def parse(self, packet): + super(ByteFlag, self).parse(packet) + for i in range(0, self.header.length - 1): + self.flags = packet[:1] + del packet[:1] + + +class Boolean(Signature): + @sdproperty + def bflag(self): + return self._bool + + @bflag.register(bool) + def bflag_bool(self, val): + self._bool = val + + @bflag.register(bytearray) + def bflag_bytearray(self, val): + self.bool = bool(self.bytes_to_int(val)) + + def __init__(self): + super(Boolean, self).__init__() + self.bflag = False + + def __bytearray__(self): + _bytes = super(Boolean, self).__bytearray__() + _bytes += self.int_to_bytes(int(self.bflag)) + return _bytes + + def __bool__(self): + return self.bflag + + def __nonzero__(self): + return self.__bool__() + + def parse(self, packet): + super(Boolean, self).parse(packet) + self.bflag = packet[:1] + del packet[:1] + + +class CreationTime(Signature): + """ + 5.2.3.4. Signature Creation Time + + (4-octet time field) + + The time the signature was made. + + MUST be present in the hashed area. + """ + __typeid__ = 0x02 + + @sdproperty + def created(self): + return self._created + + @created.register(datetime) + def created_datetime(self, val): + self._created = val + + @created.register(int) + def created_int(self, val): + self.created = datetime.utcfromtimestamp(val) + + @created.register(bytearray) + def created_bytearray(self, val): + self.created = self.bytes_to_int(val) + + def __init__(self): + super(CreationTime, self).__init__() + self.created = datetime.utcnow() + + def __bytearray__(self): + _bytes = super(CreationTime, self).__bytearray__() + _bytes += self.int_to_bytes(calendar.timegm(self.created.timetuple()), 4) + return _bytes + + def parse(self, packet): + super(CreationTime, self).parse(packet) + self.created = packet[:4] + del packet[:4] + + +class SignatureExpirationTime(Signature): + """ + 5.2.3.10. Signature Expiration Time + + (4-octet time field) + + The validity period of the signature. This is the number of seconds + after the signature creation time that the signature expires. If + this is not present or has a value of zero, it never expires. + """ + __typeid__ = 0x03 + + @sdproperty + def expires(self): + return self._expires + + @expires.register(timedelta) + def expires_timedelta(self, val): + self._expires = val + + @expires.register(int) + def expires_int(self, val): + self.expires = timedelta(seconds=val) + + @expires.register(bytearray) + def expires_bytearray(self, val): + self.expires = self.bytes_to_int(val) + + def __init__(self): + super(SignatureExpirationTime, self).__init__() + self.expires = 0 + + def __bytearray__(self): + _bytes = super(SignatureExpirationTime, self).__bytearray__() + _bytes += self.int_to_bytes(int(self.expires.total_seconds()), 4) + return _bytes + + def parse(self, packet): + super(SignatureExpirationTime, self).parse(packet) + self.expires = packet[:4] + del packet[:4] + + +class ExportableCertification(Boolean): + """ + 5.2.3.11. Exportable Certification + + (1 octet of exportability, 0 for not, 1 for exportable) + + This subpacket denotes whether a certification signature is + "exportable", to be used by other users than the signature's issuer. + The packet body contains a Boolean flag indicating whether the + signature is exportable. If this packet is not present, the + certification is exportable; it is equivalent to a flag containing a + 1. + + Non-exportable, or "local", certifications are signatures made by a + user to mark a key as valid within that user's implementation only. + Thus, when an implementation prepares a user's copy of a key for + transport to another user (this is the process of "exporting" the + key), any local certification signatures are deleted from the key. + + The receiver of a transported key "imports" it, and likewise trims + any local certifications. In normal operation, there won't be any, + assuming the import is performed on an exported key. However, there + are instances where this can reasonably happen. For example, if an + implementation allows keys to be imported from a key database in + addition to an exported key, then this situation can arise. + + Some implementations do not represent the interest of a single user + (for example, a key server). Such implementations always trim local + certifications from any key they handle. + """ + __typeid__ = 0x04 + + +class TrustSignature(Signature): + """ + 5.2.3.13. Trust Signature + + (1 octet "level" (depth), 1 octet of trust amount) + + Signer asserts that the key is not only valid but also trustworthy at + the specified level. Level 0 has the same meaning as an ordinary + validity signature. Level 1 means that the signed key is asserted to + be a valid trusted introducer, with the 2nd octet of the body + specifying the degree of trust. Level 2 means that the signed key is + asserted to be trusted to issue level 1 trust signatures, i.e., that + it is a "meta introducer". Generally, a level n trust signature + asserts that a key is trusted to issue level n-1 trust signatures. + The trust amount is in a range from 0-255, interpreted such that + values less than 120 indicate partial trust and values of 120 or + greater indicate complete trust. Implementations SHOULD emit values + of 60 for partial trust and 120 for complete trust. + """ + __typeid__ = 0x05 + + @sdproperty + def level(self): + return self._level + + @level.register(int) + def level_int(self, val): + self._level = val + + @level.register(bytearray) + def level_bytearray(self, val): + self.level = self.bytes_to_int(val) + + @sdproperty + def amount(self): + return self._amount + + @amount.register(int) + def amount_int(self, val): + # clamp 'val' to the range 0-255 + self._amount = max(0, min(val, 255)) + + @amount.register(bytearray) + def amount_bytearray(self, val): + self.amount = self.bytes_to_int(val) + + def __init__(self): + super(TrustSignature, self).__init__() + self.level = 0 + self.amount = 0 + + def __bytearray__(self): + _bytes = super(TrustSignature, self).__bytearray__() + _bytes += self.int_to_bytes(self.level) + _bytes += self.int_to_bytes(self.amount) + return _bytes + + def parse(self, packet): + super(TrustSignature, self).parse(packet) + self.level = packet[:1] + del packet[:1] + self.amount = packet[:1] + del packet[:1] + + +class RegularExpression(Signature): + """ + 5.2.3.14. Regular Expression + + (null-terminated regular expression) + + Used in conjunction with trust Signature packets (of level > 0) to + limit the scope of trust that is extended. Only signatures by the + target key on User IDs that match the regular expression in the body + of this packet have trust extended by the trust Signature subpacket. + The regular expression uses the same syntax as the Henry Spencer's + "almost public domain" regular expression [REGEX] package. A + description of the syntax is found in Section 8 below. + """ + __typeid__ = 0x06 + + @sdproperty + def regex(self): + return self._regex + + @regex.register(str) + @regex.register(six.text_type) + def regex_str(self, val): + self._regex = val + + @regex.register(bytearray) + def regex_bytearray(self, val): + self.regex = val.decode('latin-1') + + def __init__(self): + super(RegularExpression, self).__init__() + self.regex = r'' + + def __bytearray__(self): + _bytes = super(RegularExpression, self).__bytearray__() + _bytes += self.regex.encode() + return _bytes + + def parse(self, packet): + super(RegularExpression, self).parse(packet) + self.regex = packet[:(self.header.length - 1)] + del packet[:(self.header.length - 1)] + + +class Revocable(Boolean): + """ + 5.2.3.12. Revocable + + (1 octet of revocability, 0 for not, 1 for revocable) + + Signature's revocability status. The packet body contains a Boolean + flag indicating whether the signature is revocable. Signatures that + are not revocable have any later revocation signatures ignored. They + represent a commitment by the signer that he cannot revoke his + signature for the life of his key. If this packet is not present, + the signature is revocable. + """ + __typeid__ = 0x07 + + +class KeyExpirationTime(SignatureExpirationTime): + """ + 5.2.3.6. Key Expiration Time + + (4-octet time field) + + The validity period of the key. This is the number of seconds after + the key creation time that the key expires. If this is not present + or has a value of zero, the key never expires. This is found only on + a self-signature. + """ + __typeid__ = 0x09 + + +class PreferredSymmetricAlgorithms(FlagList): + """ + 5.2.3.7. Preferred Symmetric Algorithms + + (array of one-octet values) + + Symmetric algorithm numbers that indicate which algorithms the key + holder prefers to use. The subpacket body is an ordered list of + octets with the most preferred listed first. It is assumed that only + algorithms listed are supported by the recipient's software. + Algorithm numbers are in Section 9. This is only found on a self- + signature. + """ + __typeid__ = 0x0B + __flags__ = SymmetricKeyAlgorithm + + +class RevocationKey(Signature): + """ + 5.2.3.15. Revocation Key + + (1 octet of class, 1 octet of public-key algorithm ID, 20 octets of + fingerprint) + + Authorizes the specified key to issue revocation signatures for this + key. Class octet must have bit 0x80 set. If the bit 0x40 is set, + then this means that the revocation information is sensitive. Other + bits are for future expansion to other kinds of authorizations. This + is found on a self-signature. + + If the "sensitive" flag is set, the keyholder feels this subpacket + contains private trust information that describes a real-world + sensitive relationship. If this flag is set, implementations SHOULD + NOT export this signature to other users except in cases where the + data needs to be available: when the signature is being sent to the + designated revoker, or when it is accompanied by a revocation + signature from that revoker. Note that it may be appropriate to + isolate this subpacket within a separate signature so that it is not + combined with other subpackets that need to be exported. + """ + __typeid__ = 0x0C + + @sdproperty + def keyclass(self): + return self._keyclass + + @keyclass.register(list) + def keyclass_list(self, val): + self._keyclass = val + + @keyclass.register(int) + @keyclass.register(RevocationKeyClass) + def keyclass_int(self, val): + self._keyclass += RevocationKeyClass & val + + @keyclass.register(bytearray) + def keyclass_bytearray(self, val): + self.keyclass = self.bytes_to_int(val) + + @sdproperty + def algorithm(self): + return self._algorithm + + @algorithm.register(int) + @algorithm.register(PubKeyAlgorithm) + def algorithm_int(self, val): + self._algorithm = PubKeyAlgorithm(val) + + @algorithm.register(bytearray) + def algorithm_bytearray(self, val): + self.algorithm = self.bytes_to_int(val) + + @sdproperty + def fingerprint(self): + return self._fingerprint + + @fingerprint.register(str) + @fingerprint.register(six.text_type) + @fingerprint.register(Fingerprint) + def fingerprint_str(self, val): + self._fingerprint = Fingerprint(val) + + @fingerprint.register(bytearray) + def fingerprint_bytearray(self, val): + self.fingerprint = ''.join('{:02x}'.format(c) for c in val).upper() + + def __init__(self): + super(RevocationKey, self).__init__() + self.keyclass = [] + self.algorithm = PubKeyAlgorithm.Invalid + self._fingerprint = "" + + def __bytearray__(self): + _bytes = super(RevocationKey, self).__bytearray__() + _bytes += self.int_to_bytes(sum(self.keyclass)) + _bytes += self.int_to_bytes(self.algorithm.value) + _bytes += self.fingerprint.__bytes__() + return _bytes + + def parse(self, packet): + super(RevocationKey, self).parse(packet) + self.keyclass = packet[:1] + del packet[:1] + self.algorithm = packet[:1] + del packet[:1] + self.fingerprint = packet[:20] + del packet[:20] + + +class Issuer(Signature): + __typeid__ = 0x10 + + @sdproperty + def issuer(self): + return self._issuer + + @issuer.register(bytearray) + def issuer_bytearray(self, val): + self._issuer = binascii.hexlify(val).upper().decode('latin-1') + + def __init__(self): + super(Issuer, self).__init__() + self.issuer = bytearray() + + def __bytearray__(self): + _bytes = super(Issuer, self).__bytearray__() + _bytes += binascii.unhexlify(self._issuer.encode()) + return _bytes + + def parse(self, packet): + super(Issuer, self).parse(packet) + self.issuer = packet[:8] + del packet[:8] + + +class NotationData(Signature): + __typeid__ = 0x14 + + @sdproperty + def flags(self): + return self._flags + + @flags.register(list) + def flags_list(self, val): + self._flags = val + + @flags.register(int) + @flags.register(NotationDataFlags) + def flags_int(self, val): + self.flags += NotationDataFlags & val + + @flags.register(bytearray) + def flags_bytearray(self, val): + self.flags = self.bytes_to_int(val) + + @sdproperty + def name(self): + return self._name + + @name.register(str) + @name.register(six.text_type) + def name_str(self, val): + self._name = val + + @name.register(bytearray) + def name_bytearray(self, val): + self.name = val.decode('latin-1') + + @sdproperty + def value(self): + return self._value + + @value.register(str) + @value.register(six.text_type) + def value_str(self, val): + self._value = val + + @value.register(bytearray) + def value_bytearray(self, val): + if NotationDataFlags.HumanReadable in self.flags: + self.value = val.decode('latin-1') + + else: # pragma: no cover + self._value = val + + def __init__(self): + super(NotationData, self).__init__() + self.flags = [0, 0, 0, 0] + self.name = "" + self.value = "" + + def __bytearray__(self): + _bytes = super(NotationData, self).__bytearray__() + _bytes += self.int_to_bytes(sum(self.flags)) + b'\x00\x00\x00' + _bytes += self.int_to_bytes(len(self.name), 2) + _bytes += self.int_to_bytes(len(self.value), 2) + _bytes += self.name.encode() + _bytes += self.value if isinstance(self.value, bytearray) else self.value.encode() + return bytes(_bytes) + + def parse(self, packet): + super(NotationData, self).parse(packet) + self.flags = packet[:1] + del packet[:4] + nlen = self.bytes_to_int(packet[:2]) + del packet[:2] + vlen = self.bytes_to_int(packet[:2]) + del packet[:2] + self.name = packet[:nlen] + del packet[:nlen] + self.value = packet[:vlen] + del packet[:vlen] + + +class PreferredHashAlgorithms(FlagList): + __typeid__ = 0x15 + __flags__ = HashAlgorithm + + +class PreferredCompressionAlgorithms(FlagList): + __typeid__ = 0x16 + __flags__ = CompressionAlgorithm + + +class KeyServerPreferences(FlagList): + __typeid__ = 0x17 + __flags__ = _KeyServerPreferences + + +class PreferredKeyServer(URI): + __typeid__ = 0x18 + + +class PrimaryUserID(Signature): + __typeid__ = 0x19 + + @sdproperty + def primary(self): + return self._primary + + @primary.register(bool) + def primary_bool(self, val): + self._primary = val + + @primary.register(bytearray) + def primary_byrearray(self, val): + self.primary = bool(self.bytes_to_int(val)) + + def __init__(self): + super(PrimaryUserID, self).__init__() + self.primary = True + + def __bytearray__(self): + _bytes = super(PrimaryUserID, self).__bytearray__() + _bytes += self.int_to_bytes(int(self.primary)) + return _bytes + + def __bool__(self): + return self.primary + + def __nonzero__(self): + return self.__bool__() + + def parse(self, packet): + super(PrimaryUserID, self).parse(packet) + self.primary = packet[:1] + del packet[:1] + + +class Policy(URI): + __typeid__ = 0x1a + + +class KeyFlags(ByteFlag): + __typeid__ = 0x1B + __flags__ = _KeyFlags + + +class SignersUserID(Signature): + __typeid__ = 0x1C + + @sdproperty + def userid(self): + return self._userid + + @userid.register(str) + @userid.register(six.text_type) + def userid_str(self, val): + self._userid = val + + @userid.register(bytearray) + def userid_bytearray(self, val): + self.userid = val.decode('latin-1') + + def __init__(self): + super(SignersUserID, self).__init__() + self.userid = "" + + def __bytearray__(self): + _bytes = super(SignersUserID, self).__bytearray__() + _bytes += self.userid.encode() + return _bytes + + def parse(self, packet): + super(SignersUserID, self).parse(packet) + self.userid = packet[:(self.header.length - 1)] + del packet[:(self.header.length - 1)] + + +class ReasonForRevocation(Signature): + __typeid__ = 0x1D + + @sdproperty + def code(self): + return self._code + + @code.register(int) + @code.register(RevocationReason) + def code_int(self, val): + self._code = RevocationReason(val) + + @code.register(bytearray) + def code_bytearray(self, val): + self.code = self.bytes_to_int(val) + + @sdproperty + def string(self): + return self._string + + @string.register(str) + @string.register(six.text_type) + def string_str(self, val): + self._string = val + + @string.register(bytearray) + def string_bytearray(self, val): + self.string = val.decode('latin-1') + + def __init__(self): + super(ReasonForRevocation, self).__init__() + self.code = 0x00 + self.string = "" + + def __bytearray__(self): + _bytes = super(ReasonForRevocation, self).__bytearray__() + _bytes += self.int_to_bytes(self.code) + _bytes += self.string.encode() + return _bytes + + def parse(self, packet): + super(ReasonForRevocation, self).parse(packet) + self.code = packet[:1] + del packet[:1] + self.string = packet[:(self.header.length - 2)] + del packet[:(self.header.length - 2)] + + +class Features(ByteFlag): + __typeid__ = 0x1E + __flags__ = _Features + + +##TODO: obtain subpacket type 0x1F - Signature Target + + +class EmbeddedSignature(Signature): + __typeid__ = 0x20 + + @sdproperty + def _sig(self): + return self._sigpkt + + @_sig.setter + def _sig(self, val): + esh = EmbeddedSignatureHeader() + esh.version = val.header.version + val.header = esh + val.update_hlen() + self._sigpkt = val + + @property + def sigtype(self): + return self._sig.sigtype + + @property + def pubalg(self): + return self._sig.pubalg + + @property + def halg(self): + return self._sig.halg + + @property + def subpackets(self): + return self._sig.subpackets + + @property + def hash2(self): # pragma: no cover + return self._sig.hash2 + + @property + def signature(self): + return self._sig.signature + + @property + def signer(self): + return self._sig.signer + + def __init__(self): + super(EmbeddedSignature, self).__init__() + from ..packets import SignatureV4 + self._sigpkt = SignatureV4() + self._sigpkt.header = EmbeddedSignatureHeader() + + def __bytearray__(self): + return super(EmbeddedSignature, self).__bytearray__() + self._sigpkt.__bytearray__() + + def parse(self, packet): + super(EmbeddedSignature, self).parse(packet) + self._sig.parse(packet) diff --git a/src/leap/mx/vendor/pgpy/packet/subpackets/types.py b/src/leap/mx/vendor/pgpy/packet/subpackets/types.py new file mode 100644 index 0000000..7cf58d1 --- /dev/null +++ b/src/leap/mx/vendor/pgpy/packet/subpackets/types.py @@ -0,0 +1,136 @@ +""" subpacket.py +""" +import abc + +from ..types import VersionedHeader + +from ...decorators import sdproperty + +from ...types import Dispatchable +from ...types import Header as _Header + +__all__ = ['Header', + 'EmbeddedSignatureHeader', + 'SubPacket', + 'Signature', + 'UserAttribute', + 'Opaque'] + + +class Header(_Header): + @sdproperty + def critical(self): + return self._critical + + @critical.register(bool) + def critical_bool(self, val): + self._critical = val + + @sdproperty + def typeid(self): + return self._typeid + + @typeid.register(int) + def typeid_int(self, val): + self._typeid = val & 0x7f + + @typeid.register(bytes) + @typeid.register(bytearray) + def typeid_bin(self, val): + v = self.bytes_to_int(val) + self.typeid = v + self.critical = bool(v & 0x80) + + def __init__(self): + super(Header, self).__init__() + self.typeid = b'\x00' + self.critical = False + + def parse(self, packet): + self.length = packet + + self.typeid = packet[:1] + del packet[:1] + + def __len__(self): + return self.llen + 1 + + def __bytearray__(self): + _bytes = bytearray(self.encode_length(self.length)) + _bytes += self.int_to_bytes((int(self.critical) << 7) + self.typeid) + return _bytes + + +class EmbeddedSignatureHeader(VersionedHeader): + def __bytearray__(self): + return bytearray([self.version]) + + def parse(self, packet): + self.tag = 2 + super(EmbeddedSignatureHeader, self).parse(packet) + + +class SubPacket(Dispatchable): + __headercls__ = Header + + def __init__(self): + super(SubPacket, self).__init__() + self.header = Header() + + # if self.__typeid__ not in [-1, None]: + if (self.header.typeid == 0x00 and + (not hasattr(self.__typeid__, '__abstractmethod__')) and + (self.__typeid__ not in [-1, None])): + self.header.typeid = self.__typeid__ + + def __bytearray__(self): + return self.header.__bytearray__() + + def __len__(self): + return (self.header.llen + self.header.length) + + def __repr__(self): + return "<{} [0x{:02x}] at 0x{:x}>".format(self.__class__.__name__, self.header.typeid, id(self)) + + def update_hlen(self): + self.header.length = (len(self.__bytearray__()) - len(self.header)) + 1 + + @abc.abstractmethod + def parse(self, packet): # pragma: no cover + if self.header._typeid == 0: + self.header.parse(packet) + + +class Signature(SubPacket): + __typeid__ = -1 + + +class UserAttribute(SubPacket): + __typeid__ = -1 + + +class Opaque(Signature, UserAttribute): + __typeid__ = None + + @sdproperty + def payload(self): + return self._payload + + @payload.register(bytes) + @payload.register(bytearray) + def payload_bin(self, val): + self._payload = bytearray(val) + + def __init__(self): + super(Opaque, self).__init__() + self.payload = b'' + + def __bytearray__(self): + _bytes = super(Opaque, self).__bytearray__() + _bytes += self.payload + return _bytes + + def parse(self, packet): + super(Opaque, self).parse(packet) + self.payload = packet[:(self.header.length - 1)] + del packet[:(self.header.length - 1)] diff --git a/src/leap/mx/vendor/pgpy/packet/subpackets/userattribute.py b/src/leap/mx/vendor/pgpy/packet/subpackets/userattribute.py new file mode 100644 index 0000000..32f7410 --- /dev/null +++ b/src/leap/mx/vendor/pgpy/packet/subpackets/userattribute.py @@ -0,0 +1,109 @@ +""" userattribute.py +""" +import struct + +from .types import UserAttribute + +from ...constants import ImageEncoding + +from ...decorators import sdproperty + +from ...memoryview import memoryview + + +__all__ = ('Image',) + + +class Image(UserAttribute): + """ + 5.12.1. The Image Attribute Subpacket + + The Image Attribute subpacket is used to encode an image, presumably + (but not required to be) that of the key owner. + + The Image Attribute subpacket begins with an image header. The first + two octets of the image header contain the length of the image + header. Note that unlike other multi-octet numerical values in this + document, due to a historical accident this value is encoded as a + little-endian number. The image header length is followed by a + single octet for the image header version. The only currently + defined version of the image header is 1, which is a 16-octet image + header. The first three octets of a version 1 image header are thus + 0x10, 0x00, 0x01. + + The fourth octet of a version 1 image header designates the encoding + format of the image. The only currently defined encoding format is + the value 1 to indicate JPEG. Image format types 100 through 110 are + reserved for private or experimental use. The rest of the version 1 + image header is made up of 12 reserved octets, all of which MUST be + set to 0. + + The rest of the image subpacket contains the image itself. As the + only currently defined image type is JPEG, the image is encoded in + the JPEG File Interchange Format (JFIF), a standard file format for + JPEG images [JFIF]. + + An implementation MAY try to determine the type of an image by + examination of the image data if it is unable to handle a particular + version of the image header or if a specified encoding format value + is not recognized. + """ + __typeid__ = 0x01 + + @sdproperty + def version(self): + return self._version + + @version.register(int) + def version_int(self, val): + self._version = val + + @sdproperty + def iencoding(self): + return self._iencoding + + @iencoding.register(int) + @iencoding.register(ImageEncoding) + def iencoding_int(self, val): + try: + self._iencoding = ImageEncoding(val) + + except ValueError: # pragma: no cover + self._iencoding = val + + @sdproperty + def image(self): + return self._image + + @image.register(bytes) + @image.register(bytearray) + def image_bin(self, val): + self._image = bytearray(val) + + def __init__(self): + super(Image, self).__init__() + self.version = 1 + self.iencoding = 1 + self.image = bytearray() + + def __bytearray__(self): + _bytes = super(Image, self).__bytearray__() + + if self.version == 1: + # v1 image header length is always 16 bytes + # and stored little-endian due to an 'historical accident' + _bytes += struct.pack('<hbbiii', 16, self.version, self.iencoding, 0, 0, 0) + + _bytes += self.image + return _bytes + + def parse(self, packet): + super(Image, self).parse(packet) + + # on Python 2, this will be the wrapper object from memoryview.py + with memoryview(packet) as _head: + _, self.version, self.iencoding, _, _, _ = struct.unpack_from('<hbbiii', _head[:16].tobytes()) + del packet[:16] + + self.image = packet[:(self.header.length - 17)] + del packet[:(self.header.length - 17)] diff --git a/src/leap/mx/vendor/pgpy/packet/types.py b/src/leap/mx/vendor/pgpy/packet/types.py new file mode 100644 index 0000000..e84ba67 --- /dev/null +++ b/src/leap/mx/vendor/pgpy/packet/types.py @@ -0,0 +1,288 @@ +""" types.py +""" +from __future__ import division + +import abc +import copy + +import six + +from ..constants import PacketTag + +from ..decorators import sdproperty + +from ..types import Dispatchable +from ..types import Field +from ..types import Header as _Header + +__all__ = ['Header', + 'VersionedHeader', + 'Packet', + 'VersionedPacket', + 'Opaque', + 'Key', + 'Public', + 'Private', + 'Primary', + 'Sub', + 'MPI', + 'MPIs', ] + + +class Header(_Header): + @sdproperty + def tag(self): + return self._tag + + @tag.register(int) + @tag.register(PacketTag) + def tag_int(self, val): + _tag = (val & 0x3F) if self._lenfmt else ((val & 0x3C) >> 2) + try: + self._tag = PacketTag(_tag) + + except ValueError: # pragma: no cover + self._tag = _tag + + @property + def typeid(self): + return self.tag + + def __init__(self): + super(Header, self).__init__() + self.tag = 0x00 + + def __bytearray__(self): + tag = 0x80 | (self._lenfmt << 6) + tag |= (self.tag) if self._lenfmt else ((self.tag << 2) | {1: 0, 2: 1, 4: 2, 0: 3}[self.llen]) + + _bytes = bytearray(self.int_to_bytes(tag)) + _bytes += self.encode_length(self.length, self._lenfmt, self.llen) + return _bytes + + def __len__(self): + return 1 + self.llen + + def parse(self, packet): + """ + There are two formats for headers + + old style + --------- + + Old style headers can be 1, 2, 3, or 6 octets long and are composed of a Tag and a Length. + If the header length is 1 octet (length_type == 3), then there is no Length field. + + new style + --------- + + New style headers can be 2, 3, or 6 octets long and are also composed of a Tag and a Length. + + + Packet Tag + ---------- + + The packet tag is the first byte, comprising the following fields: + + +-------------+----------+---------------+---+---+---+---+----------+----------+ + | byte | 1 | + +-------------+----------+---------------+---+---+---+---+----------+----------+ + | bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | + +-------------+----------+---------------+---+---+---+---+----------+----------+ + | old-style | always 1 | packet format | packet tag | length type | + | description | | 0 = old-style | | 0 = 1 octet | + | | | 1 = new-style | | 1 = 2 octets | + | | | | | 2 = 5 octets | + | | | | | 3 = no length field | + +-------------+ + +---------------+---------------------+ + | new-style | | | packet tag | + | description | | | | + +-------------+----------+---------------+-------------------------------------+ + + :param packet: raw packet bytes + """ + self._lenfmt = ((packet[0] & 0x40) >> 6) + self.tag = packet[0] + if self._lenfmt == 0: + self.llen = (packet[0] & 0x03) + del packet[0] + + if (self._lenfmt == 0 and self.llen > 0) or self._lenfmt == 1: + self.length = packet + + else: + # indeterminate packet length + self.length = len(packet) + + +class VersionedHeader(Header): + @sdproperty + def version(self): + return self._version + + @version.register(int) + def version_int(self, val): + self._version = val + + def __init__(self): + super(VersionedHeader, self).__init__() + self.version = 0 + + def __bytearray__(self): + _bytes = bytearray(super(VersionedHeader, self).__bytearray__()) + _bytes += bytearray([self.version]) + return _bytes + + def parse(self, packet): # pragma: no cover + if self.tag == 0: + super(VersionedHeader, self).parse(packet) + + if self.version == 0: + self.version = packet[0] + del packet[0] + + +class Packet(Dispatchable): + __typeid__ = -1 + __headercls__ = Header + + def __init__(self): + super(Packet, self).__init__() + self.header = self.__headercls__() + if isinstance(self.__typeid__, six.integer_types): + self.header.tag = self.__typeid__ + + @abc.abstractmethod + def __bytearray__(self): + return self.header.__bytearray__() + + def __len__(self): + return len(self.header) + self.header.length + + def __repr__(self): + return "<{cls:s} [tag 0x{tag:02d}] at 0x{id:x}>".format(cls=self.__class__.__name__, tag=self.header.tag, id=id(self)) + + def update_hlen(self): + self.header.length = len(self.__bytearray__()) - len(self.header) + + @abc.abstractmethod + def parse(self, packet): + if self.header.tag == 0: + self.header.parse(packet) + + +class VersionedPacket(Packet): + __headercls__ = VersionedHeader + + def __init__(self): + super(VersionedPacket, self).__init__() + if isinstance(self.__ver__, six.integer_types): + self.header.version = self.__ver__ + + def __repr__(self): + return "<{cls:s} [tag 0x{tag:02d}][v{ver:d}] at 0x{id:x}>".format(cls=self.__class__.__name__, tag=self.header.tag, + ver=self.header.version, id=id(self)) + + +class Opaque(Packet): + __typeid__ = None + + @sdproperty + def payload(self): + return self._payload + + @payload.register(bytearray) + @payload.register(bytes) + def payload_bin(self, val): + self._payload = val + + def __init__(self): + super(Opaque, self).__init__() + self.payload = b'' + + def __bytearray__(self): + _bytes = super(Opaque, self).__bytearray__() + _bytes += self.payload + return _bytes + + def parse(self, packet): # pragma: no cover + super(Opaque, self).parse(packet) + pend = self.header.length + if hasattr(self.header, 'version'): + pend -= 1 + + self.payload = packet[:pend] + del packet[:pend] + + +# key marker classes for convenience +class Key(object): + pass + + +class Public(Key): + pass + + +class Private(Key): + pass + + +class Primary(Key): + pass + + +class Sub(Key): + pass + + +# This is required for class MPI to work in both Python 2 and 3 +if not six.PY2: + long = int + + +class MPI(long): + def __new__(cls, num): + mpi = num + + if isinstance(num, (bytes, bytearray)): + if isinstance(num, bytes): # pragma: no cover + num = bytearray(num) + + fl = ((MPIs.bytes_to_int(num[:2]) + 7) // 8) + del num[:2] + + mpi = MPIs.bytes_to_int(num[:fl]) + del num[:fl] + + return super(MPI, cls).__new__(cls, mpi) + + def byte_length(self): + return ((self.bit_length() + 7) // 8) + + def to_mpibytes(self): + return MPIs.int_to_bytes(self.bit_length(), 2) + MPIs.int_to_bytes(self, self.byte_length()) + + def __len__(self): + return self.byte_length() + 2 + + +class MPIs(Field): + # this differs from MPI in that it's subclasses hold/parse several MPI fields + # and, in the case of v4 private keys, also a String2Key specifier/information. + __mpis__ = () + + def __len__(self): + return sum(len(i) for i in self) + + def __iter__(self): + """yield all components of an MPI so it can be iterated over""" + for i in self.__mpis__: + yield getattr(self, i) + + def __copy__(self): + pk = self.__class__() + for m in self.__mpis__: + setattr(pk, m, copy.copy(getattr(self, m))) + + return pk diff --git a/src/leap/mx/vendor/pgpy/pgp.py b/src/leap/mx/vendor/pgpy/pgp.py new file mode 100644 index 0000000..82b3a0b --- /dev/null +++ b/src/leap/mx/vendor/pgpy/pgp.py @@ -0,0 +1,2527 @@ +""" pgp.py + +this is where the armorable PGP block objects live +""" +import binascii +import calendar +import collections +import contextlib +import copy +import functools +import itertools +import operator +import os +import re +import warnings +import weakref + +import six + +from datetime import datetime + +from cryptography.hazmat.primitives import hashes + +from .constants import CompressionAlgorithm +from .constants import Features +from .constants import HashAlgorithm +from .constants import ImageEncoding +from .constants import KeyFlags +from .constants import NotationDataFlags +from .constants import PacketTag +from .constants import PubKeyAlgorithm +from .constants import RevocationKeyClass +from .constants import RevocationReason +from .constants import SignatureType +from .constants import SymmetricKeyAlgorithm + +from .decorators import KeyAction + +from .errors import PGPDecryptionError +from .errors import PGPError + +from .packet import Key +from .packet import MDC +from .packet import Packet +from .packet import Primary +from .packet import Private +from .packet import PubKeyV4 +from .packet import PubSubKeyV4 +from .packet import PrivKeyV4 +from .packet import PrivSubKeyV4 +from .packet import Public +from .packet import Sub +from .packet import UserID +from .packet import UserAttribute + +from .packet.packets import CompressedData +from .packet.packets import IntegrityProtectedSKEData +from .packet.packets import IntegrityProtectedSKEDataV1 +from .packet.packets import LiteralData +from .packet.packets import OnePassSignature +from .packet.packets import OnePassSignatureV3 +from .packet.packets import PKESessionKey +from .packet.packets import PKESessionKeyV3 +from .packet.packets import Signature +from .packet.packets import SignatureV4 +from .packet.packets import SKEData +from .packet.packets import Marker +from .packet.packets import SKESessionKey +from .packet.packets import SKESessionKeyV4 + +from .packet.types import Opaque + +from .types import Armorable +from .types import Fingerprint +from .types import ParentRef +from .types import PGPObject +from .types import SignatureVerification +from .types import SorteDeque + +__all__ = ['PGPSignature', + 'PGPUID', + 'PGPMessage', + 'PGPKey', + 'PGPKeyring'] + + +class PGPSignature(Armorable, ParentRef, PGPObject): + @property + def __sig__(self): + return self._signature.signature.__sig__() + + @property + def cipherprefs(self): + """ + A ``list`` of preferred symmetric algorithms specified in this signature, if any. Otherwise, an empty ``list``. + """ + if 'PreferredSymmetricAlgorithms' in self._signature.subpackets: + return next(iter(self._signature.subpackets['h_PreferredSymmetricAlgorithms'])).flags + return [] + + @property + def compprefs(self): + """ + A ``list`` of preferred compression algorithms specified in this signature, if any. Otherwise, an empty ``list``. + """ + if 'PreferredCompressionAlgorithms' in self._signature.subpackets: + return next(iter(self._signature.subpackets['h_PreferredCompressionAlgorithms'])).flags + return [] + + @property + def created(self): + """ + A :py:obj:`~datetime.datetime` of when this signature was created. + """ + return self._signature.subpackets['h_CreationTime'][-1].created + + @property + def embedded(self): + return self.parent is not None + + @property + def expires_at(self): + """ + A :py:obj:`~datetime.datetime` of when this signature expires, if a signature expiration date is specified. + Otherwise, ``None`` + """ + if 'SignatureExpirationTime' in self._signature.subpackets: + expd = next(iter(self._signature.subpackets['SignatureExpirationTime'])).expires + return self.created + expd + return None + + @property + def exportable(self): + """ + ``False`` if this signature is marked as being not exportable. Otherwise, ``True``. + """ + if 'ExportableCertification' in self._signature.subpackets: + return bool(next(iter(self._signature.subpackets['ExportableCertification']))) + + return True + + @property + def features(self): + """ + A ``set`` of implementation features specified in this signature, if any. Otherwise, an empty ``set``. + """ + if 'Features' in self._signature.subpackets: + return next(iter(self._signature.subpackets['Features'])).flags + return set() + + @property + def hash2(self): + return self._signature.hash2 + + @property + def hashprefs(self): + """ + A ``list`` of preferred hash algorithms specified in this signature, if any. Otherwise, an empty ``list``. + """ + if 'PreferredHashAlgorithms' in self._signature.subpackets: + return next(iter(self._signature.subpackets['h_PreferredHashAlgorithms'])).flags + return [] + + @property + def hash_algorithm(self): + """ + The :py:obj:`~constants.HashAlgorithm` used when computing this signature. + """ + return self._signature.halg + + @property + def is_expired(self): + """ + ``True`` if the signature has an expiration date, and is expired. Otherwise, ``False`` + """ + expires_at = self.expires_at + if expires_at is not None and expires_at != self.created: + return expires_at < datetime.utcnow() + + return False + + @property + def key_algorithm(self): + """ + The :py:obj:`~constants.PubKeyAlgorithm` of the key that generated this signature. + """ + return self._signature.pubalg + + @property + def key_expiration(self): + if 'KeyExpirationTime' in self._signature.subpackets: + return next(iter(self._signature.subpackets['KeyExpirationTime'])).expires + return None + + @property + def key_flags(self): + """ + A ``set`` of :py:obj:`~constants.KeyFlags` specified in this signature, if any. Otherwise, an empty ``set``. + """ + if 'KeyFlags' in self._signature.subpackets: + return next(iter(self._signature.subpackets['h_KeyFlags'])).flags + return set() + + @property + def keyserver(self): + """ + The preferred key server specified in this signature, if any. Otherwise, an empty ``str``. + """ + if 'PreferredKeyServer' in self._signature.subpackets: + return next(iter(self._signature.subpackets['h_PreferredKeyServer'])).uri + return '' + + @property + def keyserverprefs(self): + """ + A ``list`` of :py:obj:`~constants.KeyServerPreferences` in this signature, if any. Otherwise, an empty ``list``. + """ + if 'KeyServerPreferences' in self._signature.subpackets: + return next(iter(self._signature.subpackets['h_KeyServerPreferences'])).flags + return [] + + @property + def magic(self): + return "SIGNATURE" + + @property + def notation(self): + """ + A ``dict`` of notation data in this signature, if any. Otherwise, an empty ``dict``. + """ + return dict((nd.name, nd.value) for nd in self._signature.subpackets['NotationData']) + + @property + def policy_uri(self): + """ + The policy URI specified in this signature, if any. Otherwise, an empty ``str``. + """ + if 'Policy' in self._signature.subpackets: + return next(iter(self._signature.subpackets['Policy'])).uri + return '' + + @property + def revocable(self): + """ + ``False`` if this signature is marked as being not revocable. Otherwise, ``True``. + """ + if 'Revocable' in self._signature.subpackets: + return bool(next(iter(self._signature.subpackets['Revocable']))) + return True + + @property + def revocation_key(self): + if 'RevocationKey' in self._signature.subpackets: + raise NotImplementedError() + return None + + @property + def signer(self): + """ + The 16-character Key ID of the key that generated this signature. + """ + return self._signature.signer + + @property + def target_signature(self): + return NotImplemented + + @property + def type(self): + """ + The :py:obj:`~constants.SignatureType` of this signature. + """ + return self._signature.sigtype + + @classmethod + def new(cls, sigtype, pkalg, halg, signer): + sig = PGPSignature() + + sigpkt = SignatureV4() + sigpkt.header.tag = 2 + sigpkt.header.version = 4 + sigpkt.subpackets.addnew('CreationTime', hashed=True, created=datetime.utcnow()) + sigpkt.subpackets.addnew('Issuer', _issuer=signer) + + sigpkt.sigtype = sigtype + sigpkt.pubalg = pkalg + + if halg is not None: + sigpkt.halg = halg + + sig._signature = sigpkt + return sig + + def __init__(self): + """ + PGPSignature objects represent OpenPGP compliant signatures. + + PGPSignature implements the ``__str__`` method, the output of which will be the signature object in + OpenPGP-compliant ASCII-armored format. + + PGPSignature implements the ``__bytes__`` method, the output of which will be the signature object in + OpenPGP-compliant binary format. + """ + super(PGPSignature, self).__init__() + self._signature = None + + def __bytearray__(self): + return self._signature.__bytearray__() + + def __repr__(self): + return "<PGPSignature [{:s}] object at 0x{:02x}>".format(self.type.name, id(self)) + + def __lt__(self, other): + return self.created < other.created + + def __or__(self, other): + if isinstance(other, Signature): + if self._signature is None: + self._signature = other + return self + + ##TODO: this is not a great way to do this + if other.__class__.__name__ == 'EmbeddedSignature': + self._signature = other + return self + + raise TypeError + + def __copy__(self): + # because the default shallow copy isn't actually all that useful, + # and deepcopy does too much work + sig = super(PGPSignature, self).__copy__() + # sig = PGPSignature() + # sig.ascii_headers = self.ascii_headers.copy() + sig |= copy.copy(self._signature) + return sig + + def hashdata(self, subject): + _data = bytearray() + + if isinstance(subject, six.string_types): + subject = subject.encode('latin-1') + + """ + All signatures are formed by producing a hash over the signature + data, and then using the resulting hash in the signature algorithm. + """ + + if self.type == SignatureType.BinaryDocument: + """ + For binary document signatures (type 0x00), the document data is + hashed directly. + """ + _data += bytearray(subject) + + if self.type == SignatureType.CanonicalDocument: + """ + For text document signatures (type 0x01), the + document is canonicalized by converting line endings to <CR><LF>, + and the resulting data is hashed. + """ + _data += re.subn(br'\r?\n', b'\r\n', subject)[0] + + if self.type in {SignatureType.Generic_Cert, SignatureType.Persona_Cert, SignatureType.Casual_Cert, + SignatureType.Positive_Cert, SignatureType.CertRevocation, SignatureType.Subkey_Binding, + SignatureType.PrimaryKey_Binding}: + """ + When a signature is made over a key, the hash data starts with the + octet 0x99, followed by a two-octet length of the key, and then body + of the key packet. (Note that this is an old-style packet header for + a key packet with two-octet length.) ... + Key revocation signatures (types 0x20 and 0x28) + hash only the key being revoked. + """ + _s = b'' + if isinstance(subject, PGPUID): + _s = subject._parent.hashdata + + elif isinstance(subject, PGPKey) and not subject.is_primary: + _s = subject._parent.hashdata + + elif isinstance(subject, PGPKey) and subject.is_primary: + _s = subject.hashdata + + if len(_s) > 0: + _data += b'\x99' + self.int_to_bytes(len(_s), 2) + _s + + if self.type in {SignatureType.Subkey_Binding, SignatureType.PrimaryKey_Binding}: + """ + A subkey binding signature + (type 0x18) or primary key binding signature (type 0x19) then hashes + the subkey using the same format as the main key (also using 0x99 as + the first octet). + """ + if subject.is_primary: + _s = subject.subkeys[self.signer].hashdata + + else: + _s = subject.hashdata + + _data += b'\x99' + self.int_to_bytes(len(_s), 2) + _s + + if self.type in {SignatureType.KeyRevocation, SignatureType.SubkeyRevocation, SignatureType.DirectlyOnKey}: + """ + The signature is calculated directly on the key being revoked. A + revoked key is not to be used. Only revocation signatures by the + key being revoked, or by an authorized revocation key, should be + considered valid revocation signatures. + + Subkey revocation signature + The signature is calculated directly on the subkey being revoked. + A revoked subkey is not to be used. Only revocation signatures + by the top-level signature key that is bound to this subkey, or + by an authorized revocation key, should be considered valid + revocation signatures. + + Signature directly on a key + This signature is calculated directly on a key. It binds the + information in the Signature subpackets to the key, and is + appropriate to be used for subpackets that provide information + about the key, such as the Revocation Key subpacket. It is also + appropriate for statements that non-self certifiers want to make + about the key itself, rather than the binding between a key and a + name. + """ + _s = subject.hashdata + _data += b'\x99' + self.int_to_bytes(len(_s), 2) + _s + + if self.type in {SignatureType.Generic_Cert, SignatureType.Persona_Cert, SignatureType.Casual_Cert, + SignatureType.Positive_Cert, SignatureType.CertRevocation}: + """ + A certification signature (type 0x10 through 0x13) hashes the User + ID being bound to the key into the hash context after the above + data. ... A V4 certification + hashes the constant 0xB4 for User ID certifications or the constant + 0xD1 for User Attribute certifications, followed by a four-octet + number giving the length of the User ID or User Attribute data, and + then the User ID or User Attribute data. + + ... + + The [certificate revocation] signature + is computed over the same data as the certificate that it + revokes, and should have a later creation date than that + certificate. + """ + + _s = subject.hashdata + if subject.is_uid: + _data += b'\xb4' + + else: + _data += b'\xd1' + + _data += self.int_to_bytes(len(_s), 4) + _s + + # if this is a new signature, do update_hlen + if 0 in list(self._signature.signature): + self._signature.update_hlen() + + """ + Once the data body is hashed, then a trailer is hashed. (...) + A V4 signature hashes the packet body + starting from its first field, the version number, through the end + of the hashed subpacket data. Thus, the fields hashed are the + signature version, the signature type, the public-key algorithm, the + hash algorithm, the hashed subpacket length, and the hashed + subpacket body. + + V4 signatures also hash in a final trailer of six octets: the + version of the Signature packet, i.e., 0x04; 0xFF; and a four-octet, + big-endian number that is the length of the hashed data from the + Signature packet (note that this number does not include these final + six octets). + """ + + hcontext = bytearray() + hcontext.append(self._signature.header.version if not self.embedded else self._signature._sig.header.version) + hcontext.append(self.type) + hcontext.append(self.key_algorithm) + hcontext.append(self.hash_algorithm) + hcontext += self._signature.subpackets.__hashbytearray__() + hlen = len(hcontext) + _data += hcontext + _data += b'\x04\xff' + _data += self.int_to_bytes(hlen, 4) + return bytes(_data) + + def make_onepass(self): + onepass = OnePassSignatureV3() + onepass.sigtype = self.type + onepass.halg = self.hash_algorithm + onepass.pubalg = self.key_algorithm + onepass.signer = self.signer + onepass.update_hlen() + return onepass + + def parse(self, packet): + unarmored = self.ascii_unarmor(packet) + data = unarmored['body'] + + if unarmored['magic'] is not None and unarmored['magic'] != 'SIGNATURE': + raise ValueError('Expected: SIGNATURE. Got: {}'.format(str(unarmored['magic']))) + + if unarmored['headers'] is not None: + self.ascii_headers = unarmored['headers'] + + # load *one* packet from data + pkt = Packet(data) + if pkt.header.tag == PacketTag.Signature and not isinstance(pkt, Opaque): + self._signature = pkt + + else: + raise ValueError('Expected: Signature. Got: {:s}'.format(pkt.__class__.__name__)) + + +class PGPUID(ParentRef): + @property + def __sig__(self): + return list(self._signatures) + + @property + def name(self): + """If this is a User ID, the stored name. If this is not a User ID, this will be an empty string.""" + return self._uid.name if isinstance(self._uid, UserID) else "" + + @property + def comment(self): + """ + If this is a User ID, this will be the stored comment. If this is not a User ID, or there is no stored comment, + this will be an empty string., + """ + + return self._uid.comment if isinstance(self._uid, UserID) else "" + + @property + def email(self): + """ + If this is a User ID, this will be the stored email address. If this is not a User ID, or there is no stored + email address, this will be an empty string. + """ + return self._uid.email if isinstance(self._uid, UserID) else "" + + @property + def image(self): + """ + If this is a User Attribute, this will be the stored image. If this is not a User Attribute, this will be ``None``. + """ + return self._uid.image.image if isinstance(self._uid, UserAttribute) else None + + @property + def is_primary(self): + """ + If the most recent, valid self-signature specifies this as being primary, this will be True. Otherwise, Faqlse. + """ + return bool(next(iter(self.selfsig._signature.subpackets['h_PrimaryUserID']), False)) + + @property + def is_uid(self): + """ + ``True`` if this is a User ID, otherwise False. + """ + return isinstance(self._uid, UserID) + + @property + def is_ua(self): + """ + ``True`` if this is a User Attribute, otherwise False. + """ + return isinstance(self._uid, UserAttribute) + + @property + def selfsig(self): + """ + This will be the most recent, self-signature of this User ID or Attribute. If there isn't one, this will be ``None``. + """ + if self.parent is not None: + return next((sig for sig in reversed(self._signatures) if sig.signer == self.parent.fingerprint.keyid), None) + + @property + def signers(self): + """ + This will be a set of all of the key ids which have signed this User ID or Attribute. + """ + return set(s.signer for s in self.__sig__) + + @property + def hashdata(self): + if self.is_uid: + return self._uid.__bytearray__()[len(self._uid.header):] + + if self.is_ua: + return self._uid.subpackets.__bytearray__() + + @classmethod + def new(cls, pn, comment="", email=""): + """ + Create a new User ID or photo. + + :param pn: User ID name, or photo. If this is a ``bytearray``, it will be loaded as a photo. + Otherwise, it will be used as the name field for a User ID. + :type pn: ``bytearray``, ``str``, ``unicode`` + :param comment: The comment field for a User ID. Ignored if this is a photo. + :type comment: ``str``, ``unicode`` + :param email: The email address field for a User ID. Ignored if this is a photo. + :type email: ``str``, ``unicode`` + :returns: :py:obj:`PGPUID` + """ + uid = PGPUID() + if isinstance(pn, bytearray): + uid._uid = UserAttribute() + uid._uid.image.image = pn + uid._uid.image.iencoding = ImageEncoding.encodingof(pn) + uid._uid.update_hlen() + + else: + uid._uid = UserID() + uid._uid.name = pn + uid._uid.comment = comment + uid._uid.email = email + uid._uid.update_hlen() + + return uid + + def __init__(self): + """ + PGPUID objects represent User IDs and User Attributes for keys. + + PGPUID implements the ``__format__`` method for User IDs, returning a string in the format + 'name (comment) <email>', leaving out any comment or email fields that are not present. + """ + super(PGPUID, self).__init__() + self._uid = None + self._signatures = SorteDeque() + + def __repr__(self): + if self.selfsig is not None: + return "<PGPUID [{:s}][{}] at 0x{:02X}>".format(self._uid.__class__.__name__, self.selfsig.created, id(self)) + return "<PGPUID [{:s}] at 0x{:02X}>".format(self._uid.__class__.__name__, id(self)) + + def __lt__(self, other): # pragma: no cover + if self.is_uid == other.is_uid: + if self.is_primary == other.is_primary: + return self.selfsig > other.selfsig + + if self.is_primary: + return True + + return False + + if self.is_uid and other.is_ua: + return True + + if self.is_ua and other.is_uid: + return False + + def __or__(self, other): + if isinstance(other, PGPSignature): + self._signatures.insort(other) + if self.parent is not None and self in self.parent._uids: + self.parent._uids.resort(self) + + return self + + if isinstance(other, UserID) and self._uid is None: + self._uid = other + return self + + if isinstance(other, UserAttribute) and self._uid is None: + self._uid = other + return self + + raise TypeError("unsupported operand type(s) for |: '{:s}' and '{:s}'" + "".format(self.__class__.__name__, other.__class__.__name__)) + + def __copy__(self): + # because the default shallow copy isn't actually all that useful, + # and deepcopy does too much work + uid = PGPUID() + uid |= copy.copy(self._uid) + for sig in self._signatures: + uid |= copy.copy(sig) + return uid + + def __format__(self, format_spec): + if self.is_uid: + comment = six.u("") if self.comment == "" else six.u(" ({:s})").format(self.comment) + email = six.u("") if self.email == "" else six.u(" <{:s}>").format(self.email) + return six.u("{:s}{:s}{:s}").format(self.name, comment, email) + + raise NotImplementedError + + +class PGPMessage(Armorable, PGPObject): + @staticmethod + def dash_unescape(text): + return re.subn(r'^- -', '-', text, flags=re.MULTILINE)[0] + + @staticmethod + def dash_escape(text): + return re.subn(r'^-', '- -', text, flags=re.MULTILINE)[0] + + @property + def encrypters(self): + """A ``set`` containing all key ids (if any) to which this message was encrypted.""" + return set(m.encrypter for m in self._sessionkeys if isinstance(m, PKESessionKey)) + + @property + def filename(self): + """If applicable, returns the original filename of the message. Otherwise, returns an empty string.""" + if self.type == 'literal': + return self._message.filename + return '' + + @property + def is_compressed(self): + """``True`` if this message will be compressed when exported""" + return self._compression != CompressionAlgorithm.Uncompressed + + @property + def is_encrypted(self): + """``True`` if this message is encrypted; otherwise, ``False``""" + return isinstance(self._message, (SKEData, IntegrityProtectedSKEData)) + + @property + def is_sensitive(self): + """``True`` if this message is marked sensitive; otherwise ``False``""" + return self.type == 'literal' and self._message.filename == '_CONSOLE' + + @property + def is_signed(self): + """ + ``True`` if this message is signed; otherwise, ``False``. + Should always be ``False`` if the message is encrypted. + """ + return len(self._signatures) > 0 + + @property + def issuers(self): + """A ``set`` containing all key ids (if any) which have signed or encrypted this message.""" + return self.encrypters | self.signers + + @property + def magic(self): + if self.type == 'cleartext': + return "SIGNATURE" + return "MESSAGE" + + @property + def message(self): + """The message contents""" + if self.type == 'cleartext': + return self.bytes_to_text(self._message) + + if self.type == 'literal': + return self._message.contents + + if self.type == 'encrypted': + return self._message + + @property + def signatures(self): + """A ``set`` containing all key ids (if any) which have signed this message.""" + return list(self._signatures) + + @property + def signers(self): + """A ``set`` containing all key ids (if any) which have signed this message.""" + return set(m.signer for m in self._signatures) + + @property + def type(self): + ##TODO: it might be better to use an Enum for the output of this + if isinstance(self._message, (six.string_types, six.binary_type, bytearray)): + return 'cleartext' + + if isinstance(self._message, LiteralData): + return 'literal' + + if isinstance(self._message, (SKEData, IntegrityProtectedSKEData)): + return 'encrypted' + + raise NotImplementedError + + def __init__(self): + """ + PGPMessage objects represent OpenPGP message compositions. + + PGPMessage implements the `__str__` method, the output of which will be the message composition in + OpenPGP-compliant ASCII-armored format. + + PGPMessage implements the `__bytes__` method, the output of which will be the message composition in + OpenPGP-compliant binary format. + + Any signatures within the PGPMessage that are marked as being non-exportable will not be included in the output + of either of those methods. + """ + super(PGPMessage, self).__init__() + self._compression = CompressionAlgorithm.Uncompressed + self._message = None + self._mdc = None + self._signatures = SorteDeque() + self._sessionkeys = [] + + def __bytearray__(self): + if self.is_compressed: + comp = CompressedData() + comp.calg = self._compression + comp.packets = [pkt for pkt in self] + comp.update_hlen() + return comp.__bytearray__() + + _bytes = bytearray() + for pkt in self: + _bytes += pkt.__bytearray__() + return _bytes + + def __str__(self): + if self.type == 'cleartext': + tmpl = u"-----BEGIN PGP SIGNED MESSAGE-----\n" \ + u"{hhdr:s}\n" \ + u"{cleartext:s}\n" \ + u"{signature:s}" + + # only add a Hash: header if we actually have at least one signature + hashes = set(s.hash_algorithm.name for s in self.signatures) + hhdr = 'Hash: {hashes:s}\n'.format(hashes=','.join(sorted(hashes))) if hashes else '' + + return tmpl.format(hhdr=hhdr, + cleartext=self.dash_escape(self.bytes_to_text(self._message)), + signature=super(PGPMessage, self).__str__()) + + return super(PGPMessage, self).__str__() + + def __iter__(self): + if self.type == 'cleartext': + for sig in self._signatures: + yield sig + + elif self.is_encrypted: + for pkt in self._sessionkeys: + yield pkt + yield self.message + + else: + ##TODO: is it worth coming up with a way of disabling one-pass signing? + for sig in self._signatures: + ops = sig.make_onepass() + if sig is not self._signatures[-1]: + ops.nested = True + yield ops + + yield self._message + if self._mdc is not None: # pragma: no cover + yield self._mdc + + for sig in self._signatures: + yield sig + + def __or__(self, other): + if isinstance(other, Marker): + return self + + if isinstance(other, CompressedData): + self._compression = other.calg + for pkt in other.packets: + self |= pkt + return self + + if isinstance(other, (six.string_types, six.binary_type, bytearray)): + if self._message is None: + self._message = self.text_to_bytes(other) + return self + + if isinstance(other, (LiteralData, SKEData, IntegrityProtectedSKEData)): + if self._message is None: + self._message = other + return self + + if isinstance(other, MDC): + if self._mdc is None: + self._mdc = other + return self + + if isinstance(other, OnePassSignature): + # these are "generated" on the fly during composition + return self + + if isinstance(other, Signature): + other = PGPSignature() | other + + if isinstance(other, PGPSignature): + self._signatures.insort(other) + return self + + if isinstance(other, (PKESessionKey, SKESessionKey)): + self._sessionkeys.append(other) + return self + + if isinstance(other, PGPMessage): + self._message = other._message + self._mdc = other._mdc + self._compression = other._compression + self._sessionkeys += other._sessionkeys + self._signatures += other._signatures + return self + + raise NotImplementedError(str(type(other))) + + def __copy__(self): + msg = super(PGPMessage, self).__copy__() + msg._compression = self._compression + msg._message = copy.copy(self._message) + msg._mdc = copy.copy(self._mdc) + + for sig in self._signatures: + msg |= copy.copy(sig) + + for sk in self._sessionkeys: + msg |= copy.copy(sk) + + return msg + + @classmethod + def new(cls, message, **kwargs): + """ + Create a new PGPMessage object. + + :param message: The message to be stored. + :type message: ``str``, ``unicode``, ``bytes``, ``bytearray`` + :returns: :py:obj:`PGPMessage` + + The following optional keyword arguments can be used with :py:meth:`PGPMessage.new`: + + :keyword file: if True, ``message`` should be a path to a file. The contents of that file will be read and used + as the contents of the message. + :type file: ``bool`` + :keyword cleartext: if True, the message will be cleartext with inline signatures. + :type cleartext: ``bool`` + :keyword sensitive: if True, the filename will be set to '_CONSOLE' to signal other OpenPGP clients to treat + this message as being 'for your eyes only'. Ignored if cleartext is True. + :type sensitive: ``bool`` + :keyword format: Set the message format identifier. Ignored if cleartext is True. + :type format: ``str`` + :keyword compression: Set the compression algorithm for the new message. + Defaults to :py:obj:`CompressionAlgorithm.ZIP`. Ignored if cleartext is True. + :keyword encoding: Set the Charset header for the message. + :type encoding: ``str`` representing a valid codec in codecs + """ + # TODO: have 'codecs' above (in :type encoding:) link to python documentation page on codecs + cleartext = kwargs.pop('cleartext', False) + format = kwargs.pop('format', None) + sensitive = kwargs.pop('sensitive', False) + compression = kwargs.pop('compression', CompressionAlgorithm.ZIP) + file = kwargs.pop('file', False) + charset = kwargs.pop('encoding', None) + + filename = '' + mtime = datetime.utcnow() + + msg = PGPMessage() + + if charset: + msg.charset = charset + + # if format in 'tu' and isinstance(message, (six.binary_type, bytearray)): + # # if message format is text or unicode and we got binary data, we'll need to transcode it to UTF-8 + # message = + + if file and os.path.isfile(message): + filename = message + message = bytearray(os.path.getsize(filename)) + mtime = datetime.utcfromtimestamp(os.path.getmtime(filename)) + + with open(filename, 'rb') as mf: + mf.readinto(message) + + # if format is None, we can try to detect it + if format is None: + if isinstance(message, six.text_type): + # message is definitely UTF-8 already + format = 'u' + + elif cls.is_ascii(message): + # message is probably text + format = 't' + + else: + # message is probably binary + format = 'b' + + # if message is a binary type and we're building a textual message, we need to transcode the bytes to UTF-8 + if isinstance(message, (six.binary_type, bytearray)) and (cleartext or format in 'tu'): + message = message.decode(charset or 'utf-8') + + if cleartext: + msg |= message + + else: + # load literal data + lit = LiteralData() + lit._contents = bytearray(msg.text_to_bytes(message)) + lit.filename = '_CONSOLE' if sensitive else os.path.basename(filename) + lit.mtime = mtime + lit.format = format + + # if cls.is_ascii(message): + # lit.format = 't' + + lit.update_hlen() + + msg |= lit + msg._compression = compression + + return msg + + def encrypt(self, passphrase, sessionkey=None, **prefs): + """ + Encrypt the contents of this message using a passphrase. + :param passphrase: The passphrase to use for encrypting this message. + :type passphrase: ``str``, ``unicode``, ``bytes`` + + :optional param sessionkey: Provide a session key to use when encrypting something. Default is ``None``. + If ``None``, a session key of the appropriate length will be generated randomly. + + .. warning:: + + Care should be taken when making use of this option! Session keys *absolutely need* + to be unpredictable! Use the ``gen_key()`` method on the desired + :py:obj:`~constants.SymmetricKeyAlgorithm` to generate the session key! + + :type sessionkey: ``bytes``, ``str`` + :raises: :py:exc:`~errors.PGPEncryptionError` + :returns: A new :py:obj:`PGPMessage` containing the encrypted contents of this message. + """ + cipher_algo = prefs.pop('cipher', SymmetricKeyAlgorithm.AES256) + hash_algo = prefs.pop('hash', HashAlgorithm.SHA256) + + # set up a new SKESessionKeyV4 + skesk = SKESessionKeyV4() + skesk.s2k.usage = 255 + skesk.s2k.specifier = 3 + skesk.s2k.halg = hash_algo + skesk.s2k.encalg = cipher_algo + skesk.s2k.count = skesk.s2k.halg.tuned_count + + if sessionkey is None: + sessionkey = cipher_algo.gen_key() + skesk.encrypt_sk(passphrase, sessionkey) + del passphrase + + msg = PGPMessage() | skesk + + if not self.is_encrypted: + skedata = IntegrityProtectedSKEDataV1() + skedata.encrypt(sessionkey, cipher_algo, self.__bytes__()) + msg |= skedata + + else: + msg |= self + + return msg + + def decrypt(self, passphrase): + """ + Attempt to decrypt this message using a passphrase. + + :param passphrase: The passphrase to use to attempt to decrypt this message. + :type passphrase: ``str``, ``unicode``, ``bytes`` + :raises: :py:exc:`~errors.PGPDecryptionError` if decryption failed for any reason. + :returns: A new :py:obj:`PGPMessage` containing the decrypted contents of this message + """ + if not self.is_encrypted: + raise PGPError("This message is not encrypted!") + + for skesk in iter(sk for sk in self._sessionkeys if isinstance(sk, SKESessionKey)): + try: + symalg, key = skesk.decrypt_sk(passphrase) + decmsg = PGPMessage() + decmsg.parse(self.message.decrypt(key, symalg)) + + except (TypeError, ValueError, NotImplementedError, PGPDecryptionError): + continue + + else: + del passphrase + break + + else: + raise PGPDecryptionError("Decryption failed") + + return decmsg + + def parse(self, packet): + unarmored = self.ascii_unarmor(packet) + data = unarmored['body'] + + if unarmored['magic'] is not None and unarmored['magic'] not in ['MESSAGE', 'SIGNATURE']: + raise ValueError('Expected: MESSAGE. Got: {}'.format(str(unarmored['magic']))) + + if unarmored['headers'] is not None: + self.ascii_headers = unarmored['headers'] + + # cleartext signature + if unarmored['magic'] == 'SIGNATURE': + # the composition for this will be the 'cleartext' as a str, + # followed by one or more signatures (each one loaded into a PGPSignature) + self |= self.dash_unescape(unarmored['cleartext']) + while len(data) > 0: + pkt = Packet(data) + if not isinstance(pkt, Signature): # pragma: no cover + warnings.warn("Discarded unexpected packet: {:s}".format(pkt.__class__.__name__), stacklevel=2) + continue + self |= PGPSignature() | pkt + + else: + while len(data) > 0: + self |= Packet(data) + + +class PGPKey(Armorable, ParentRef, PGPObject): + """ + 11.1. Transferable Public Keys + + OpenPGP users may transfer public keys. The essential elements of a + transferable public key are as follows: + + - One Public-Key packet + + - Zero or more revocation signatures + - One or more User ID packets + + - After each User ID packet, zero or more Signature packets + (certifications) + + - Zero or more User Attribute packets + + - After each User Attribute packet, zero or more Signature packets + (certifications) + + - Zero or more Subkey packets + + - After each Subkey packet, one Signature packet, plus optionally a + revocation + + The Public-Key packet occurs first. Each of the following User ID + packets provides the identity of the owner of this public key. If + there are multiple User ID packets, this corresponds to multiple + means of identifying the same unique individual user; for example, a + user may have more than one email address, and construct a User ID + for each one. + + Immediately following each User ID packet, there are zero or more + Signature packets. Each Signature packet is calculated on the + immediately preceding User ID packet and the initial Public-Key + packet. The signature serves to certify the corresponding public key + and User ID. In effect, the signer is testifying to his or her + belief that this public key belongs to the user identified by this + User ID. + + Within the same section as the User ID packets, there are zero or + more User Attribute packets. Like the User ID packets, a User + Attribute packet is followed by zero or more Signature packets + calculated on the immediately preceding User Attribute packet and the + initial Public-Key packet. + + User Attribute packets and User ID packets may be freely intermixed + in this section, so long as the signatures that follow them are + maintained on the proper User Attribute or User ID packet. + + After the User ID packet or Attribute packet, there may be zero or + more Subkey packets. In general, subkeys are provided in cases where + the top-level public key is a signature-only key. However, any V4 + key may have subkeys, and the subkeys may be encryption-only keys, + signature-only keys, or general-purpose keys. V3 keys MUST NOT have + subkeys. + + Each Subkey packet MUST be followed by one Signature packet, which + should be a subkey binding signature issued by the top-level key. + For subkeys that can issue signatures, the subkey binding signature + MUST contain an Embedded Signature subpacket with a primary key + binding signature (0x19) issued by the subkey on the top-level key. + + Subkey and Key packets may each be followed by a revocation Signature + packet to indicate that the key is revoked. Revocation signatures + are only accepted if they are issued by the key itself, or by a key + that is authorized to issue revocations via a Revocation Key + subpacket in a self-signature by the top-level key. + + Transferable public-key packet sequences may be concatenated to allow + transferring multiple public keys in one operation. + + 11.2. Transferable Secret Keys + + OpenPGP users may transfer secret keys. The format of a transferable + secret key is the same as a transferable public key except that + secret-key and secret-subkey packets are used instead of the public + key and public-subkey packets. Implementations SHOULD include self- + signatures on any user IDs and subkeys, as this allows for a complete + public key to be automatically extracted from the transferable secret + key. Implementations MAY choose to omit the self-signatures, + especially if a transferable public key accompanies the transferable + secret key. + """ + @property + def __key__(self): + return self._key.keymaterial + + @property + def __sig__(self): + return list(self._signatures) + + @property + def created(self): + """A :py:obj:`~datetime.datetime` object of the creation date and time of the key, in UTC.""" + return self._key.created + + @property + def expires_at(self): + """A :py:obj:`~datetime.datetime` object of when this key is to be considered expired, if any. Otherwise, ``None``""" + try: + expires = min(sig.key_expiration for sig in itertools.chain(iter(uid.selfsig for uid in self.userids), self.self_signatures) + if sig.key_expiration is not None) + + except ValueError: + return None + + else: + return (self.created + expires) + + @property + def fingerprint(self): + """The fingerprint of this key, as a :py:obj:`~pgpy.types.Fingerprint` object.""" + if self._key: + return self._key.fingerprint + + @property + def hashdata(self): + # when signing a key, only the public portion of the keys is hashed + # if this is a private key, the private components of the key material need to be left out + if self.is_public: + return self._key.__bytearray__()[len(self._key.header):] + + pub = self._key.pubkey() + return pub.__bytearray__()[len(pub.header):] + + @property + def is_expired(self): + """``True`` if this key is expired, otherwise ``False``""" + expires = self.expires_at + if expires is not None: + return expires <= datetime.utcnow() + + return False + + @property + def is_primary(self): + """``True`` if this is a primary key; ``False`` if this is a subkey""" + return isinstance(self._key, Primary) and not isinstance(self._key, Sub) + + @property + def is_protected(self): + """``True`` if this is a private key that is protected with a passphrase, otherwise ``False``""" + if self.is_public: + return False + + return self._key.protected + + @property + def is_public(self): + """``True`` if this is a public key, otherwise ``False``""" + return isinstance(self._key, Public) and not isinstance(self._key, Private) + + @property + def is_unlocked(self): + """``False`` if this is a private key that is protected with a passphrase and has not yet been unlocked, otherwise ``True``""" + if self.is_public: + return True + + if not self.is_protected: + return True + + return self._key.unlocked + + @property + def key_algorithm(self): + """The :py:obj:`constants.PubKeyAlgorithm` pertaining to this key""" + return self._key.pkalg + + @property + def key_size(self): + """*new in 0.4.1* + The size pertaining to this key. ``int`` for non-EC key algorithms; :py:obj:`constants.EllipticCurveOID` for EC keys. + """ + if self.key_algorithm in {PubKeyAlgorithm.ECDSA, PubKeyAlgorithm.ECDH}: + return self._key.keymaterial.oid + return next(iter(self._key.keymaterial)).bit_length() + + @property + def magic(self): + return '{:s} KEY BLOCK'.format('PUBLIC' if (isinstance(self._key, Public) and not isinstance(self._key, Private)) else + 'PRIVATE' if isinstance(self._key, Private) else '') + + @property + def pubkey(self): + """If the :py:obj:`PGPKey` object is a private key, this method returns a corresponding public key object with + all the trimmings. Otherwise, returns ``None`` + """ + if not self.is_public: + if self._sibling is None or isinstance(self._sibling, weakref.ref): + # create a new key shell + pub = PGPKey() + pub.ascii_headers = self.ascii_headers.copy() + + # get the public half of the primary key + pub._key = self._key.pubkey() + + # get the public half of each subkey + for skid, subkey in self.subkeys.items(): + pub.subkeys[skid] = subkey.pubkey + + # copy user ids and user attributes + for uid in self._uids: + pub |= copy.copy(uid) + + # copy signatures that weren't copied with uids + for sig in self._signatures: + if sig.parent is None: + pub |= copy.copy(sig) + + # keep connect the two halves using a weak reference + self._sibling = weakref.ref(pub) + pub._sibling = weakref.ref(self) + + return self._sibling() + return None + + @pubkey.setter + def pubkey(self, pubkey): + if self.is_public: + raise TypeError("cannot add public sibling to pubkey") + + if not pubkey.is_public: + raise TypeError("sibling must be public") + + if self._sibling is not None and self._sibling() is not None: + raise ValueError("public key reference already set") + + if pubkey.fingerprint != self.fingerprint: + raise ValueError("key fingerprint mismatch") + + # TODO: sync packets with sibling + self._sibling = weakref.ref(pubkey) + pubkey._sibling = weakref.ref(self) + + @property + def self_signatures(self): + keyid, keytype = (self.fingerprint.keyid, SignatureType.DirectlyOnKey) if self.is_primary \ + else (self.parent.fingerprint.keyid, SignatureType.Subkey_Binding) + + ##TODO: filter out revoked signatures as well + for sig in iter(sig for sig in self._signatures + if all([sig.type == keytype, sig.signer == keyid, not sig.is_expired])): + yield sig + + @property + def signers(self): + """A ``set`` of key ids of keys that were used to sign this key""" + return {sig.signer for sig in self.__sig__} + + @property + def subkeys(self): + """An :py:obj:`~collections.OrderedDict` of subkeys bound to this primary key, if applicable, + selected by 16-character keyid.""" + return self._children + + @property + def userids(self): + """A ``list`` of :py:obj:`PGPUID` objects containing User ID information about this key""" + return [ u for u in self._uids if u.is_uid ] + + @property + def userattributes(self): + """A ``list`` of :py:obj:`PGPUID` objects containing one or more images associated with this key""" + return [u for u in self._uids if u.is_ua] + + @classmethod + def new(cls, key_algorithm, key_size): + """ + Generate a new PGP key + + :param key_algorithm: Key algorithm to use. + :type key_algorithm: A :py:obj:`~constants.PubKeyAlgorithm` + :param key_size: Key size in bits, unless `key_algorithm` is :py:obj:`~constants.PubKeyAlgorithm.ECDSA` or + :py:obj:`~constants.PubKeyAlgorithm.ECDH`, in which case it should be the Curve OID to use. + :type key_size: ``int`` or :py:obj:`~constants.EllipticCurveOID` + :return: A newly generated :py:obj:`PGPKey` + """ + # new private key shell first + key = PGPKey() + + if key_algorithm in {PubKeyAlgorithm.RSAEncrypt, PubKeyAlgorithm.RSASign}: # pragma: no cover + warnings.warn('{:s} is deprecated - generating key using RSAEncryptOrSign'.format(key_algorithm.name)) + key_algorithm = PubKeyAlgorithm.RSAEncryptOrSign + + # generate some key data to match key_algorithm and key_size + key._key = PrivKeyV4.new(key_algorithm, key_size) + + return key + + def __init__(self): + """ + PGPKey objects represent OpenPGP compliant keys along with all of their associated data. + + PGPKey implements the `__str__` method, the output of which will be the key composition in + OpenPGP-compliant ASCII-armored format. + + PGPKey implements the `__bytes__` method, the output of which will be the key composition in + OpenPGP-compliant binary format. + + Any signatures within the PGPKey that are marked as being non-exportable will not be included in the output + of either of those methods. + """ + super(PGPKey, self).__init__() + self._key = None + self._children = collections.OrderedDict() + self._signatures = SorteDeque() + self._uids = SorteDeque() + self._sibling = None + + def __bytearray__(self): + _bytes = bytearray() + # us + _bytes += self._key.__bytearray__() + # our signatures; ignore embedded signatures + for sig in iter(s for s in self._signatures if not s.embedded and s.exportable): + _bytes += sig.__bytearray__() + # one or more User IDs, followed by their signatures + for uid in self._uids: + _bytes += uid._uid.__bytearray__() + for s in [s for s in uid._signatures if s.exportable]: + _bytes += s.__bytearray__() + # subkeys + for sk in self._children.values(): + _bytes += sk.__bytearray__() + + return _bytes + + def __repr__(self): + if self._key is not None: + return "<PGPKey [{:s}][0x{:s}] at 0x{:02X}>" \ + "".format(self._key.__class__.__name__, self.fingerprint.keyid, id(self)) + + return "<PGPKey [unknown] at 0x{:02X}>" \ + "".format(id(self)) + + def __contains__(self, item): + if isinstance(item, PGPKey): # pragma: no cover + return item.fingerprint.keyid in self.subkeys + + if isinstance(item, Fingerprint): # pragma: no cover + return item.keyid in self.subkeys + + if isinstance(item, PGPUID): + return item in self._uids + + if isinstance(item, PGPSignature): + return item in self._signatures + + raise TypeError + + def __or__(self, other, from_sib=False): + if isinstance(other, Key) and self._key is None: + self._key = other + + elif isinstance(other, PGPKey) and not other.is_primary and other.is_public == self.is_public: + other._parent = self + self._children[other.fingerprint.keyid] = other + + elif isinstance(other, PGPSignature): + self._signatures.insort(other) + + # if this is a subkey binding signature that has embedded primary key binding signatures, add them to parent + if other.type == SignatureType.Subkey_Binding: + for es in iter(pkb for pkb in other._signature.subpackets['EmbeddedSignature']): + esig = PGPSignature() | es + esig._parent = other + self._signatures.insort(esig) + + elif isinstance(other, PGPUID): + other._parent = weakref.ref(self) + self._uids.insort(other) + + else: + raise TypeError("unsupported operand type(s) for |: '{:s}' and '{:s}'" + "".format(self.__class__.__name__, other.__class__.__name__)) + + if isinstance(self._sibling, weakref.ref) and not from_sib: + sib = self._sibling() + if sib is None: + self._sibling = None + + else: # pragma: no cover + sib.__or__(copy.copy(other), True) + + return self + + def __copy__(self): + key = super(PGPKey, self).__copy__() + key._key = copy.copy(self._key) + + for uid in self._uids: + key |= copy.copy(uid) + + for id, subkey in self._children.items(): + key |= copy.copy(subkey) + + for sig in self._signatures: + if sig.embedded: + # embedded signatures don't need to be explicitly copied + continue + + print(len(key._signatures)) + key |= copy.copy(sig) + + return key + + def protect(self, passphrase, enc_alg, hash_alg): + """ + Add a passphrase to a private key. If the key is already passphrase protected, it should be unlocked before + a new passphrase can be specified. + + Has no effect on public keys. + + :param passphrase: A passphrase to protect the key with + :type passphrase: ``str``, ``unicode`` + :param enc_alg: Symmetric encryption algorithm to use to protect the key + :type enc_alg: :py:obj:`~constants.SymmetricKeyAlgorithm` + :param hash_alg: Hash algorithm to use in the String-to-Key specifier + :type hash_alg: :py:obj:`~constants.HashAlgorithm` + """ + ##TODO: specify strong defaults for enc_alg and hash_alg + if self.is_public: + # we can't protect public keys because only private key material is ever protected + warnings.warn("Public keys cannot be passphrase-protected", stacklevel=2) + return + + if self.is_protected and not self.is_unlocked: + # we can't protect a key that is already protected unless it is unlocked first + warnings.warn("This key is already protected with a passphrase - " + "please unlock it before attempting to specify a new passphrase", stacklevel=2) + return + + for sk in itertools.chain([self], self.subkeys.values()): + sk._key.protect(passphrase, enc_alg, hash_alg) + + del passphrase + + @contextlib.contextmanager + def unlock(self, passphrase): + """ + Context manager method for unlocking passphrase-protected private keys. Has no effect if the key is not both + private and passphrase-protected. + + When the context managed block is exited, the unprotected private key material is removed. + + Example:: + + privkey = PGPKey() + privkey.parse(keytext) + + assert privkey.is_protected + assert privkey.is_unlocked is False + # privkey.sign("some text") <- this would raise an exception + + with privkey.unlock("TheCorrectPassphrase"): + # privkey is now unlocked + assert privkey.is_unlocked + # so you can do things with it + sig = privkey.sign("some text") + + # privkey is no longer unlocked + assert privkey.is_unlocked is False + + Emits a :py:obj:`~warnings.UserWarning` if the key is public or not passphrase protected. + + :param str passphrase: The passphrase to be used to unlock this key. + :raises: :py:exc:`~pgpy.errors.PGPDecryptionError` if the passphrase is incorrect + """ + if self.is_public: + # we can't unprotect public keys because only private key material is ever protected + warnings.warn("Public keys cannot be passphrase-protected", stacklevel=3) + yield self + return + + if not self.is_protected: + # we can't unprotect private keys that are not protected, because there is no ciphertext to decrypt + warnings.warn("This key is not protected with a passphrase", stacklevel=3) + yield self + return + + try: + for sk in itertools.chain([self], self.subkeys.values()): + sk._key.unprotect(passphrase) + del passphrase + yield self + + finally: + # clean up here by deleting the previously decrypted secret key material + for sk in itertools.chain([self], self.subkeys.values()): + sk._key.keymaterial.clear() + + def add_uid(self, uid, selfsign=True, **prefs): + """ + Add a User ID to this key. + + :param uid: The user id to add + :type uid: :py:obj:`~pgpy.PGPUID` + :param selfsign: Whether or not to self-sign the user id before adding it + :type selfsign: ``bool`` + + Valid optional keyword arguments are identical to those of self-signatures for :py:meth:`PGPKey.certify`. + Any such keyword arguments are ignored if selfsign is ``False`` + """ + uid._parent = self + if selfsign: + uid |= self.certify(uid, SignatureType.Positive_Cert, **prefs) + + self |= uid + + def get_uid(self, search): + """ + Find and return a User ID that matches the search string given. + + :param search: A text string to match name, comment, or email address against + :type search: ``str``, ``unicode`` + :return: The first matching :py:obj:`~pgpy.PGPUID`, or ``None`` if no matches were found. + """ + if self.is_primary: + return next((u for u in self._uids if search in filter(lambda a: a is not None, (u.name, u.comment, u.email))), None) + return self.parent.get_uid(search) + + def del_uid(self, search): + """ + Find and remove a user id that matches the search string given. This method does not modify the corresponding + :py:obj:`~pgpy.PGPUID` object; it only removes it from the list of user ids on the key. + + :param search: A text string to match name, comment, or email address against + :type search: ``str``, ``unicode`` + """ + u = self.get_uid(search) + + if u is None: + raise KeyError("uid '{:s}' not found".format(search)) + + u._parent = None + self._uids.remove(u) + + def add_subkey(self, key, **prefs): + """ + Add a key as a subkey to this key. + :param key: A private :py:obj:`~pgpy.PGPKey` that does not have any subkeys of its own + + :keyword usage: A ``set`` of key usage flags, as :py:obj:`~constants.KeyFlags` for the subkey to be added. + :type usage: ``set`` + + Other valid optional keyword arguments are identical to those of self-signatures for :py:meth:`PGPKey.certify` + """ + if self.is_public: + raise PGPError("Cannot add a subkey to a public key. Add the subkey to the private component first!") + + if key.is_public: + raise PGPError("Cannot add a public key as a subkey to this key") + + if key.is_primary: + if len(key._children) > 0: + raise PGPError("Cannot add a key that already has subkeys as a subkey!") + + # convert key into a subkey + npk = PrivSubKeyV4() + npk.pkalg = key._key.pkalg + npk.created = key._key.created + npk.keymaterial = key._key.keymaterial + key._key = npk + key._key.update_hlen() + + self._children[key.fingerprint.keyid] = key + key._parent = self + + ##TODO: skip this step if the key already has a subkey binding signature + bsig = self.bind(key, **prefs) + key |= bsig + + def _get_key_flags(self, user=None): + if self.is_primary: + if user is not None: + user = self.get_uid(user) + + elif len(self._uids) == 0: + return {KeyFlags.Certify} + + else: + user = next(iter(self.userids)) + + # RFC 4880 says that primary keys *must* be capable of certification + return {KeyFlags.Certify} | user.selfsig.key_flags + + return next(self.self_signatures).key_flags + + def _sign(self, subject, sig, **prefs): + """ + The actual signing magic happens here. + :param subject: The subject to sign + :param sig: The :py:obj:`PGPSignature` object the new signature is to be encapsulated within + :returns: ``sig``, after the signature is added to it. + """ + user = prefs.pop('user', None) + uid = None + if user is not None: + uid = self.get_uid(user) + + else: + uid = next(iter(self.userids), None) + if uid is None and self.parent is not None: + uid = next(iter(self.parent.userids), None) + + if sig.hash_algorithm is None: + sig._signature.halg = uid.selfsig.hashprefs[0] + + if uid is not None and sig.hash_algorithm not in uid.selfsig.hashprefs: + warnings.warn("Selected hash algorithm not in key preferences", stacklevel=4) + + # signature options that can be applied at any level + expires = prefs.pop('expires', None) + notation = prefs.pop('notation', None) + revocable = prefs.pop('revocable', True) + policy_uri = prefs.pop('policy_uri', None) + + if expires is not None: + # expires should be a timedelta, so if it's a datetime, turn it into a timedelta + if isinstance(expires, datetime): + expires = expires - self.created + + sig._signature.subpackets.addnew('SignatureExpirationTime', hashed=True, expires=expires) + + if revocable is False: + sig._signature.subpackets.addnew('Revocable', hashed=True, bflag=revocable) + + if notation is not None: + for name, value in notation.items(): + # mark all notations as human readable unless value is a bytearray + flags = NotationDataFlags.HumanReadable + if isinstance(value, bytearray): + flags = 0x00 + + sig._signature.subpackets.addnew('NotationData', hashed=True, flags=flags, name=name, value=value) + + if policy_uri is not None: + sig._signature.subpackets.addnew('Policy', hashed=True, uri=policy_uri) + + if user is not None and uid is not None: + signers_uid = "{:s}".format(uid) + sig._signature.subpackets.addnew('SignersUserID', hashed=True, userid=signers_uid) + + # handle an edge case for timestamp signatures vs standalone signatures + if sig.type == SignatureType.Timestamp and len(sig._signature.subpackets._hashed_sp) > 1: + sig._signature.sigtype = SignatureType.Standalone + + sigdata = sig.hashdata(subject) + h2 = sig.hash_algorithm.hasher + h2.update(sigdata) + sig._signature.hash2 = bytearray(h2.digest()[:2]) + + _sig = self._key.sign(sigdata, getattr(hashes, sig.hash_algorithm.name)()) + if _sig is NotImplemented: + raise NotImplementedError(self.key_algorithm) + + sig._signature.signature.from_signer(_sig) + sig._signature.update_hlen() + + return sig + + @KeyAction(KeyFlags.Sign, is_unlocked=True, is_public=False) + def sign(self, subject, **prefs): + """ + Sign text, a message, or a timestamp using this key. + + :param subject: The text to be signed + :type subject: ``str``, :py:obj:`~pgpy.PGPMessage`, ``None`` + :raises: :py:exc:`~pgpy.errors.PGPError` if the key is passphrase-protected and has not been unlocked + :raises: :py:exc:`~pgpy.errors.PGPError` if the key is public + :returns: :py:obj:`PGPSignature` + + The following optional keyword arguments can be used with :py:meth:`PGPKey.sign`, as well as + :py:meth:`PGPKey.certify`, :py:meth:`PGPKey.revoke`, and :py:meth:`PGPKey.bind`: + + :keyword expires: Set an expiration date for this signature + :type expires: :py:obj:`~datetime.datetime`, :py:obj:`~datetime.timedelta` + :keyword notation: Add arbitrary notation data to this signature. + :type notation: ``dict`` + :keyword policy_uri: Add a URI to the signature that should describe the policy under which the signature + was issued. + :type policy_uri: ``str`` + :keyword revocable: If ``False``, this signature will be marked non-revocable + :type revocable: ``bool`` + :keyword user: Specify which User ID to use when creating this signature. Also adds a "Signer's User ID" + to the signature. + :type user: ``str`` + """ + sig_type = SignatureType.BinaryDocument + hash_algo = prefs.pop('hash', None) + + if subject is None: + sig_type = SignatureType.Timestamp + + if isinstance(subject, PGPMessage): + if subject.type == 'cleartext': + sig_type = SignatureType.CanonicalDocument + + subject = subject.message + + sig = PGPSignature.new(sig_type, self.key_algorithm, hash_algo, self.fingerprint.keyid) + + return self._sign(subject, sig, **prefs) + + @KeyAction(KeyFlags.Certify, is_unlocked=True, is_public=False) + def certify(self, subject, level=SignatureType.Generic_Cert, **prefs): + """ + Sign a key or a user id within a key. + + :param subject: The user id or key to be certified. + :type subject: :py:obj:`PGPKey`, :py:obj:`PGPUID` + :param level: :py:obj:`~constants.SignatureType.Generic_Cert`, :py:obj:`~constants.SignatureType.Persona_Cert`, + :py:obj:`~constants.SignatureType.Casual_Cert`, or :py:obj:`~constants.SignatureType.Positive_Cert`. + Only used if subject is a :py:obj:`PGPUID`; otherwise, it is ignored. + :raises: :py:exc:`~pgpy.errors.PGPError` if the key is passphrase-protected and has not been unlocked + :raises: :py:exc:`~pgpy.errors.PGPError` if the key is public + :returns: :py:obj:`PGPSignature` + + In addition to the optional keyword arguments accepted by :py:meth:`PGPKey.sign`, the following optional + keyword arguments can be used with :py:meth:`PGPKey.certify`. + + These optional keywords only make sense, and thus only have an effect, when self-signing a key or User ID: + + :keyword usage: A ``set`` of key usage flags, as :py:obj:`~constants.KeyFlags`. + This keyword is ignored for non-self-certifications. + :type usage: ``set`` + :keyword ciphers: A list of preferred symmetric ciphers, as :py:obj:`~constants.SymmetricKeyAlgorithm`. + This keyword is ignored for non-self-certifications. + :type ciphers: ``list`` + :keyword hashes: A list of preferred hash algorithms, as :py:obj:`~constants.HashAlgorithm`. + This keyword is ignored for non-self-certifications. + :type hashes: ``list`` + :keyword compression: A list of preferred compression algorithms, as :py:obj:`~constants.CompressionAlgorithm`. + This keyword is ignored for non-self-certifications. + :type compression: ``list`` + :keyword key_expiration: Specify a key expiration date for when this key should expire, or a + :py:obj:`~datetime.timedelta` of how long after the key was created it should expire. + This keyword is ignored for non-self-certifications. + :type key_expiration: :py:obj:`datetime.datetime`, :py:obj:`datetime.timedelta` + :keyword keyserver: Specify the URI of the preferred key server of the user. + This keyword is ignored for non-self-certifications. + :type keyserver: ``str``, ``unicode``, ``bytes`` + :keyword primary: Whether or not to consider the certified User ID as the primary one. + This keyword is ignored for non-self-certifications, and any certifications directly on keys. + :type primary: ``bool`` + + These optional keywords only make sense, and thus only have an effect, when signing another key or User ID: + + :keyword trust: Specify the level and amount of trust to assert when certifying a public key. Should be a tuple + of two ``int`` s, specifying the trust level and trust amount. See + `RFC 4880 Section 5.2.3.13. Trust Signature <http://tools.ietf.org/html/rfc4880#section-5.2.3.13>`_ + for more on what these values mean. + :type trust: ``tuple`` of two ``int`` s + :keyword regex: Specify a regular expression to constrain the specified trust signature in the resulting signature. + Symbolically signifies that the specified trust signature only applies to User IDs which match + this regular expression. + This is meaningless without also specifying trust level and amount. + :type regex: ``str`` + """ + hash_algo = prefs.pop('hash', None) + sig_type = level + if isinstance(subject, PGPKey): + sig_type = SignatureType.DirectlyOnKey + + sig = PGPSignature.new(sig_type, self.key_algorithm, hash_algo, self.fingerprint.keyid) + + # signature options that only make sense in certifications + usage = prefs.pop('usage', None) + exportable = prefs.pop('exportable', None) + + if usage is not None: + sig._signature.subpackets.addnew('KeyFlags', hashed=True, flags=usage) + + if exportable is not None: + sig._signature.subpackets.addnew('ExportableCertification', hashed=True, bflag=exportable) + + keyfp = self.fingerprint + if isinstance(subject, PGPKey): + keyfp = subject.fingerprint + if isinstance(subject, PGPUID) and subject._parent is not None: + keyfp = subject._parent.fingerprint + + if keyfp == self.fingerprint: + # signature options that only make sense in self-certifications + cipher_prefs = prefs.pop('ciphers', None) + hash_prefs = prefs.pop('hashes', None) + compression_prefs = prefs.pop('compression', None) + key_expires = prefs.pop('key_expiration', None) + keyserver_flags = prefs.pop('keyserver_flags', None) + keyserver = prefs.pop('keyserver', None) + primary_uid = prefs.pop('primary', None) + + if key_expires is not None: + # key expires should be a timedelta, so if it's a datetime, turn it into a timedelta + if isinstance(key_expires, datetime): + key_expires = key_expires - self.created + + sig._signature.subpackets.addnew('KeyExpirationTime', hashed=True, expires=key_expires) + + if cipher_prefs is not None: + sig._signature.subpackets.addnew('PreferredSymmetricAlgorithms', hashed=True, flags=cipher_prefs) + + if hash_prefs is not None: + sig._signature.subpackets.addnew('PreferredHashAlgorithms', hashed=True, flags=hash_prefs) + if sig.hash_algorithm is None: + sig._signature.halg = hash_prefs[0] + + if compression_prefs is not None: + sig._signature.subpackets.addnew('PreferredCompressionAlgorithms', hashed=True, flags=compression_prefs) + + if keyserver_flags is not None: + sig._signature.subpackets.addnew('KeyServerPreferences', hashed=True, flags=keyserver_flags) + + if keyserver is not None: + sig._signature.subpackets.addnew('PreferredKeyServer', hashed=True, uri=keyserver) + + if primary_uid is not None: + sig._signature.subpackets.addnew('PrimaryUserID', hashed=True, primary=primary_uid) + + # Features is always set on self-signatures + sig._signature.subpackets.addnew('Features', hashed=True, flags=Features.pgpy_features) + + else: + # signature options that only make sense in non-self-certifications + trust = prefs.pop('trust', None) + regex = prefs.pop('regex', None) + + if trust is not None: + sig._signature.subpackets.addnew('TrustSignature', hashed=True, level=trust[0], amount=trust[1]) + + if regex is not None: + sig._signature.subpackets.addnew('RegularExpression', hashed=True, regex=regex) + + return self._sign(subject, sig, **prefs) + + @KeyAction(KeyFlags.Certify, is_unlocked=True, is_public=False) + def revoke(self, target, **prefs): + """ + Revoke a key, a subkey, or all current certification signatures of a User ID that were generated by this key so far. + + :param target: The key to revoke + :type target: :py:obj:`PGPKey`, :py:obj:`PGPUID` + :raises: :py:exc:`~pgpy.errors.PGPError` if the key is passphrase-protected and has not been unlocked + :raises: :py:exc:`~pgpy.errors.PGPError` if the key is public + :returns: :py:obj:`PGPSignature` + + In addition to the optional keyword arguments accepted by :py:meth:`PGPKey.sign`, the following optional + keyword arguments can be used with :py:meth:`PGPKey.revoke`. + + :keyword reason: Defaults to :py:obj:`constants.RevocationReason.NotSpecified` + :type reason: One of :py:obj:`constants.RevocationReason`. + :keyword comment: Defaults to an empty string. + :type comment: ``str`` + """ + hash_algo = prefs.pop('hash', None) + if isinstance(target, PGPUID): + sig_type = SignatureType.CertRevocation + + elif isinstance(target, PGPKey): + ##TODO: check to make sure that the key that is being revoked: + # - is this key + # - is one of this key's subkeys + # - specifies this key as its revocation key + if target.is_primary: + sig_type = SignatureType.KeyRevocation + + else: + sig_type = SignatureType.SubkeyRevocation + + else: # pragma: no cover + raise TypeError + + sig = PGPSignature.new(sig_type, self.key_algorithm, hash_algo, self.fingerprint.keyid) + + # signature options that only make sense when revoking + reason = prefs.pop('reason', RevocationReason.NotSpecified) + comment = prefs.pop('comment', "") + sig._signature.subpackets.addnew('ReasonForRevocation', hashed=True, code=reason, string=comment) + + return self._sign(target, sig, **prefs) + + @KeyAction(is_unlocked=True, is_public=False) + def revoker(self, revoker, **prefs): + """ + Generate a signature that specifies another key as being valid for revoking this key. + + :param revoker: The :py:obj:`PGPKey` to specify as a valid revocation key. + :type revoker: :py:obj:`PGPKey` + :raises: :py:exc:`~pgpy.errors.PGPError` if the key is passphrase-protected and has not been unlocked + :raises: :py:exc:`~pgpy.errors.PGPError` if the key is public + :returns: :py:obj:`PGPSignature` + + In addition to the optional keyword arguments accepted by :py:meth:`PGPKey.sign`, the following optional + keyword arguments can be used with :py:meth:`PGPKey.revoker`. + + :keyword sensitive: If ``True``, this sets the sensitive flag on the RevocationKey subpacket. Currently, + this has no other effect. + :type sensitive: ``bool`` + """ + hash_algo = prefs.pop('hash', None) + + sig = PGPSignature.new(SignatureType.DirectlyOnKey, self.key_algorithm, hash_algo, self.fingerprint.keyid) + + # signature options that only make sense when adding a revocation key + sensitive = prefs.pop('sensitive', False) + keyclass = RevocationKeyClass.Normal | (RevocationKeyClass.Sensitive if sensitive else 0x00) + + sig._signature.subpackets.addnew('RevocationKey', + hashed=True, + algorithm=revoker.key_algorithm, + fingerprint=revoker.fingerprint, + keyclass=keyclass) + + # revocation keys should really not be revocable themselves + prefs['revocable'] = False + return self._sign(self, sig, **prefs) + + @KeyAction(is_unlocked=True, is_public=False) + def bind(self, key, **prefs): + """ + Bind a subkey to this key. + + Valid optional keyword arguments are identical to those of self-signatures for :py:meth:`PGPkey.certify` + """ + hash_algo = prefs.pop('hash', None) + + if self.is_primary and not key.is_primary: + sig_type = SignatureType.Subkey_Binding + + elif key.is_primary and not self.is_primary: + sig_type = SignatureType.PrimaryKey_Binding + + else: # pragma: no cover + raise PGPError + + sig = PGPSignature.new(sig_type, self.key_algorithm, hash_algo, self.fingerprint.keyid) + + if sig_type == SignatureType.Subkey_Binding: + # signature options that only make sense in subkey binding signatures + usage = prefs.pop('usage', None) + + if usage is not None: + sig._signature.subpackets.addnew('KeyFlags', hashed=True, flags=usage) + + # if possible, have the subkey create a primary key binding signature + if key.key_algorithm.can_sign: + subkeyid = key.fingerprint.keyid + esig = None + + if not key.is_public: + esig = key.bind(self) + + elif subkeyid in self.subkeys: # pragma: no cover + esig = self.subkeys[subkeyid].bind(self) + + if esig is not None: + sig._signature.subpackets.addnew('EmbeddedSignature', hashed=False, _sig=esig._signature) + + return self._sign(key, sig, **prefs) + + def verify(self, subject, signature=None): + """ + Verify a subject with a signature using this key. + + :param subject: The subject to verify + :type subject: ``str``, ``unicode``, ``None``, :py:obj:`PGPMessage`, :py:obj:`PGPKey`, :py:obj:`PGPUID` + :param signature: If the signature is detached, it should be specified here. + :type signature: :py:obj:`PGPSignature` + :returns: :py:obj:`~pgpy.types.SignatureVerification` + """ + sspairs = [] + + # some type checking + if not isinstance(subject, (type(None), PGPMessage, PGPKey, PGPUID, PGPSignature, six.string_types, bytes, bytearray)): + raise TypeError("Unexpected subject value: {:s}".format(str(type(subject)))) + if not isinstance(signature, (type(None), PGPSignature)): + raise TypeError("Unexpected signature value: {:s}".format(str(type(signature)))) + + def _filter_sigs(sigs): + _ids = {self.fingerprint.keyid} | set(self.subkeys) + return [ sig for sig in sigs if sig.signer in _ids ] + + # collect signature(s) + if signature is None: + if isinstance(subject, PGPMessage): + sspairs += [ (sig, subject.message) for sig in _filter_sigs(subject.signatures) ] + + if isinstance(subject, (PGPUID, PGPKey)): + sspairs += [ (sig, subject) for sig in _filter_sigs(subject.__sig__) ] + + if isinstance(subject, PGPKey): + # user ids + sspairs += [ (sig, uid) for uid in subject.userids for sig in _filter_sigs(uid.__sig__) ] + # user attributes + sspairs += [ (sig, ua) for ua in subject.userattributes for sig in _filter_sigs(ua.__sig__) ] + # subkey binding signatures + sspairs += [ (sig, subkey) for subkey in subject.subkeys.values() for sig in _filter_sigs(subkey.__sig__) ] + + elif signature.signer in {self.fingerprint.keyid} | set(self.subkeys): + sspairs += [(signature, subject)] + + if len(sspairs) == 0: + raise PGPError("No signatures to verify") + + # finally, start verifying signatures + sigv = SignatureVerification() + for sig, subj in sspairs: + if self.fingerprint.keyid != sig.signer and sig.signer in self.subkeys: + warnings.warn("Signature was signed with this key's subkey: {:s}. " + "Verifying with subkey...".format(sig.signer), + stacklevel=2) + sigv &= self.subkeys[sig.signer].verify(subj, sig) + + else: + verified = self._key.verify(sig.hashdata(subj), sig.__sig__, getattr(hashes, sig.hash_algorithm.name)()) + if verified is NotImplemented: + raise NotImplementedError(sig.key_algorithm) + + sigv.add_sigsubj(sig, self.fingerprint.keyid, subj, verified) + + return sigv + + @KeyAction(KeyFlags.EncryptCommunications, KeyFlags.EncryptStorage, is_public=True) + def encrypt(self, message, sessionkey=None, **prefs): + """ + Encrypt a PGPMessage using this key. + + :param message: The message to encrypt. + :type message: :py:obj:`PGPMessage` + :optional param sessionkey: Provide a session key to use when encrypting something. Default is ``None``. + If ``None``, a session key of the appropriate length will be generated randomly. + + .. warning:: + + Care should be taken when making use of this option! Session keys *absolutely need* + to be unpredictable! Use the ``gen_key()`` method on the desired + :py:obj:`~constants.SymmetricKeyAlgorithm` to generate the session key! + :type sessionkey: ``bytes``, ``str`` + + :raises: :py:exc:`~errors.PGPEncryptionError` if encryption failed for any reason. + :returns: A new :py:obj:`PGPMessage` with the encrypted contents of ``message`` + + The following optional keyword arguments can be used with :py:meth:`PGPKey.encrypt`: + + :keyword cipher: Specifies the symmetric block cipher to use when encrypting the message. + :type cipher: :py:obj:`~constants.SymmetricKeyAlgorithm` + :keyword user: Specifies the User ID to use as the recipient for this encryption operation, for the purposes of + preference defaults and selection validation. + :type user: ``str``, ``unicode`` + """ + user = prefs.pop('user', None) + uid = None + if user is not None: + uid = self.get_uid(user) + else: + uid = next(iter(self.userids), None) + if uid is None and self.parent is not None: + uid = next(iter(self.parent.userids), None) + cipher_algo = prefs.pop('cipher', uid.selfsig.cipherprefs[0]) + + if cipher_algo not in uid.selfsig.cipherprefs: + warnings.warn("Selected symmetric algorithm not in key preferences", stacklevel=3) + + if message.is_compressed and message._compression not in uid.selfsig.compprefs: + warnings.warn("Selected compression algorithm not in key preferences", stacklevel=3) + + if sessionkey is None: + sessionkey = cipher_algo.gen_key() + + # set up a new PKESessionKeyV3 + pkesk = PKESessionKeyV3() + pkesk.encrypter = bytearray(binascii.unhexlify(self.fingerprint.keyid.encode('latin-1'))) + pkesk.pkalg = self.key_algorithm + # pkesk.encrypt_sk(self.__key__, cipher_algo, sessionkey) + pkesk.encrypt_sk(self._key, cipher_algo, sessionkey) + + if message.is_encrypted: # pragma: no cover + _m = message + + else: + _m = PGPMessage() + skedata = IntegrityProtectedSKEDataV1() + skedata.encrypt(sessionkey, cipher_algo, message.__bytes__()) + _m |= skedata + + _m |= pkesk + + return _m + + def decrypt(self, message): + """ + Decrypt a PGPMessage using this key. + + :param message: An encrypted :py:obj:`PGPMessage` + :returns: A new :py:obj:`PGPMessage` with the decrypted contents of ``message`` + """ + if not message.is_encrypted: + warnings.warn("This message is not encrypted", stacklevel=2) + return message + + if self.fingerprint.keyid not in message.issuers: + sks = set(self.subkeys) + mis = set(message.issuers) + if sks & mis: + skid = list(sks & mis)[0] + warnings.warn("Message was encrypted with this key's subkey: {:s}. " + "Decrypting with that...".format(skid), + stacklevel=2) + return self.subkeys[skid].decrypt(message) + + raise PGPError("Cannot decrypt the provided message with this key") + + pkesk = next(pk for pk in message._sessionkeys if pk.pkalg == self.key_algorithm and pk.encrypter == self.fingerprint.keyid) + alg, key = pkesk.decrypt_sk(self._key) + + # now that we have the symmetric cipher used and the key, we can decrypt the actual message + decmsg = PGPMessage() + decmsg.parse(message.message.decrypt(key, alg)) + + return decmsg + + def parse(self, data): + unarmored = self.ascii_unarmor(data) + data = unarmored['body'] + + if unarmored['magic'] is not None and 'KEY' not in unarmored['magic']: + raise ValueError('Expected: KEY. Got: {}'.format(str(unarmored['magic']))) + + if unarmored['headers'] is not None: + self.ascii_headers = unarmored['headers'] + + # parse packets + # keys will hold other keys parsed here + keys = collections.OrderedDict() + # orphaned will hold all non-opaque orphaned packets + orphaned = [] + # last holds the last non-signature thing processed + + ##TODO: see issue #141 and fix this better + getpkt = lambda d: Packet(d) if len(d) > 0 else None # flake8: noqa + # some packets are filtered out + getpkt = filter(lambda p: p.header.tag != PacketTag.Trust, iter(functools.partial(getpkt, data), None)) + + def pktgrouper(): + class PktGrouper(object): + def __init__(self): + self.last = None + + def __call__(self, pkt): + if pkt.header.tag != PacketTag.Signature: + self.last = '{:02X}_{:s}'.format(id(pkt), pkt.__class__.__name__) + return self.last + return PktGrouper() + + while True: + # print(type(p) for p in getpkt) + for group in iter(group for _, group in itertools.groupby(getpkt, key=pktgrouper()) if not _.endswith('Opaque')): + pkt = next(group) + + # deal with pkt first + if isinstance(pkt, Key): + pgpobj = (self if self._key is None else PGPKey()) | pkt + + elif isinstance(pkt, (UserID, UserAttribute)): + pgpobj = PGPUID() | pkt + + else: # pragma: no cover + break + + # add signatures to whatever we got + [ operator.ior(pgpobj, PGPSignature() | sig) for sig in group if not isinstance(sig, Opaque) ] + + # and file away pgpobj + if isinstance(pgpobj, PGPKey): + if pgpobj.is_primary: + keys[(pgpobj.fingerprint.keyid, pgpobj.is_public)] = pgpobj + + else: + keys[next(reversed(keys))] |= pgpobj + + elif isinstance(pgpobj, PGPUID): + # parent is likely the most recently parsed primary key + keys[next(reversed(keys))] |= pgpobj + + else: # pragma: no cover + break + else: + # finished normally + break + + # this will only be reached called if the inner loop hit a break + warnings.warn("Warning: Orphaned packet detected! {:s}".format(repr(pkt)), stacklevel=2) # pragma: no cover + orphaned.append(pkt) # pragma: no cover + for pkt in group: # pragma: no cover + orphaned.append(pkt) + + # remove the reference to self from keys + [ keys.pop((getattr(self, 'fingerprint.keyid', '~'), None), t) for t in (True, False) ] + # return {'keys': keys, 'orphaned': orphaned} + return keys + + +class PGPKeyring(collections.Container, collections.Iterable, collections.Sized): + def __init__(self, *args): + """ + PGPKeyring objects represent in-memory keyrings that can contain any combination of supported private and public + keys. It can not currently be conveniently exported to a format that can be understood by GnuPG. + """ + super(PGPKeyring, self).__init__() + self._keys = {} + self._pubkeys = collections.deque() + self._privkeys = collections.deque() + self._aliases = collections.deque([{}]) + self.load(*args) + + def __contains__(self, alias): + aliases = set().union(*self._aliases) + + if isinstance(alias, six.string_types): + return alias in aliases or alias.replace(' ', '') in aliases + + return alias in aliases # pragma: no cover + + def __len__(self): + return len(self._keys) + + def __iter__(self): # pragma: no cover + for pgpkey in itertools.chain(self._pubkeys, self._privkeys): + yield pgpkey + + def _get_key(self, alias): + for m in self._aliases: + if alias in m: + return self._keys[m[alias]] + + if alias.replace(' ', '') in m: + return self._keys[m[alias.replace(' ', '')]] + + raise KeyError(alias) + + def _get_keys(self, alias): + return [self._keys[m[alias]] for m in self._aliases if alias in m] + + def _sort_alias(self, alias): + # remove alias from all levels of _aliases, and sort by created time and key half + # so the order of _aliases from left to right: + # - newer keys come before older ones + # - private keys come before public ones + # + # this list is sorted in the opposite direction from that, because they will be placed into self._aliases + # from right to left. + pkids = sorted(list(set().union(m.pop(alias) for m in self._aliases if alias in m)), + key=lambda pkid: (self._keys[pkid].created, self._keys[pkid].is_public)) + + # drop the now-sorted aliases into place + for depth, pkid in enumerate(pkids): + self._aliases[depth][alias] = pkid + + # finally, remove any empty dicts left over + while {} in self._aliases: # pragma: no cover + self._aliases.remove({}) + + def _add_alias(self, alias, pkid): + # brand new alias never seen before! + if alias not in self: + self._aliases[-1][alias] = pkid + + # this is a duplicate alias->key link; ignore it + elif alias in self and pkid in set(m[alias] for m in self._aliases if alias in m): + pass # pragma: no cover + + # this is an alias that already exists, but points to a key that is not already referenced by it + else: + adepth = len(self._aliases) - len([None for m in self._aliases if alias in m]) - 1 + # all alias maps have this alias, so increase total depth by 1 + if adepth == -1: + self._aliases.appendleft({}) + adepth = 0 + + self._aliases[adepth][alias] = pkid + self._sort_alias(alias) + + def _add_key(self, pgpkey): + pkid = id(pgpkey) + if pkid not in self._keys: + self._keys[pkid] = pgpkey + + # add to _{pub,priv}keys if this is either a primary key, or a subkey without one + if pgpkey.parent is None: + if pgpkey.is_public: + self._pubkeys.append(pkid) + + else: + self._privkeys.append(pkid) + + # aliases + self._add_alias(pgpkey.fingerprint, pkid) + self._add_alias(pgpkey.fingerprint.keyid, pkid) + self._add_alias(pgpkey.fingerprint.shortid, pkid) + for uid in pgpkey.userids: + self._add_alias(uid.name, pkid) + if uid.comment: + self._add_alias(uid.comment, pkid) + + if uid.email: + self._add_alias(uid.email, pkid) + + # subkeys + for subkey in pgpkey.subkeys.values(): + self._add_key(subkey) + + def load(self, *args): + """ + Load all keys provided into this keyring object. + + :param \*args: Each arg in ``args`` can be any of the formats supported by :py:meth:`PGPKey.from_path` and + :py:meth:`PGPKey.from_blob`, or a ``list`` or ``tuple`` of these. + :type \*args: ``list``, ``tuple``, ``str``, ``unicode``, ``bytes``, ``bytearray`` + :returns: a ``set`` containing the unique fingerprints of all of the keys that were loaded during this operation. + """ + def _preiter(first, iterable): + yield first + for item in iterable: + yield item + + loaded = set() + for key in iter(item for ilist in iter(ilist if isinstance(ilist, (tuple, list)) else [ilist] for ilist in args) + for item in ilist): + if os.path.isfile(key): + _key, keys = PGPKey.from_file(key) + + else: + _key, keys = PGPKey.from_blob(key) + + for ik in _preiter(_key, keys.values()): + self._add_key(ik) + loaded |= {ik.fingerprint} | {isk.fingerprint for isk in ik.subkeys.values()} + + return list(loaded) + + @contextlib.contextmanager + def key(self, identifier): + """ + A context-manager method. Yields the first :py:obj:`PGPKey` object that matches the provided identifier. + + :param identifier: The identifier to use to select a loaded key. + :type identifier: :py:exc:`PGPMessage`, :py:exc:`PGPSignature`, ``str`` + :raises: :py:exc:`KeyError` if there is no loaded key that satisfies the identifier. + """ + if isinstance(identifier, PGPMessage): + for issuer in identifier.issuers: + if issuer in self: + identifier = issuer + break + + if isinstance(identifier, PGPSignature): + identifier = identifier.signer + + yield self._get_key(identifier) + + def fingerprints(self, keyhalf='any', keytype='any'): + """ + List loaded fingerprints with some optional filtering. + + :param str keyhalf: Can be 'any', 'public', or 'private'. If 'public', or 'private', the fingerprints of keys of the + the other type will not be included in the results. + :param str keytype: Can be 'any', 'primary', or 'sub'. If 'primary' or 'sub', the fingerprints of keys of the + the other type will not be included in the results. + :returns: a ``set`` of fingerprints of keys matching the filters specified. + """ + return {pk.fingerprint for pk in self._keys.values() + if pk.is_primary in [True if keytype in ['primary', 'any'] else None, + False if keytype in ['sub', 'any'] else None] + if pk.is_public in [True if keyhalf in ['public', 'any'] else None, + False if keyhalf in ['private', 'any'] else None]} + + def unload(self, key): + """ + Unload a loaded key and its subkeys. + + The easiest way to do this is to select a key using :py:meth:`PGPKeyring.key` first:: + + with keyring.key("DSA von TestKey") as key: + keyring.unload(key) + + :param key: The key to unload. + :type key: :py:obj:`PGPKey` + """ + assert isinstance(key, PGPKey) + pkid = id(key) + if pkid in self._keys: + # remove references + [ kd.remove(pkid) for kd in [self._pubkeys, self._privkeys] if pkid in kd ] + # remove the key + self._keys.pop(pkid) + + # remove aliases + for m, a in [ (m, a) for m in self._aliases for a, p in m.items() if p == pkid ]: + m.pop(a) + # do a re-sort of this alias if it was not unique + if a in self: + self._sort_alias(a) + + # if key is a primary key, unload its subkeys as well + if key.is_primary: + [ self.unload(sk) for sk in key.subkeys.values() ] diff --git a/src/leap/mx/vendor/pgpy/symenc.py b/src/leap/mx/vendor/pgpy/symenc.py new file mode 100644 index 0000000..3d81a7c --- /dev/null +++ b/src/leap/mx/vendor/pgpy/symenc.py @@ -0,0 +1,58 @@ +""" symenc.py +""" +import six + +from cryptography.exceptions import UnsupportedAlgorithm + +from cryptography.hazmat.backends import default_backend + +from cryptography.hazmat.primitives.ciphers import Cipher +from cryptography.hazmat.primitives.ciphers import modes + +from .errors import PGPDecryptionError +from .errors import PGPEncryptionError +from .errors import PGPInsecureCipher + +__all__ = ['_encrypt', + '_decrypt'] + + +def _encrypt(pt, key, alg, iv=None): + if iv is None: + iv = b'\x00' * (alg.block_size // 8) + + if alg.is_insecure: + raise PGPInsecureCipher("{:s} is not secure. Do not use it for encryption!".format(alg.name)) + + if not callable(alg.cipher): + raise PGPEncryptionError("Cipher {:s} not supported".format(alg.name)) + + try: + encryptor = Cipher(alg.cipher(key), modes.CFB(iv), default_backend()).encryptor() + + except UnsupportedAlgorithm as ex: # pragma: no cover + six.raise_from(PGPEncryptionError, ex) + + else: + return bytearray(encryptor.update(pt) + encryptor.finalize()) + + +def _decrypt(ct, key, alg, iv=None): + if iv is None: + """ + Instead of using an IV, OpenPGP prefixes a string of length + equal to the block size of the cipher plus two to the data before it + is encrypted. The first block-size octets (for example, 8 octets for + a 64-bit block length) are random, and the following two octets are + copies of the last two octets of the IV. + """ + iv = b'\x00' * (alg.block_size // 8) + + try: + decryptor = Cipher(alg.cipher(key), modes.CFB(iv), default_backend()).decryptor() + + except UnsupportedAlgorithm as ex: # pragma: no cover + six.raise_from(PGPDecryptionError, ex) + + else: + return bytearray(decryptor.update(ct) + decryptor.finalize()) diff --git a/src/leap/mx/vendor/pgpy/types.py b/src/leap/mx/vendor/pgpy/types.py new file mode 100644 index 0000000..b549434 --- /dev/null +++ b/src/leap/mx/vendor/pgpy/types.py @@ -0,0 +1,729 @@ +""" types.py +""" +from __future__ import division + +import abc +import base64 +import binascii +import bisect +import codecs +import collections +import operator +import os +import re +import warnings +import weakref + +from enum import EnumMeta +from enum import IntEnum + +import six + +from ._author import __version__ + +from .decorators import sdproperty + +from .errors import PGPError + +__all__ = ['Armorable', + 'ParentRef', + 'PGPObject', + 'Field', + 'Header', + 'MetaDispatchable', + 'Dispatchable', + 'SignatureVerification', + 'FlagEnumMeta', + 'FlagEnum', + 'Fingerprint', + 'SorteDeque'] + +if six.PY2: + FileNotFoundError = IOError + re.ASCII = 0 + + +class Armorable(six.with_metaclass(abc.ABCMeta)): + __crc24_init__ = 0x0B704CE + __crc24_poly__ = 0x1864CFB + + __armor_fmt__ = '-----BEGIN PGP {block_type}-----\n' \ + '{headers}\n' \ + '{packet}\n' \ + '={crc}\n' \ + '-----END PGP {block_type}-----\n' + + @property + def charset(self): + return self.ascii_headers.get('Charset', 'utf-8') + + @charset.setter + def charset(self, encoding): + self.ascii_headers['Charset'] = codecs.lookup(encoding).name + + @staticmethod + def is_ascii(text): + if isinstance(text, six.string_types): + return bool(re.match(r'^[ -~\r\n]+$', text, flags=re.ASCII)) + + if isinstance(text, (bytes, bytearray)): + return bool(re.match(br'^[ -~\r\n]+$', text, flags=re.ASCII)) + + raise TypeError("Expected: ASCII input of type str, bytes, or bytearray") # pragma: no cover + + @staticmethod + def ascii_unarmor(text): + """ + Takes an ASCII-armored PGP block and returns the decoded byte value. + + :param text: An ASCII-armored PGP block, to un-armor. + :raises: :py:exc:`ValueError` if ``text`` did not contain an ASCII-armored PGP block. + :raises: :py:exc:`TypeError` if ``text`` is not a ``str``, ``bytes``, or ``bytearray`` + :returns: A ``dict`` containing information from ``text``, including the de-armored data. + """ + m = {'magic': None, 'headers': None, 'body': bytearray(), 'crc': None} + if not Armorable.is_ascii(text): + m['body'] = bytearray(text) + return m + + if isinstance(text, (bytes, bytearray)): # pragma: no cover + text = text.decode('latin-1') + + # the re.VERBOSE flag allows for: + # - whitespace is ignored except when in a character class or escaped + # - anything after a '#' that is not escaped or in a character class is ignored, allowing for comments + m = re.search(r"""# This capture group is optional because it will only be present in signed cleartext messages + (^-{5}BEGIN\ PGP\ SIGNED\ MESSAGE-{5}(?:\r?\n) + (Hash:\ (?P<hashes>[A-Za-z0-9\-,]+)(?:\r?\n){2})? + (?P<cleartext>(.*\n)+)(?:\r?\n) + )? + # armor header line; capture the variable part of the magic text + ^-{5}BEGIN\ PGP\ (?P<magic>[A-Z0-9 ,]+)-{5}(?:\r?\n) + # try to capture all the headers into one capture group + # if this doesn't match, m['headers'] will be None + (?P<headers>(^.+:\ .+(?:\r?\n))+)?(?:\r?\n)? + # capture all lines of the body, up to 76 characters long, + # including the newline, and the pad character(s) + (?P<body>([A-Za-z0-9+/]{1,75}={,2}(?:\r?\n))+) + # capture the armored CRC24 value + ^=(?P<crc>[A-Za-z0-9+/]{4})(?:\r?\n) + # finally, capture the armor tail line, which must match the armor header line + ^-{5}END\ PGP\ (?P=magic)-{5}(?:\r?\n)? + """, text, flags=re.MULTILINE | re.VERBOSE) + + if m is None: # pragma: no cover + raise ValueError("Expected: ASCII-armored PGP data") + + m = m.groupdict() + + if m['hashes'] is not None: + m['hashes'] = m['hashes'].split(',') + + if m['headers'] is not None: + m['headers'] = collections.OrderedDict(re.findall('^(?P<key>.+): (?P<value>.+)$\n?', m['headers'], flags=re.MULTILINE)) + + if m['body'] is not None: + try: + m['body'] = bytearray(base64.b64decode(m['body'].encode())) + + except (binascii.Error, TypeError) as ex: + six.raise_from(PGPError, ex) + + if m['crc'] is not None: + m['crc'] = Header.bytes_to_int(base64.b64decode(m['crc'].encode())) + if Armorable.crc24(m['body']) != m['crc']: + warnings.warn('Incorrect crc24', stacklevel=3) + + return m + + @staticmethod + def crc24(data): + # CRC24 computation, as described in the RFC 4880 section on Radix-64 Conversions + # + # The checksum is a 24-bit Cyclic Redundancy Check (CRC) converted to + # four characters of radix-64 encoding by the same MIME base64 + # transformation, preceded by an equal sign (=). The CRC is computed + # by using the generator 0x864CFB and an initialization of 0xB704CE. + # The accumulation is done on the data before it is converted to + # radix-64, rather than on the converted data. + crc = Armorable.__crc24_init__ + + if not isinstance(data, bytearray): + data = six.iterbytes(data) + + for b in data: + crc ^= b << 16 + + for i in range(8): + crc <<= 1 + if crc & 0x1000000: + crc ^= Armorable.__crc24_poly__ + + return crc & 0xFFFFFF + + @abc.abstractproperty + def magic(self): + """The magic string identifier for the current PGP type""" + + @classmethod + def from_file(cls, filename): + with open(filename, 'rb') as file: + obj = cls() + data = bytearray(os.path.getsize(filename)) + file.readinto(data) + + po = obj.parse(data) + + if po is not None: + return (obj, po) + + return obj # pragma: no cover + + @classmethod + def from_blob(cls, blob): + obj = cls() + if (not isinstance(blob, six.binary_type)) and (not isinstance(blob, bytearray)): + po = obj.parse(bytearray(blob, 'latin-1')) + + else: + po = obj.parse(bytearray(blob)) + + if po is not None: + return (obj, po) + + return obj # pragma: no cover + + def __init__(self): + super(Armorable, self).__init__() + self.ascii_headers = collections.OrderedDict() + self.ascii_headers['Version'] = 'PGPy v' + __version__ # Default value + + def __str__(self): + payload = base64.b64encode(self.__bytes__()).decode('latin-1') + payload = '\n'.join(payload[i:(i + 64)] for i in range(0, len(payload), 64)) + + return self.__armor_fmt__.format( + block_type=self.magic, + headers=''.join('{key}: {val}\n'.format(key=key, val=val) for key, val in self.ascii_headers.items()), + packet=payload, + crc=base64.b64encode(PGPObject.int_to_bytes(self.crc24(self.__bytes__()), 3)).decode('latin-1') + ) + + def __copy__(self): + obj = self.__class__() + obj.ascii_headers = self.ascii_headers.copy() + + return obj + + +class ParentRef(object): + # mixin class to handle weak-referencing a parent object + @property + def _parent(self): + if isinstance(self.__parent, weakref.ref): + return self.__parent() + return self.__parent + + @_parent.setter + def _parent(self, parent): + try: + self.__parent = weakref.ref(parent) + + except TypeError: + self.__parent = parent + + @property + def parent(self): + return self._parent + + def __init__(self): + super(ParentRef, self).__init__() + self._parent = None + + +class PGPObject(six.with_metaclass(abc.ABCMeta, object)): + __metaclass__ = abc.ABCMeta + + @staticmethod + def int_byte_len(i): + return (i.bit_length() + 7) // 8 + + @staticmethod + def bytes_to_int(b, order='big'): # pragma: no cover + """convert bytes to integer""" + if six.PY2: + # save the original type of b without having to copy any data + _b = b.__class__() + if order != 'little': + b = reversed(b) + + if not isinstance(_b, bytearray): + b = six.iterbytes(b) + + return sum(c << (i * 8) for i, c in enumerate(b)) + + return int.from_bytes(b, order) + + @staticmethod + def int_to_bytes(i, minlen=1, order='big'): # pragma: no cover + """convert integer to bytes""" + blen = max(minlen, PGPObject.int_byte_len(i), 1) + + if six.PY2: + r = iter(_ * 8 for _ in (range(blen) if order == 'little' else range(blen - 1, -1, -1))) + return bytes(bytearray((i >> c) & 0xff for c in r)) + + return i.to_bytes(blen, order) + + @staticmethod + def text_to_bytes(text): + if text is None: + return text + + # if we got bytes, just return it + if isinstance(text, (bytearray, six.binary_type)): + return text + + # if we were given a unicode string, or if we translated the string into utf-8, + # we know that Python already has it in utf-8 encoding, so we can now just encode it to bytes + return text.encode('utf-8') + + @staticmethod + def bytes_to_text(text): + if text is None or isinstance(text, six.text_type): + return text + + return text.decode('utf-8') + + @abc.abstractmethod + def parse(self, packet): + """this method is too abstract to understand""" + + @abc.abstractmethod + def __bytearray__(self): + """ + Returns the contents of concrete subclasses in a binary format that can be understood by other OpenPGP + implementations + """ + + def __bytes__(self): + """ + Return the contents of concrete subclasses in a binary format that can be understood by other OpenPGP + implementations + """ + # this is what all subclasses will do anyway, so doing this here we can reduce code duplication significantly + return bytes(self.__bytearray__()) + + +class Field(PGPObject): + @abc.abstractmethod + def __len__(self): + """Return the length of the output of __bytes__""" + + +class Header(Field): + @staticmethod + def encode_length(l, nhf=True, llen=1): + def _new_length(l): + if 192 > l: + return Header.int_to_bytes(l) + + elif 8384 > l: + elen = ((l & 0xFF00) + (192 << 8)) + ((l & 0xFF) - 192) + return Header.int_to_bytes(elen, 2) + + return b'\xFF' + Header.int_to_bytes(l, 4) + + def _old_length(l, llen): + return Header.int_to_bytes(l, llen) if llen > 0 else b'' + + return _new_length(l) if nhf else _old_length(l, llen) + + @sdproperty + def length(self): + return self._len + + @length.register(int) + def length_int(self, val): + self._len = val + + @length.register(six.binary_type) + @length.register(bytearray) + def length_bin(self, val): + def _new_len(b): + fo = b[0] + + if 192 > fo: + self._len = self.bytes_to_int(b[:1]) + del b[:1] + + elif 224 > fo: # >= 192 is implied + dlen = self.bytes_to_int(b[:2]) + self._len = ((dlen - (192 << 8)) & 0xFF00) + ((dlen & 0xFF) + 192) + del b[:2] + + elif 255 > fo: # pragma: no cover + # not testable until partial body lengths actually work + # >= 224 is implied + # this is a partial-length header + self._partial = True + self._len = 1 << (fo & 0x1f) + + elif 255 == fo: + self._len = self.bytes_to_int(b[1:5]) + del b[:5] + + else: # pragma: no cover + raise ValueError("Malformed length: 0x{:02x}".format(fo)) + + def _old_len(b): + if self.llen > 0: + self._len = self.bytes_to_int(b[:self.llen]) + del b[:self.llen] + + else: # pragma: no cover + self._len = 0 + + _new_len(val) if self._lenfmt == 1 else _old_len(val) + + @sdproperty + def llen(self): + l = self.length + lf = self._lenfmt + + if lf == 1: + # new-format length + if 192 > l: + return 1 + + elif 8384 > self.length: # >= 192 is implied + return 2 + + else: + return 5 + + else: + # old-format length + ##TODO: what if _llen needs to be (re)computed? + return self._llen + + @llen.register(int) + def llen_int(self, val): + if self._lenfmt == 0: + self._llen = {0: 1, 1: 2, 2: 4, 3: 0}[val] + + def __init__(self): + super(Header, self).__init__() + self._len = 1 + self._llen = 1 + self._lenfmt = 1 + self._partial = False + + +class MetaDispatchable(abc.ABCMeta): + """ + MetaDispatchable is a metaclass for objects that subclass Dispatchable + """ + + _roots = set() + """ + _roots is a set of all currently registered RootClass class objects + + A RootClass is successfully registered if the following things are true: + - it inherits (directly or indirectly) from Dispatchable + - __typeid__ == -1 + """ + _registry = {} + """ + _registry is the Dispatchable class registry. It uses the following format: + + { (RootClass, None): OpaqueClass }: + denotes the default ("opaque") for a given RootClass. + + An OpaqueClass is successfully registered as such provided the following conditions are met: + - it inherits directly from a RootClass + - __typeid__ is None + + { (RootClass, TypeID): SubClass }: + denotes the class that handles the type given in TypeID + + a SubClass is successfully registered as such provided the following conditions are met: + - it inherits (directly or indirectly) from a RootClass + - __typeid__ is a positive int + - the given typeid is not already registered + + { (RootClass, TypeID): VerSubClass }: + denotes that a given TypeID has multiple versions, and that this is class' subclasses handle those. + A VerSubClass is registered identically to a normal SubClass. + + { (RootClass, TypeID, Ver): VerSubClass }: + denotes the class that handles the type given in TypeID and the version of that type given in Ver + + a Versioned SubClass is successfully registered as such provided the following conditions are met: + - it inherits from a VerSubClass + - __ver__ > 0 + - the given typeid/ver combination is not already registered + """ + + def __new__(mcs, name, bases, attrs): # NOQA + ncls = super(MetaDispatchable, mcs).__new__(mcs, name, bases, attrs) + + if not hasattr(ncls.__typeid__, '__isabstractmethod__'): + if ncls.__typeid__ == -1 and not issubclass(ncls, tuple(MetaDispatchable._roots)): + # this is a root class + MetaDispatchable._roots.add(ncls) + + elif issubclass(ncls, tuple(MetaDispatchable._roots)) and ncls.__typeid__ != -1: + for rcls in [ root for root in MetaDispatchable._roots if issubclass(ncls, root) ]: + if (rcls, ncls.__typeid__) not in MetaDispatchable._registry: + MetaDispatchable._registry[(rcls, ncls.__typeid__)] = ncls + + if (ncls.__ver__ is not None and ncls.__ver__ > 0 and + (rcls, ncls.__typeid__, ncls.__ver__) not in MetaDispatchable._registry): + MetaDispatchable._registry[(rcls, ncls.__typeid__, ncls.__ver__)] = ncls + + # finally, return the new class object + return ncls + + def __call__(cls, packet=None): # NOQA + def _makeobj(cls): + obj = object.__new__(cls) + obj.__init__() + return obj + + if packet is not None: + if cls in MetaDispatchable._roots: + rcls = cls + + elif issubclass(cls, tuple(MetaDispatchable._roots)): # pragma: no cover + rcls = next(root for root in MetaDispatchable._roots if issubclass(cls, root)) + + ##TODO: else raise an exception of some kind, but this should never happen + + header = rcls.__headercls__() + header.parse(packet) + + ncls = None + if (rcls, header.typeid) in MetaDispatchable._registry: + ncls = MetaDispatchable._registry[(rcls, header.typeid)] + + if ncls.__ver__ == 0: + if header.__class__ != ncls.__headercls__: + nh = ncls.__headercls__() + nh.__dict__.update(header.__dict__) + try: + nh.parse(packet) + + except Exception as ex: + six.raise_from(PGPError, ex) + + header = nh + + if (rcls, header.typeid, header.version) in MetaDispatchable._registry: + ncls = MetaDispatchable._registry[(rcls, header.typeid, header.version)] + + else: # pragma: no cover + ncls = None + + if ncls is None: + ncls = MetaDispatchable._registry[(rcls, None)] + + obj = _makeobj(ncls) + obj.header = header + + try: + obj.parse(packet) + + except Exception as ex: + six.raise_from(PGPError, ex) + + else: + obj = _makeobj(cls) + + return obj + + +class Dispatchable(six.with_metaclass(MetaDispatchable, PGPObject)): + __metaclass__ = MetaDispatchable + + @abc.abstractproperty + def __headercls__(self): # pragma: no cover + return False + + @abc.abstractproperty + def __typeid__(self): # pragma: no cover + return False + + __ver__ = None + + +class SignatureVerification(object): + _sigsubj = collections.namedtuple('sigsubj', ['verified', 'by', 'signature', 'subject']) + + @property + def good_signatures(self): + """ + A generator yielding namedtuples of all signatures that were successfully verified + in the operation that returned this instance. The namedtuple has the following attributes: + + ``sigsubj.verified`` - ``bool`` of whether the signature verified successfully or not. + + ``sigsubj.by`` - the :py:obj:`~pgpy.PGPKey` that was used in this verify operation. + + ``sigsubj.signature`` - the :py:obj:`~pgpy.PGPSignature` that was verified. + + ``sigsubj.subject`` - the subject that was verified using the signature. + """ + for s in [ i for i in self._subjects if i.verified ]: + yield s + + @property + def bad_signatures(self): # pragma: no cover + """ + A generator yielding namedtuples of all signatures that were not verified + in the operation that returned this instance. The namedtuple has the following attributes: + + ``sigsubj.verified`` - ``bool`` of whether the signature verified successfully or not. + + ``sigsubj.by`` - the :py:obj:`~pgpy.PGPKey` that was used in this verify operation. + + ``sigsubj.signature`` - the :py:obj:`~pgpy.PGPSignature` that was verified. + + ``sigsubj.subject`` - the subject that was verified using the signature. + """ + for s in [ i for i in self._subjects if not i.verified ]: + yield s + + def __init__(self): + """ + Returned by :py:meth:`PGPKey.verify` + + Can be compared directly as a boolean to determine whether or not the specified signature verified. + """ + super(SignatureVerification, self).__init__() + self._subjects = [] + + def __contains__(self, item): + return item in {ii for i in self._subjects for ii in [i.signature, i.subject]} + + def __len__(self): + return len(self._subjects) + + def __bool__(self): + return all(s.verified for s in self._subjects) + + def __nonzero__(self): + return self.__bool__() + + def __and__(self, other): + if not isinstance(other, SignatureVerification): + raise TypeError(type(other)) + + self._subjects += other._subjects + return self + + def __repr__(self): + return "<SignatureVerification({verified})>".format(verified=str(bool(self))) + + def add_sigsubj(self, signature, by, subject=None, verified=False): + self._subjects.append(self._sigsubj(verified, by, signature, subject)) + + +class FlagEnumMeta(EnumMeta): + def __and__(self, other): + return { f for f in iter(self) if f.value & other } + + def __rand__(self, other): # pragma: no cover + return self & other + + +if six.PY2: + class FlagEnum(IntEnum): + __metaclass__ = FlagEnumMeta + +else: + namespace = FlagEnumMeta.__prepare__('FlagEnum', (IntEnum,)) + FlagEnum = FlagEnumMeta('FlagEnum', (IntEnum,), namespace) + + +class Fingerprint(str): + """ + A subclass of ``str``. Can be compared using == and != to ``str``, ``unicode``, and other :py:obj:`Fingerprint` instances. + + Primarily used as a key for internal dictionaries, so it ignores spaces when comparing and hashing + """ + @property + def keyid(self): + return str(self).replace(' ', '')[-16:] + + @property + def shortid(self): + return str(self).replace(' ', '')[-8:] + + def __new__(cls, content): + if isinstance(content, Fingerprint): + return content + + # validate input before continuing: this should be a string of 40 hex digits + content = content.upper().replace(' ', '') + if not bool(re.match(r'^[A-F0-9]{40}$', content)): + raise ValueError("Expected: String of 40 hex digits") + + # store in the format: "AAAA BBBB CCCC DDDD EEEE FFFF 0000 1111 2222 3333" + # ^^ note 2 spaces here + spaces = [ ' ' if i != 4 else ' ' for i in range(10) ] + chunks = [ ''.join(g) for g in six.moves.zip_longest(*[iter(content)] * 4) ] + content = ''.join(j for i in six.moves.zip_longest(chunks, spaces, fillvalue='') for j in i).strip() + + return str.__new__(cls, content) + + def __eq__(self, other): + if isinstance(other, Fingerprint): + return str(self) == str(other) + + if isinstance(other, (six.text_type, bytes, bytearray)): + if isinstance(other, (bytes, bytearray)): # pragma: no cover + other = other.decode('latin-1') + + other = str(other).replace(' ', '') + return any([self.replace(' ', '') == other, + self.keyid == other, + self.shortid == other]) + + return False # pragma: no cover + + def __ne__(self, other): + return not (self == other) + + def __hash__(self): + return hash(str(self.replace(' ', ''))) + + def __bytes__(self): + return binascii.unhexlify(six.b(self.replace(' ', ''))) + + +class SorteDeque(collections.deque): + """A deque subclass that tries to maintain sorted ordering using bisect""" + def insort(self, item): + i = bisect.bisect_left(self, item) + self.rotate(- i) + self.appendleft(item) + self.rotate(i) + + def resort(self, item): # pragma: no cover + if item in self: + # if item is already in self, see if it is still in sorted order. + # if not, re-sort it by removing it and then inserting it into its sorted order + i = bisect.bisect_left(self, item) + if i == len(self) or self[i] is not item: + self.remove(item) + self.insort(item) + + else: + # if item is not in self, just insert it in sorted order + self.insort(item) + + def check(self): # pragma: no cover + """re-sort any items in self that are not sorted""" + for unsorted in iter(self[i] for i in range(len(self) - 2) if not operator.le(self[i], self[i + 1])): + self.resort(unsorted) |