summaryrefslogtreecommitdiff
path: root/src/leap/mx
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2017-06-26 15:06:41 +0200
committerKali Kaneko <kali@leap.se>2017-06-27 10:33:48 +0200
commit59504c7ddf7aab71614d691e705d386f58b5100d (patch)
treeea6afce37890a23cd088eac50587c3293035e78c /src/leap/mx
parente82b8143d3e9b2f62650e06798eee262885036c2 (diff)
[pkg] vendor pgpy 0.4.1
Diffstat (limited to 'src/leap/mx')
-rw-r--r--src/leap/mx/mail_receiver.py5
-rw-r--r--src/leap/mx/tests/test_mail_receiver.py2
-rw-r--r--src/leap/mx/vendor/__init__.py0
-rw-r--r--src/leap/mx/vendor/pgpy/README30
-rw-r--r--src/leap/mx/vendor/pgpy/__init__.py21
-rw-r--r--src/leap/mx/vendor/pgpy/_author.py18
-rw-r--r--src/leap/mx/vendor/pgpy/_curves.py54
-rw-r--r--src/leap/mx/vendor/pgpy/constants.py484
-rw-r--r--src/leap/mx/vendor/pgpy/decorators.py131
-rw-r--r--src/leap/mx/vendor/pgpy/errors.py39
-rw-r--r--src/leap/mx/vendor/pgpy/memoryview.py128
-rw-r--r--src/leap/mx/vendor/pgpy/packet/__init__.py11
-rw-r--r--src/leap/mx/vendor/pgpy/packet/fields.py1514
-rw-r--r--src/leap/mx/vendor/pgpy/packet/packets.py1605
-rw-r--r--src/leap/mx/vendor/pgpy/packet/subpackets/__init__.py7
-rw-r--r--src/leap/mx/vendor/pgpy/packet/subpackets/signature.py887
-rw-r--r--src/leap/mx/vendor/pgpy/packet/subpackets/types.py136
-rw-r--r--src/leap/mx/vendor/pgpy/packet/subpackets/userattribute.py109
-rw-r--r--src/leap/mx/vendor/pgpy/packet/types.py288
-rw-r--r--src/leap/mx/vendor/pgpy/pgp.py2527
-rw-r--r--src/leap/mx/vendor/pgpy/symenc.py58
-rw-r--r--src/leap/mx/vendor/pgpy/types.py729
22 files changed, 8780 insertions, 3 deletions
diff --git a/src/leap/mx/mail_receiver.py b/src/leap/mx/mail_receiver.py
index e376554..276ae13 100644
--- a/src/leap/mx/mail_receiver.py
+++ b/src/leap/mx/mail_receiver.py
@@ -42,8 +42,6 @@ import email.utils
from datetime import datetime, timedelta
from email import message_from_string
-from pgpy import PGPKey, PGPMessage
-from pgpy.errors import PGPEncryptionError
from twisted.application.service import Service, IService
from twisted.internet import inotify, defer, task, reactor
@@ -59,6 +57,9 @@ from leap.soledad.common.document import ServerDocument
from leap.mx.bounce import bounce_message
from leap.mx.bounce import InvalidReturnPathError
+from leap.mx.vendor.pgpy import PGPKey, PGPMessage
+from leap.mx.vendor.pgpy.errors import PGPEncryptionError
+
class MailReceiver(Service):
"""
diff --git a/src/leap/mx/tests/test_mail_receiver.py b/src/leap/mx/tests/test_mail_receiver.py
index abe5a71..0d03806 100644
--- a/src/leap/mx/tests/test_mail_receiver.py
+++ b/src/leap/mx/tests/test_mail_receiver.py
@@ -26,11 +26,11 @@ import shutil
import tempfile
from email.message import Message
-from pgpy import PGPKey, PGPMessage
from twisted.internet import defer, reactor
from twisted.trial import unittest
from leap.mx.mail_receiver import MailReceiver
+from leap.mx.vendor.pgpy import PGPKey, PGPMessage
BOUNCE_ADDRESS = "bounce@leap.se"
diff --git a/src/leap/mx/vendor/__init__.py b/src/leap/mx/vendor/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/leap/mx/vendor/__init__.py
diff --git a/src/leap/mx/vendor/pgpy/README b/src/leap/mx/vendor/pgpy/README
new file mode 100644
index 0000000..275a496
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/README
@@ -0,0 +1,30 @@
+This submodule is a copy of PGPy 0.4.1
+We're vendoring it here because a package for python2 is not readily available.
+If you're thinking about packaging it, you probably could port leap.mx to py3 instead.
+
+pgpy is Copyright (c) 2014 Michael Greene - All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+* Neither the name of the {organization} nor the names of its
+ contributors may be used to endorse or promote products derived from
+ this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/src/leap/mx/vendor/pgpy/__init__.py b/src/leap/mx/vendor/pgpy/__init__.py
new file mode 100644
index 0000000..b80740d
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/__init__.py
@@ -0,0 +1,21 @@
+""" PGPy :: Pretty Good Privacy for Python
+"""
+from ._author import *
+
+from .pgp import PGPKey
+from .pgp import PGPKeyring
+from .pgp import PGPMessage
+from .pgp import PGPSignature
+from .pgp import PGPUID
+
+__all__ = ['__author__',
+ '__copyright__',
+ '__license__',
+ '__version__',
+ 'constants',
+ 'errors',
+ 'PGPKey',
+ 'PGPKeyring',
+ 'PGPMessage',
+ 'PGPSignature',
+ 'PGPUID', ]
diff --git a/src/leap/mx/vendor/pgpy/_author.py b/src/leap/mx/vendor/pgpy/_author.py
new file mode 100644
index 0000000..ae17b45
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/_author.py
@@ -0,0 +1,18 @@
+"""_author.py
+
+Canonical location for authorship information
+__version__ is a PEP-386 compliant version string,
+making use of distutils.version.LooseVersion
+"""
+
+from distutils.version import LooseVersion
+
+__all__ = ['__author__',
+ '__copyright__',
+ '__license__',
+ '__version__']
+
+__author__ = "Michael Greene"
+__copyright__ = "Copyright (c) 2014 Michael Greene"
+__license__ = "BSD"
+__version__ = str(LooseVersion("0.4.1"))
diff --git a/src/leap/mx/vendor/pgpy/_curves.py b/src/leap/mx/vendor/pgpy/_curves.py
new file mode 100644
index 0000000..9503075
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/_curves.py
@@ -0,0 +1,54 @@
+""" _curves.py
+specify some additional curves that OpenSSL provides but cryptography doesn't explicitly expose
+"""
+
+from cryptography import utils
+
+from cryptography.hazmat.primitives.asymmetric import ec
+
+from cryptography.hazmat.bindings.openssl.binding import Binding
+
+__all__ = tuple()
+
+# TODO: investigate defining additional curves using EC_GROUP_new_curve
+# https://wiki.openssl.org/index.php/Elliptic_Curve_Cryptography#Defining_Curves
+
+
+def _openssl_get_supported_curves():
+ if hasattr(_openssl_get_supported_curves, '_curves'):
+ return _openssl_get_supported_curves._curves
+
+ # use cryptography's cffi bindings to get an array of curve names
+ b = Binding()
+ cn = b.lib.EC_get_builtin_curves(b.ffi.NULL, 0)
+ cs = b.ffi.new('EC_builtin_curve[]', cn)
+ b.lib.EC_get_builtin_curves(cs, cn)
+
+ # store the result so we don't have to do all of this every time
+ curves = { b.ffi.string(b.lib.OBJ_nid2sn(c.nid)).decode('utf-8') for c in cs }
+ _openssl_get_supported_curves._curves = curves
+ return curves
+
+
+@utils.register_interface(ec.EllipticCurve)
+class BrainpoolP256R1(object):
+ name = 'brainpoolP256r1'
+ key_size = 256
+
+
+@utils.register_interface(ec.EllipticCurve)
+class BrainpoolP384R1(object):
+ name = 'brainpoolP384r1'
+ key_size = 384
+
+
+@utils.register_interface(ec.EllipticCurve)
+class BrainpoolP512R1(object):
+ name = 'brainpoolP512r1'
+ key_size = 512
+
+
+# add these curves to the _CURVE_TYPES list
+for curve in [BrainpoolP256R1, BrainpoolP384R1, BrainpoolP512R1]:
+ if curve.name not in ec._CURVE_TYPES and curve.name in _openssl_get_supported_curves():
+ ec._CURVE_TYPES[curve.name] = curve
diff --git a/src/leap/mx/vendor/pgpy/constants.py b/src/leap/mx/vendor/pgpy/constants.py
new file mode 100644
index 0000000..271b0d0
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/constants.py
@@ -0,0 +1,484 @@
+""" constants.py
+"""
+import bz2
+import hashlib
+import imghdr
+import os
+import time
+import zlib
+
+from collections import namedtuple
+from enum import Enum
+from enum import IntEnum
+from pyasn1.type.univ import ObjectIdentifier
+
+import six
+
+from cryptography.hazmat.backends import openssl
+from cryptography.hazmat.primitives.asymmetric import ec
+from cryptography.hazmat.primitives.ciphers import algorithms
+
+from .decorators import classproperty
+from .types import FlagEnum
+from ._curves import BrainpoolP256R1, BrainpoolP384R1, BrainpoolP512R1
+
+__all__ = ['Backend',
+ 'EllipticCurveOID',
+ 'PacketTag',
+ 'SymmetricKeyAlgorithm',
+ 'PubKeyAlgorithm',
+ 'CompressionAlgorithm',
+ 'HashAlgorithm',
+ 'RevocationReason',
+ 'ImageEncoding',
+ 'SignatureType',
+ 'KeyServerPreferences',
+ 'String2KeyType',
+ 'TrustLevel',
+ 'KeyFlags',
+ 'Features',
+ 'RevocationKeyClass',
+ 'NotationDataFlags',
+ 'TrustFlags']
+
+
+# this is 50 KiB
+_hashtunedata = bytearray([10, 11, 12, 13, 14, 15, 16, 17] * 128 * 50)
+
+
+class Backend(Enum):
+ OpenSSL = openssl.backend
+
+
+class EllipticCurveOID(Enum):
+ # these are specified as:
+ # id = (oid, curve)
+ Invalid = ('', )
+ #: DJB's fast elliptic curve
+ #:
+ #: .. warning::
+ #: This curve is not currently usable by PGPy
+ Curve25519 = ('1.3.6.1.4.1.3029.1.5.1', )
+ #: Twisted Edwards variant of Curve25519
+ #:
+ #: .. warning::
+ #: This curve is not currently usable by PGPy
+ Ed25519 = ('1.3.6.1.4.1.11591.15.1', )
+ #: NIST P-256, also known as SECG curve secp256r1
+ NIST_P256 = ('1.2.840.10045.3.1.7', ec.SECP256R1)
+ #: NIST P-384, also known as SECG curve secp384r1
+ NIST_P384 = ('1.3.132.0.34', ec.SECP384R1)
+ #: NIST P-521, also known as SECG curve secp521r1
+ NIST_P521 = ('1.3.132.0.35', ec.SECP521R1)
+ #: Brainpool Standard Curve, 256-bit
+ #:
+ #: .. note::
+ #: Requires OpenSSL >= 1.0.2
+ Brainpool_P256 = ('1.3.36.3.3.2.8.1.1.7', BrainpoolP256R1)
+ #: Brainpool Standard Curve, 384-bit
+ #:
+ #: .. note::
+ #: Requires OpenSSL >= 1.0.2
+ Brainpool_P384 = ('1.3.36.3.3.2.8.1.1.11', BrainpoolP384R1)
+ #: Brainpool Standard Curve, 512-bit
+ #:
+ #: .. note::
+ #: Requires OpenSSL >= 1.0.2
+ Brainpool_P512 = ('1.3.36.3.3.2.8.1.1.13', BrainpoolP512R1)
+ #: SECG curve secp256k1
+ SECP256K1 = ('1.3.132.0.10', ec.SECP256K1)
+
+ def __new__(cls, oid, curve=None):
+ # preprocessing stage for enum members:
+ # - set enum_member.value to ObjectIdentifier(oid)
+ # - if curve is not None and curve.name is in ec._CURVE_TYPES, set enum_member.curve to curve
+ # - otherwise, set enum_member.curve to None
+ obj = object.__new__(cls)
+ obj._value_ = ObjectIdentifier(oid)
+ obj.curve = None
+
+ if curve is not None and curve.name in ec._CURVE_TYPES:
+ obj.curve = curve
+
+ return obj
+
+ @property
+ def can_gen(self):
+ return self.curve is not None
+
+ @property
+ def key_size(self):
+ if self.curve is not None:
+ return self.curve.key_size
+
+ @property
+ def kdf_halg(self):
+ # return the hash algorithm to specify in the KDF fields when generating a key
+ algs = {256: HashAlgorithm.SHA256,
+ 384: HashAlgorithm.SHA384,
+ 512: HashAlgorithm.SHA512,
+ 521: HashAlgorithm.SHA512}
+
+ return algs.get(self.key_size, None)
+
+ @property
+ def kek_alg(self):
+ # return the AES algorithm to specify in the KDF fields when generating a key
+ algs = {256: SymmetricKeyAlgorithm.AES128,
+ 384: SymmetricKeyAlgorithm.AES192,
+ 512: SymmetricKeyAlgorithm.AES256,
+ 521: SymmetricKeyAlgorithm.AES256}
+
+ return algs.get(self.key_size, None)
+
+
+class PacketTag(IntEnum):
+ Invalid = 0
+ PublicKeyEncryptedSessionKey = 1
+ Signature = 2
+ SymmetricKeyEncryptedSessionKey = 3
+ OnePassSignature = 4
+ SecretKey = 5
+ PublicKey = 6
+ SecretSubKey = 7
+ CompressedData = 8
+ SymmetricallyEncryptedData = 9
+ Marker = 10
+ LiteralData = 11
+ Trust = 12
+ UserID = 13
+ PublicSubKey = 14
+ UserAttribute = 17
+ SymmetricallyEncryptedIntegrityProtectedData = 18
+ ModificationDetectionCode = 19
+
+
+class SymmetricKeyAlgorithm(IntEnum):
+ """Supported symmetric key algorithms."""
+ Plaintext = 0x00
+ #: .. warning::
+ #: IDEA is insecure. PGPy only allows it to be used for decryption, not encryption!
+ IDEA = 0x01
+ #: Triple-DES with 168-bit key derived from 192
+ TripleDES = 0x02
+ #: CAST5 (or CAST-128) with 128-bit key
+ CAST5 = 0x03
+ #: Blowfish with 128-bit key and 16 rounds
+ Blowfish = 0x04
+ #: AES with 128-bit key
+ AES128 = 0x07
+ #: AES with 192-bit key
+ AES192 = 0x08
+ #: AES with 256-bit key
+ AES256 = 0x09
+ # Twofish with 256-bit key - not currently supported
+ Twofish256 = 0x0A
+ #: Camellia with 128-bit key
+ Camellia128 = 0x0B
+ #: Camellia with 192-bit key
+ Camellia192 = 0x0C
+ #: Camellia with 256-bit key
+ Camellia256 = 0x0D
+
+ @property
+ def cipher(self):
+ bs = {SymmetricKeyAlgorithm.IDEA: algorithms.IDEA,
+ SymmetricKeyAlgorithm.TripleDES: algorithms.TripleDES,
+ SymmetricKeyAlgorithm.CAST5: algorithms.CAST5,
+ SymmetricKeyAlgorithm.Blowfish: algorithms.Blowfish,
+ SymmetricKeyAlgorithm.AES128: algorithms.AES,
+ SymmetricKeyAlgorithm.AES192: algorithms.AES,
+ SymmetricKeyAlgorithm.AES256: algorithms.AES,
+ SymmetricKeyAlgorithm.Twofish256: namedtuple('Twofish256', ['block_size'])(block_size=128),
+ SymmetricKeyAlgorithm.Camellia128: algorithms.Camellia,
+ SymmetricKeyAlgorithm.Camellia192: algorithms.Camellia,
+ SymmetricKeyAlgorithm.Camellia256: algorithms.Camellia}
+
+ if self in bs:
+ return bs[self]
+
+ raise NotImplementedError(repr(self))
+
+ @property
+ def is_insecure(self):
+ insecure_ciphers = {SymmetricKeyAlgorithm.IDEA}
+ return self in insecure_ciphers
+
+ @property
+ def block_size(self):
+ return self.cipher.block_size
+
+ @property
+ def key_size(self):
+ ks = {SymmetricKeyAlgorithm.IDEA: 128,
+ SymmetricKeyAlgorithm.TripleDES: 192,
+ SymmetricKeyAlgorithm.CAST5: 128,
+ SymmetricKeyAlgorithm.Blowfish: 128,
+ SymmetricKeyAlgorithm.AES128: 128,
+ SymmetricKeyAlgorithm.AES192: 192,
+ SymmetricKeyAlgorithm.AES256: 256,
+ SymmetricKeyAlgorithm.Twofish256: 256,
+ SymmetricKeyAlgorithm.Camellia128: 128,
+ SymmetricKeyAlgorithm.Camellia192: 192,
+ SymmetricKeyAlgorithm.Camellia256: 256}
+
+ if self in ks:
+ return ks[self]
+
+ raise NotImplementedError(repr(self))
+
+ def gen_iv(self):
+ return os.urandom(self.block_size // 8)
+
+ def gen_key(self):
+ return os.urandom(self.key_size // 8)
+
+
+class PubKeyAlgorithm(IntEnum):
+ Invalid = 0x00
+ #: Signifies that a key is an RSA key.
+ RSAEncryptOrSign = 0x01
+ RSAEncrypt = 0x02 # deprecated
+ RSASign = 0x03 # deprecated
+ #: Signifies that a key is an ElGamal key.
+ ElGamal = 0x10
+ #: Signifies that a key is a DSA key.
+ DSA = 0x11
+ #: Signifies that a key is an ECDH key.
+ ECDH = 0x12
+ #: Signifies that a key is an ECDSA key.
+ ECDSA = 0x13
+ FormerlyElGamalEncryptOrSign = 0x14 # deprecated - do not generate
+ # DiffieHellman = 0x15 # X9.42
+
+ @property
+ def can_gen(self):
+ return self in {PubKeyAlgorithm.RSAEncryptOrSign,
+ PubKeyAlgorithm.DSA,
+ PubKeyAlgorithm.ECDSA,
+ PubKeyAlgorithm.ECDH}
+
+ @property
+ def can_encrypt(self): # pragma: no cover
+ return self in {PubKeyAlgorithm.RSAEncryptOrSign, PubKeyAlgorithm.ElGamal, PubKeyAlgorithm.ECDH}
+
+ @property
+ def can_sign(self):
+ return self in {PubKeyAlgorithm.RSAEncryptOrSign, PubKeyAlgorithm.DSA, PubKeyAlgorithm.ECDSA}
+
+ @property
+ def deprecated(self):
+ return self in {PubKeyAlgorithm.RSAEncrypt,
+ PubKeyAlgorithm.RSASign,
+ PubKeyAlgorithm.FormerlyElGamalEncryptOrSign}
+
+
+class CompressionAlgorithm(IntEnum):
+ #: No compression
+ Uncompressed = 0x00
+ #: ZIP DEFLATE
+ ZIP = 0x01
+ #: ZIP DEFLATE with zlib headers
+ ZLIB = 0x02
+ #: Bzip2
+ BZ2 = 0x03
+
+ def compress(self, data):
+ if self is CompressionAlgorithm.Uncompressed:
+ return data
+
+ if self is CompressionAlgorithm.ZIP:
+ return zlib.compress(data)[2:-4]
+
+ if self is CompressionAlgorithm.ZLIB:
+ return zlib.compress(data)
+
+ if self is CompressionAlgorithm.BZ2:
+ return bz2.compress(data)
+
+ raise NotImplementedError(self)
+
+ def decompress(self, data):
+ if six.PY2:
+ data = bytes(data)
+
+ if self is CompressionAlgorithm.Uncompressed:
+ return data
+
+ if self is CompressionAlgorithm.ZIP:
+ return zlib.decompress(data, -15)
+
+ if self is CompressionAlgorithm.ZLIB:
+ return zlib.decompress(data)
+
+ if self is CompressionAlgorithm.BZ2:
+ return bz2.decompress(data)
+
+ raise NotImplementedError(self)
+
+
+class HashAlgorithm(IntEnum):
+ Invalid = 0x00
+ MD5 = 0x01
+ SHA1 = 0x02
+ RIPEMD160 = 0x03
+ _reserved_1 = 0x04
+ _reserved_2 = 0x05
+ _reserved_3 = 0x06
+ _reserved_4 = 0x07
+ SHA256 = 0x08
+ SHA384 = 0x09
+ SHA512 = 0x0A
+ SHA224 = 0x0B
+
+ def __init__(self, *args):
+ super(self.__class__, self).__init__()
+ self._tuned_count = 0
+
+ @property
+ def hasher(self):
+ return hashlib.new(self.name)
+
+ @property
+ def digest_size(self):
+ return self.hasher.digest_size
+
+ @property
+ def tuned_count(self):
+ if self._tuned_count == 0:
+ self.tune_count()
+
+ return self._tuned_count
+
+ def tune_count(self):
+ start = end = 0
+ htd = _hashtunedata[:]
+
+ while start == end:
+ # potentially do this multiple times in case the resolution of time.time is low enough that
+ # hashing 100 KiB isn't enough time to produce a measurable difference
+ # (e.g. if the timer for time.time doesn't have enough precision)
+ htd = htd + htd
+ h = self.hasher
+
+ start = time.time()
+ h.update(htd)
+ end = time.time()
+
+ # now calculate how many bytes need to be hashed to reach our expected time period
+ # GnuPG tunes for about 100ms, so we'll do that as well
+ _TIME = 0.100
+ ct = int(len(htd) * (_TIME / (end - start)))
+ c1 = ((ct >> (ct.bit_length() - 5)) - 16)
+ c2 = (ct.bit_length() - 11)
+ c = ((c2 << 4) + c1)
+
+ # constrain self._tuned_count to be between 0 and 255
+ self._tuned_count = max(min(c, 255), 0)
+
+
+class RevocationReason(IntEnum):
+ #: No reason was specified. This is the default reason.
+ NotSpecified = 0x00
+ #: The key was superseded by a new key. Only meaningful when revoking a key.
+ Superseded = 0x01
+ #: Key material has been compromised. Only meaningful when revoking a key.
+ Compromised = 0x02
+ #: Key is retired and no longer used. Only meaningful when revoking a key.
+ Retired = 0x03
+ #: User ID information is no longer valid. Only meaningful when revoking a certification of a user id.
+ UserID = 0x20
+
+
+class ImageEncoding(IntEnum):
+ Unknown = 0x00
+ JPEG = 0x01
+
+ @classmethod
+ def encodingof(cls, imagebytes):
+ type = imghdr.what(None, h=imagebytes)
+ if type == 'jpeg':
+ return ImageEncoding.JPEG
+ return ImageEncoding.Unknown # pragma: no cover
+
+
+class SignatureType(IntEnum):
+ BinaryDocument = 0x00
+ CanonicalDocument = 0x01
+ Standalone = 0x02
+ Generic_Cert = 0x10
+ Persona_Cert = 0x11
+ Casual_Cert = 0x12
+ Positive_Cert = 0x13
+ Subkey_Binding = 0x18
+ PrimaryKey_Binding = 0x19
+ DirectlyOnKey = 0x1F
+ KeyRevocation = 0x20
+ SubkeyRevocation = 0x28
+ CertRevocation = 0x30
+ Timestamp = 0x40
+ ThirdParty_Confirmation = 0x50
+
+
+class KeyServerPreferences(IntEnum):
+ Unknown = 0x00
+ NoModify = 0x80
+
+
+class String2KeyType(IntEnum):
+ Simple = 0
+ Salted = 1
+ Reserved = 2
+ Iterated = 3
+
+
+class TrustLevel(IntEnum):
+ Unknown = 0
+ Expired = 1
+ Undefined = 2
+ Never = 3
+ Marginal = 4
+ Fully = 5
+ Ultimate = 6
+
+
+class KeyFlags(FlagEnum):
+ #: Signifies that a key may be used to certify keys and user ids. Primary keys always have this, even if it is not specified.
+ Certify = 0x01
+ #: Signifies that a key may be used to sign messages and documents.
+ Sign = 0x02
+ #: Signifies that a key may be used to encrypt messages.
+ EncryptCommunications = 0x04
+ #: Signifies that a key may be used to encrypt storage. Currently equivalent to :py:obj:`~pgpy.constants.EncryptCommunications`.
+ EncryptStorage = 0x08
+ #: Signifies that the private component of a given key may have been split by a secret-sharing mechanism. Split
+ #: keys are not currently supported by PGPy.
+ Split = 0x10
+ #: Signifies that a key may be used for authentication.
+ Authentication = 0x20
+ #: Signifies that the private component of a key may be in the possession of more than one person.
+ MultiPerson = 0x80
+
+
+class Features(FlagEnum):
+ ModificationDetection = 0x01
+
+ @classproperty
+ def pgpy_features(cls):
+ return Features.ModificationDetection
+
+
+class RevocationKeyClass(FlagEnum):
+ Sensitive = 0x40
+ Normal = 0x80
+
+
+class NotationDataFlags(FlagEnum):
+ HumanReadable = 0x80
+
+
+class TrustFlags(FlagEnum):
+ Revoked = 0x20
+ SubRevoked = 0x40
+ Disabled = 0x80
+ PendingCheck = 0x100
diff --git a/src/leap/mx/vendor/pgpy/decorators.py b/src/leap/mx/vendor/pgpy/decorators.py
new file mode 100644
index 0000000..d2b9926
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/decorators.py
@@ -0,0 +1,131 @@
+""" decorators.py
+"""
+import contextlib
+import functools
+import six
+import warnings
+
+try:
+ from singledispatch import singledispatch
+
+except ImportError: # pragma: no cover
+ from functools import singledispatch
+
+
+from .errors import PGPError
+
+__all__ = ['classproperty',
+ 'sdmethod',
+ 'sdproperty',
+ 'KeyAction']
+
+
+def classproperty(fget):
+ class ClassProperty(object):
+ def __init__(self, fget):
+ self.fget = fget
+ self.__doc__ = fget.__doc__
+
+ def __get__(self, cls, owner):
+ return self.fget(owner)
+
+ def __set__(self, obj, value): # pragma: no cover
+ raise AttributeError("Read-only attribute")
+
+ def __delete__(self, obj): # pragma: no cover
+ raise AttributeError("Read-only attribute")
+
+ return ClassProperty(fget)
+
+
+def sdmethod(meth):
+ """
+ This is a hack to monkey patch sdproperty to work as expected with instance methods.
+ """
+ sd = singledispatch(meth)
+
+ def wrapper(obj, *args, **kwargs):
+ return sd.dispatch(args[0].__class__)(obj, *args, **kwargs)
+
+ wrapper.register = sd.register
+ wrapper.dispatch = sd.dispatch
+ wrapper.registry = sd.registry
+ wrapper._clear_cache = sd._clear_cache
+ functools.update_wrapper(wrapper, meth)
+ return wrapper
+
+
+def sdproperty(fget):
+ def defset(obj, val): # pragma: no cover
+ raise TypeError(str(val.__class__))
+
+ class SDProperty(property):
+ def register(self, cls=None, fset=None):
+ return self.fset.register(cls, fset)
+
+ def setter(self, fset):
+ self.register(object, fset)
+ return type(self)(self.fget, self.fset, self.fdel, self.__doc__)
+
+ return SDProperty(fget, sdmethod(defset))
+
+
+class KeyAction(object):
+ def __init__(self, *usage, **conditions):
+ super(KeyAction, self).__init__()
+ self.flags = set(usage)
+ self.conditions = conditions
+
+ @contextlib.contextmanager
+ def usage(self, key, user):
+ def _preiter(first, iterable):
+ yield first
+ for item in iterable:
+ yield item
+
+ em = {}
+ em['keyid'] = key.fingerprint.keyid
+ em['flags'] = ', '.join(flag.name for flag in self.flags)
+
+ if len(self.flags):
+ for _key in _preiter(key, key.subkeys.values()):
+ if self.flags & set(_key._get_key_flags(user)):
+ break
+
+ else: # pragma: no cover
+ raise PGPError("Key {keyid:s} does not have the required usage flag {flags:s}".format(**em))
+
+ else:
+ _key = key
+
+ if _key is not key:
+ em['subkeyid'] = _key.fingerprint.keyid
+ warnings.warn("Key {keyid:s} does not have the required usage flag {flags:s}; using subkey {subkeyid:s}"
+ "".format(**em), stacklevel=4)
+
+ yield _key
+
+ def check_attributes(self, key):
+ for attr, expected in self.conditions.items():
+ if getattr(key, attr) != expected:
+ raise PGPError("Expected: {attr:s} == {eval:s}. Got: {got:s}"
+ "".format(attr=attr, eval=str(expected), got=str(getattr(key, attr))))
+
+ def __call__(self, action):
+ # @functools.wraps(action)
+ @six.wraps(action)
+ def _action(key, *args, **kwargs):
+ if key._key is None:
+ raise PGPError("No key!")
+
+ # if a key is in the process of being created, it needs to be allowed to certify its own user id
+ if len(key._uids) == 0 and key.is_primary and action is not key.certify.__wrapped__:
+ raise PGPError("Key is not complete - please add a User ID!")
+
+ with self.usage(key, kwargs.get('user', None)) as _key:
+ self.check_attributes(key)
+
+ # do the thing
+ return action(_key, *args, **kwargs)
+
+ return _action
diff --git a/src/leap/mx/vendor/pgpy/errors.py b/src/leap/mx/vendor/pgpy/errors.py
new file mode 100644
index 0000000..6af0612
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/errors.py
@@ -0,0 +1,39 @@
+""" errors.py
+"""
+
+__all__ = ('PGPError',
+ 'PGPEncryptionError',
+ 'PGPDecryptionError',
+ 'PGPOpenSSLCipherNotSupported',
+ 'PGPInsecureCipher',
+ 'WontImplementError',)
+
+
+class PGPError(Exception):
+ """Raised as a general error in PGPy"""
+ pass
+
+
+class PGPEncryptionError(Exception):
+ """Raised when encryption fails"""
+ pass
+
+
+class PGPDecryptionError(Exception):
+ """Raised when decryption fails"""
+ pass
+
+
+class PGPOpenSSLCipherNotSupported(Exception):
+ """Raised when OpenSSL does not support the requested cipher"""
+ pass
+
+
+class PGPInsecureCipher(Exception):
+ """Raised when a cipher known to be insecure is attempted to be used to encrypt data"""
+ pass
+
+
+class WontImplementError(NotImplementedError):
+ """Raised when something that is not implemented, will not be implemented"""
+ pass
diff --git a/src/leap/mx/vendor/pgpy/memoryview.py b/src/leap/mx/vendor/pgpy/memoryview.py
new file mode 100644
index 0000000..35f3801
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/memoryview.py
@@ -0,0 +1,128 @@
+""" util.py
+"""
+import six
+
+__all__ = ('memoryview', )
+
+memoryview = memoryview
+
+if six.PY2:
+ # because Python2's memoryview can't be released directly, nor can it be used as a context manager
+ # this wrapper object should hopefully make the behavior more uniform to python 3's
+ import __builtin__
+ import functools
+
+ # this decorator will raise a ValueError if the wrapped memoryview object has been "released"
+ def notreleased(meth):
+ @functools.wraps(meth)
+ def _inner(self, *args, **kwargs):
+ if self._mem is None:
+ raise ValueError("operation forbidden on released memoryview object")
+ return meth(self, *args, **kwargs)
+
+ return _inner
+
+ class memoryview(object): # flake8: noqa
+ @property
+ @notreleased
+ def obj(self):
+ """The underlying object of the memoryview."""
+ return self._obj
+
+ @property
+ @notreleased
+ def nbytes(self):
+ # nbytes == product(shape) * itemsize == len(m.tobytes())
+ nb = 1
+ for dim in self.shape:
+ nb *= dim
+ return nb * self.itemsize
+
+ # TODO: c_contiguous -> (self.ndim == 0 or ???)
+ # TODO: f_contiguous -> (self.ndim == 0 or ???)
+ # TODO: contiguous -> return self.c_contiguous or self.f_contiguous
+
+ def __new__(cls, obj, parent=None):
+ memview = object.__new__(cls)
+ memview._obj = obj if parent is None else parent.obj
+ return memview
+
+ def __init__(self, obj):
+ if not hasattr(self, '_mem'):
+ if not isinstance(obj, __builtin__.memoryview):
+ obj = __builtin__.memoryview(obj)
+ self._mem = obj
+
+ def __dir__(self):
+ # so dir(...) looks like a memoryview object, and also
+ # contains our additional methods and properties, but not our instance members
+ return sorted(set(self.__class__.__dict__) | set(dir(self._mem)))
+
+ @notreleased
+ def __getitem__(self, item):
+ # if this is a slice, it'll return another real memoryview object
+ # we'll need to wrap that subview in another memoryview wrapper
+ if isinstance(item, slice):
+ return memoryview(self._mem.__getitem__(item))
+
+ return self._mem.__getitem__(item)
+
+ @notreleased
+ def __setitem__(self, key, value):
+ self._mem.__setitem__(key, value)
+
+ @notreleased
+ def __delitem__(self, key):
+ raise TypeError("cannot delete memory")
+
+ def __getattribute__(self, item):
+ try:
+ return object.__getattribute__(self, item)
+
+ except AttributeError:
+ if object.__getattribute__(self, '_mem') is None:
+ raise ValueError("operation forbidden on released memoryview object")
+
+ return object.__getattribute__(self, '_mem').__getattribute__(item)
+
+ def __setattr__(self, key, value):
+ if key not in self.__dict__ and hasattr(__builtin__.memoryview, key):
+ # there are no writable attributes on memoryview objects
+ # changing indexed values is handled by __setitem__
+ raise AttributeError("attribute '{}' of 'memoryview' objects is not writable".format(key))
+
+ else:
+ object.__setattr__(self, key, value)
+
+ @notreleased
+ def __len__(self):
+ return len(self._mem)
+
+ def __eq__(self, other):
+ if isinstance(other, memoryview):
+ return self._mem == other._mem
+
+ return self._mem == other
+
+ @notreleased
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.release()
+
+ def __repr__(self):
+ return '<{}memory at 0x{:02X}>'.format('' if self._mem else 'released ', id(self))
+
+ def release(self):
+ """Release the underlying buffer exposed by the memoryview object"""
+ # this should effectively do the same job as memoryview.release() in Python 3
+ self._mem = None
+ self._obj = None
+
+ @notreleased
+ def hex(self):
+ """Return the data in the buffer as a string of hexadecimal numbers."""
+ return ''.join(('{:02X}'.format(ord(c)) for c in self._mem))
+
+ # TODO: cast
diff --git a/src/leap/mx/vendor/pgpy/packet/__init__.py b/src/leap/mx/vendor/pgpy/packet/__init__.py
new file mode 100644
index 0000000..36120dd
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/packet/__init__.py
@@ -0,0 +1,11 @@
+from .types import Key
+from .types import Opaque
+from .types import Packet
+from .types import Primary
+from .types import Private
+from .types import Public
+from .types import Sub
+
+from .packets import * # NOQA
+
+__all__ = ['Key', 'Opaque', 'Packet', 'Primary', 'Private', 'Public', 'Sub']
diff --git a/src/leap/mx/vendor/pgpy/packet/fields.py b/src/leap/mx/vendor/pgpy/packet/fields.py
new file mode 100644
index 0000000..b5b5f2e
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/packet/fields.py
@@ -0,0 +1,1514 @@
+""" fields.py
+"""
+from __future__ import absolute_import, division
+
+import abc
+import binascii
+import collections
+import copy
+import hashlib
+import itertools
+import math
+import os
+
+from pyasn1.codec.der import decoder
+from pyasn1.codec.der import encoder
+from pyasn1.type.univ import Integer
+from pyasn1.type.univ import Sequence
+
+from cryptography.exceptions import InvalidSignature
+
+from cryptography.hazmat.backends import default_backend
+
+from cryptography.hazmat.primitives import hashes
+
+from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.primitives.asymmetric import dsa
+from cryptography.hazmat.primitives.asymmetric import ec
+from cryptography.hazmat.primitives.asymmetric import padding
+
+from cryptography.hazmat.primitives.kdf.concatkdf import ConcatKDFHash
+
+from cryptography.hazmat.primitives.keywrap import aes_key_wrap
+from cryptography.hazmat.primitives.keywrap import aes_key_unwrap
+
+from cryptography.hazmat.primitives.padding import PKCS7
+
+from .subpackets import Signature as SignatureSP
+from .subpackets import UserAttribute
+from .subpackets import signature
+from .subpackets import userattribute
+
+from .types import MPI
+from .types import MPIs
+
+from ..constants import EllipticCurveOID
+from ..constants import HashAlgorithm
+from ..constants import PubKeyAlgorithm
+from ..constants import String2KeyType
+from ..constants import SymmetricKeyAlgorithm
+
+from ..decorators import sdproperty
+
+from ..errors import PGPDecryptionError
+from ..errors import PGPError
+
+from ..symenc import _decrypt
+from ..symenc import _encrypt
+
+from ..types import Field
+
+__all__ = ['SubPackets',
+ 'UserAttributeSubPackets',
+ 'Signature',
+ 'RSASignature',
+ 'DSASignature',
+ 'ECDSASignature',
+ 'PubKey',
+ 'OpaquePubKey',
+ 'RSAPub',
+ 'DSAPub',
+ 'ElGPub',
+ 'ECDSAPub',
+ 'ECDHPub',
+ 'String2Key',
+ 'ECKDF',
+ 'PrivKey',
+ 'OpaquePrivKey',
+ 'RSAPriv',
+ 'DSAPriv',
+ 'ElGPriv',
+ 'ECDSAPriv',
+ 'ECDHPriv',
+ 'CipherText',
+ 'RSACipherText',
+ 'ElGCipherText',
+ 'ECDHCipherText', ]
+
+
+class SubPackets(collections.MutableMapping, Field):
+ _spmodule = signature
+
+ def __init__(self):
+ super(SubPackets, self).__init__()
+ self._hashed_sp = collections.OrderedDict()
+ self._unhashed_sp = collections.OrderedDict()
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes += self.__hashbytearray__()
+ _bytes += self.__unhashbytearray__()
+ return _bytes
+
+ def __hashbytearray__(self):
+ _bytes = bytearray()
+ _bytes += self.int_to_bytes(sum(len(sp) for sp in self._hashed_sp.values()), 2)
+ for hsp in self._hashed_sp.values():
+ _bytes += hsp.__bytearray__()
+ return _bytes
+
+ def __unhashbytearray__(self):
+ _bytes = bytearray()
+ _bytes += self.int_to_bytes(sum(len(sp) for sp in self._unhashed_sp.values()), 2)
+ for uhsp in self._unhashed_sp.values():
+ _bytes += uhsp.__bytearray__()
+ return _bytes
+
+ def __len__(self): # pragma: no cover
+ return sum(sp.header.length for sp in itertools.chain(self._hashed_sp.values(), self._unhashed_sp.values())) + 4
+
+ def __iter__(self):
+ for sp in itertools.chain(self._hashed_sp.values(), self._unhashed_sp.values()):
+ yield sp
+
+ def __setitem__(self, key, val):
+ # the key provided should always be the classname for the subpacket
+ # but, there can be multiple subpackets of the same type
+ # so, it should be stored in the format: [h_]<key>_<seqid>
+ # where:
+ # - <key> is the classname of val
+ # - <seqid> is a sequence id, starting at 0, for a given classname
+
+ i = 0
+ if isinstance(key, tuple): # pragma: no cover
+ key, i = key
+
+ d = self._unhashed_sp
+ if key.startswith('h_'):
+ d, key = self._hashed_sp, key[2:]
+
+ while (key, i) in d:
+ i += 1
+
+ d[(key, i)] = val
+
+ def __getitem__(self, key):
+ if isinstance(key, tuple): # pragma: no cover
+ return self._hashed_sp.get(key, self._unhashed_sp.get(key))
+
+ if key.startswith('h_'):
+ return [v for k, v in self._hashed_sp.items() if key[2:] == k[0]]
+
+ else:
+ return [v for k, v in itertools.chain(self._hashed_sp.items(), self._unhashed_sp.items()) if key == k[0]]
+
+ def __delitem__(self, key):
+ ##TODO: this
+ raise NotImplementedError
+
+ def __contains__(self, key):
+ return key in set(k for k, _ in itertools.chain(self._hashed_sp, self._unhashed_sp))
+
+ def __copy__(self):
+ sp = SubPackets()
+ sp._hashed_sp = self._hashed_sp.copy()
+ sp._unhashed_sp = self._unhashed_sp.copy()
+
+ return sp
+
+ def addnew(self, spname, hashed=False, **kwargs):
+ nsp = getattr(self._spmodule, spname)()
+ for p, v in kwargs.items():
+ if hasattr(nsp, p):
+ setattr(nsp, p, v)
+ nsp.update_hlen()
+ if hashed:
+ self['h_' + spname] = nsp
+
+ else:
+ self[spname] = nsp
+
+ def update_hlen(self):
+ for sp in self:
+ sp.update_hlen()
+
+ def parse(self, packet):
+ hl = self.bytes_to_int(packet[:2])
+ del packet[:2]
+
+ # we do it this way because we can't ensure that subpacket headers are sized appropriately
+ # for their contents, but we can at least output that correctly
+ # so instead of tracking how many bytes we can now output, we track how many bytes have we parsed so far
+ plen = len(packet)
+ while plen - len(packet) < hl:
+ sp = SignatureSP(packet)
+ self['h_' + sp.__class__.__name__] = sp
+
+ uhl = self.bytes_to_int(packet[:2])
+ del packet[:2]
+
+ plen = len(packet)
+ while plen - len(packet) < uhl:
+ sp = SignatureSP(packet)
+ self[sp.__class__.__name__] = sp
+
+
+class UserAttributeSubPackets(SubPackets):
+ """
+ This is nearly the same as just the unhashed subpackets from above,
+ except that there isn't a length specifier. So, parse will only parse one packet,
+ appending that one packet to self.__unhashed_sp.
+ """
+ _spmodule = userattribute
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ for uhsp in self._unhashed_sp.values():
+ _bytes += uhsp.__bytearray__()
+ return _bytes
+
+ def __len__(self): # pragma: no cover
+ return sum(len(sp) for sp in self._unhashed_sp.values())
+
+ def parse(self, packet):
+ # parse just one packet and add it to the unhashed subpacket ordereddict
+ # I actually have yet to come across a User Attribute packet with more than one subpacket
+ # which makes sense, given that there is only one defined subpacket
+ sp = UserAttribute(packet)
+ self[sp.__class__.__name__] = sp
+
+
+class Signature(MPIs):
+ def __init__(self):
+ for i in self.__mpis__:
+ setattr(self, i, MPI(0))
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ for i in self:
+ _bytes += i.to_mpibytes()
+ return _bytes
+
+ @abc.abstractproperty
+ def __sig__(self):
+ """return the signature bytes in a format that can be understood by the signature verifier"""
+
+ @abc.abstractmethod
+ def from_signer(self, sig):
+ """create and parse a concrete Signature class instance"""
+
+
+class RSASignature(Signature):
+ __mpis__ = ('md_mod_n', )
+
+ def __sig__(self):
+ return self.md_mod_n.to_mpibytes()[2:]
+
+ def parse(self, packet):
+ self.md_mod_n = MPI(packet)
+
+ def from_signer(self, sig):
+ self.md_mod_n = MPI(self.bytes_to_int(sig))
+
+
+class DSASignature(Signature):
+ __mpis__ = ('r', 's')
+
+ def __sig__(self):
+ # return the signature data into an ASN.1 sequence of integers in DER format
+ seq = Sequence()
+ for i in self:
+ seq.setComponentByPosition(len(seq), Integer(i))
+
+ return encoder.encode(seq)
+
+ def from_signer(self, sig):
+ ##TODO: just use pyasn1 for this
+ def _der_intf(_asn):
+ if _asn[0] != 0x02: # pragma: no cover
+ raise ValueError("Expected: Integer (0x02). Got: 0x{:02X}".format(_asn[0]))
+ del _asn[0]
+
+ if _asn[0] & 0x80: # pragma: no cover
+ llen = _asn[0] & 0x7F
+ del _asn[0]
+
+ flen = self.bytes_to_int(_asn[:llen])
+ del _asn[:llen]
+
+ else:
+ flen = _asn[0] & 0x7F
+ del _asn[0]
+
+ i = self.bytes_to_int(_asn[:flen])
+ del _asn[:flen]
+ return i
+
+ if isinstance(sig, bytes):
+ sig = bytearray(sig)
+
+ # this is a very limited asn1 decoder - it is only intended to decode a DER encoded sequence of integers
+ if not sig[0] == 0x30:
+ raise NotImplementedError("Expected: Sequence (0x30). Got: 0x{:02X}".format(sig[0]))
+ del sig[0]
+
+ # skip the sequence length field
+ if sig[0] & 0x80: # pragma: no cover
+ llen = sig[0] & 0x7F
+ del sig[:llen + 1]
+
+ else:
+ del sig[0]
+
+ self.r = MPI(_der_intf(sig))
+ self.s = MPI(_der_intf(sig))
+
+ def parse(self, packet):
+ self.r = MPI(packet)
+ self.s = MPI(packet)
+
+
+class ECDSASignature(DSASignature):
+ def from_signer(self, sig):
+ seq, _ = decoder.decode(sig)
+ self.r = MPI(seq[0])
+ self.s = MPI(seq[1])
+
+
+class PubKey(MPIs):
+ __pubfields__ = ()
+
+ @property
+ def __mpis__(self):
+ for i in self.__pubfields__:
+ yield i
+
+ def __init__(self):
+ super(PubKey, self).__init__()
+ for field in self.__pubfields__:
+ if isinstance(field, tuple): # pragma: no cover
+ field, val = field
+
+ else:
+ val = MPI(0)
+
+ setattr(self, field, val)
+
+ @abc.abstractmethod
+ def __pubkey__(self):
+ """return the requisite *PublicKey class from the cryptography library"""
+
+ def __len__(self):
+ return sum(len(getattr(self, i)) for i in self.__pubfields__)
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ for field in self.__pubfields__:
+ _bytes += getattr(self, field).to_mpibytes()
+
+ return _bytes
+
+ def publen(self):
+ return len(self)
+
+ def verify(self, subj, sigbytes, hash_alg):
+ return NotImplemented # pragma: no cover
+
+
+class OpaquePubKey(PubKey): # pragma: no cover
+ def __init__(self):
+ super(OpaquePubKey, self).__init__()
+ self.data = bytearray()
+
+ def __iter__(self):
+ yield self.data
+
+ def __pubkey__(self):
+ return NotImplemented
+
+ def __bytearray__(self):
+ return self.data
+
+ def parse(self, packet):
+ ##TODO: this needs to be length-bounded to the end of the packet
+ self.data = packet
+
+
+class RSAPub(PubKey):
+ __pubfields__ = ('n', 'e')
+
+ def __pubkey__(self):
+ return rsa.RSAPublicNumbers(self.e, self.n).public_key(default_backend())
+
+ def verify(self, subj, sigbytes, hash_alg):
+ # zero-pad sigbytes if necessary
+ sigbytes = (b'\x00' * (self.n.byte_length() - len(sigbytes))) + sigbytes
+ verifier = self.__pubkey__().verifier(sigbytes, padding.PKCS1v15(), hash_alg)
+ verifier.update(subj)
+
+ try:
+ verifier.verify()
+
+ except InvalidSignature:
+ return False
+
+ return True
+
+ def parse(self, packet):
+ self.n = MPI(packet)
+ self.e = MPI(packet)
+
+
+class DSAPub(PubKey):
+ __pubfields__ = ('p', 'q', 'g', 'y')
+
+ def __pubkey__(self):
+ params = dsa.DSAParameterNumbers(self.p, self.q, self.g)
+ return dsa.DSAPublicNumbers(self.y, params).public_key(default_backend())
+
+ def verify(self, subj, sigbytes, hash_alg):
+ verifier = self.__pubkey__().verifier(sigbytes, hash_alg)
+ verifier.update(subj)
+
+ try:
+ verifier.verify()
+
+ except InvalidSignature:
+ return False
+
+ return True
+
+ def parse(self, packet):
+ self.p = MPI(packet)
+ self.q = MPI(packet)
+ self.g = MPI(packet)
+ self.y = MPI(packet)
+
+
+class ElGPub(PubKey):
+ __pubfields__ = ('p', 'g', 'y')
+
+ def __pubkey__(self):
+ raise NotImplementedError()
+
+ def parse(self, packet):
+ self.p = MPI(packet)
+ self.g = MPI(packet)
+ self.y = MPI(packet)
+
+
+class ECDSAPub(PubKey):
+ __pubfields__ = ('x', 'y')
+
+ def __init__(self):
+ super(ECDSAPub, self).__init__()
+ self.oid = None
+
+ def __len__(self):
+ return sum([len(getattr(self, i)) - 2 for i in self.__pubfields__] +
+ [3, len(encoder.encode(self.oid.value)) - 1])
+
+ def __pubkey__(self):
+ return ec.EllipticCurvePublicNumbers(self.x, self.y, self.oid.curve()).public_key(default_backend())
+
+ def __bytearray__(self):
+ _b = bytearray()
+ _b += encoder.encode(self.oid.value)[1:]
+ # 0x04 || x || y
+ # where x and y are the same length
+ _xy = b'\x04' + self.x.to_mpibytes()[2:] + self.y.to_mpibytes()[2:]
+ _b += MPI(self.bytes_to_int(_xy, 'big')).to_mpibytes()
+
+ return _b
+
+ def __copy__(self):
+ pkt = super(ECDSAPub, self).__copy__()
+ pkt.oid = self.oid
+ return pkt
+
+ def verify(self, subj, sigbytes, hash_alg):
+ verifier = self.__pubkey__().verifier(sigbytes, ec.ECDSA(hash_alg))
+ verifier.update(subj)
+
+ try:
+ verifier.verify()
+
+ except InvalidSignature:
+ return False
+
+ return True
+
+ def parse(self, packet):
+ oidlen = packet[0]
+ del packet[0]
+ _oid = bytearray(b'\x06')
+ _oid.append(oidlen)
+ _oid += bytearray(packet[:oidlen])
+ # try:
+ oid, _ = decoder.decode(bytes(_oid))
+
+ # except:
+ # raise PGPError("Bad OID octet stream: b'{:s}'".format(''.join(['\\x{:02X}'.format(c) for c in _oid])))
+ self.oid = EllipticCurveOID(oid)
+ del packet[:oidlen]
+
+ # flen = (self.oid.bit_length // 8)
+ xy = bytearray(MPI(packet).to_mpibytes()[2:])
+ # xy = bytearray(MPI(packet).to_bytes(flen, 'big'))
+ # the first byte is just \x04
+ del xy[:1]
+ # now xy needs to be separated into x, y
+ xylen = len(xy)
+ x, y = xy[:xylen // 2], xy[xylen // 2:]
+ self.x = MPI(self.bytes_to_int(x))
+ self.y = MPI(self.bytes_to_int(y))
+
+
+class ECDHPub(PubKey):
+ __pubfields__ = ('x', 'y')
+
+ def __init__(self):
+ super(ECDHPub, self).__init__()
+ self.oid = None
+ self.kdf = ECKDF()
+
+ def __len__(self):
+ return sum([len(getattr(self, i)) - 2 for i in self.__pubfields__] +
+ [3,
+ len(self.kdf),
+ len(encoder.encode(self.oid.value)) - 1])
+
+ def __pubkey__(self):
+ return ec.EllipticCurvePublicNumbers(self.x, self.y, self.oid.curve()).public_key(default_backend())
+
+ def __bytearray__(self):
+ _b = bytearray()
+ _b += encoder.encode(self.oid.value)[1:]
+ # 0x04 || x || y
+ # where x and y are the same length
+ _xy = b'\x04' + self.x.to_mpibytes()[2:] + self.y.to_mpibytes()[2:]
+ _b += MPI(self.bytes_to_int(_xy, 'big')).to_mpibytes()
+ _b += self.kdf.__bytearray__()
+
+ return _b
+
+ def __copy__(self):
+ pkt = super(ECDHPub, self).__copy__()
+ pkt.oid = self.oid
+ pkt.kdf = copy.copy(self.kdf)
+ return pkt
+
+ def parse(self, packet):
+ """
+ Algorithm-Specific Fields for ECDH keys:
+
+ o a variable-length field containing a curve OID, formatted
+ as follows:
+
+ - a one-octet size of the following field; values 0 and
+ 0xFF are reserved for future extensions
+
+ - the octets representing a curve OID, defined in
+ Section 11
+
+ - MPI of an EC point representing a public key
+
+ o a variable-length field containing KDF parameters,
+ formatted as follows:
+
+ - a one-octet size of the following fields; values 0 and
+ 0xff are reserved for future extensions
+
+ - a one-octet value 01, reserved for future extensions
+
+ - a one-octet hash function ID used with a KDF
+
+ - a one-octet algorithm ID for the symmetric algorithm
+ used to wrap the symmetric key used for the message
+ encryption; see Section 8 for details
+ """
+ oidlen = packet[0]
+ del packet[0]
+ _oid = bytearray(b'\x06')
+ _oid.append(oidlen)
+ _oid += bytearray(packet[:oidlen])
+ # try:
+ oid, _ = decoder.decode(bytes(_oid))
+
+ # except:
+ # raise PGPError("Bad OID octet stream: b'{:s}'".format(''.join(['\\x{:02X}'.format(c) for c in _oid])))
+ self.oid = EllipticCurveOID(oid)
+ del packet[:oidlen]
+
+ # flen = (self.oid.bit_length // 8)
+ xy = bytearray(MPI(packet).to_mpibytes()[2:])
+ # xy = bytearray(MPI(packet).to_bytes(flen, 'big'))
+ # the first byte is just \x04
+ del xy[:1]
+ # now xy needs to be separated into x, y
+ xylen = len(xy)
+ x, y = xy[:xylen // 2], xy[xylen // 2:]
+ self.x = MPI(self.bytes_to_int(x))
+ self.y = MPI(self.bytes_to_int(y))
+
+ self.kdf.parse(packet)
+
+
+class String2Key(Field):
+ """
+ 3.7. String-to-Key (S2K) Specifiers
+
+ String-to-key (S2K) specifiers are used to convert passphrase strings
+ into symmetric-key encryption/decryption keys. They are used in two
+ places, currently: to encrypt the secret part of private keys in the
+ private keyring, and to convert passphrases to encryption keys for
+ symmetrically encrypted messages.
+
+ 3.7.1. String-to-Key (S2K) Specifier Types
+
+ There are three types of S2K specifiers currently supported, and
+ some reserved values:
+
+ ID S2K Type
+ -- --------
+ 0 Simple S2K
+ 1 Salted S2K
+ 2 Reserved value
+ 3 Iterated and Salted S2K
+ 100 to 110 Private/Experimental S2K
+
+ These are described in Sections 3.7.1.1 - 3.7.1.3.
+
+ 3.7.1.1. Simple S2K
+
+ This directly hashes the string to produce the key data. See below
+ for how this hashing is done.
+
+ Octet 0: 0x00
+ Octet 1: hash algorithm
+
+ Simple S2K hashes the passphrase to produce the session key. The
+ manner in which this is done depends on the size of the session key
+ (which will depend on the cipher used) and the size of the hash
+ algorithm's output. If the hash size is greater than the session key
+ size, the high-order (leftmost) octets of the hash are used as the
+ key.
+
+ If the hash size is less than the key size, multiple instances of the
+ hash context are created -- enough to produce the required key data.
+ These instances are preloaded with 0, 1, 2, ... octets of zeros (that
+ is to say, the first instance has no preloading, the second gets
+ preloaded with 1 octet of zero, the third is preloaded with two
+ octets of zeros, and so forth).
+
+ As the data is hashed, it is given independently to each hash
+ context. Since the contexts have been initialized differently, they
+ will each produce different hash output. Once the passphrase is
+ hashed, the output data from the multiple hashes is concatenated,
+ first hash leftmost, to produce the key data, with any excess octets
+ on the right discarded.
+
+ 3.7.1.2. Salted S2K
+
+ This includes a "salt" value in the S2K specifier -- some arbitrary
+ data -- that gets hashed along with the passphrase string, to help
+ prevent dictionary attacks.
+
+ Octet 0: 0x01
+ Octet 1: hash algorithm
+ Octets 2-9: 8-octet salt value
+
+ Salted S2K is exactly like Simple S2K, except that the input to the
+ hash function(s) consists of the 8 octets of salt from the S2K
+ specifier, followed by the passphrase.
+
+ 3.7.1.3. Iterated and Salted S2K
+
+ This includes both a salt and an octet count. The salt is combined
+ with the passphrase and the resulting value is hashed repeatedly.
+ This further increases the amount of work an attacker must do to try
+ dictionary attacks.
+
+ Octet 0: 0x03
+ Octet 1: hash algorithm
+ Octets 2-9: 8-octet salt value
+ Octet 10: count, a one-octet, coded value
+
+ The count is coded into a one-octet number using the following
+ formula:
+
+ #define EXPBIAS 6
+ count = ((Int32)16 + (c & 15)) << ((c >> 4) + EXPBIAS);
+
+ The above formula is in C, where "Int32" is a type for a 32-bit
+ integer, and the variable "c" is the coded count, Octet 10.
+
+ Iterated-Salted S2K hashes the passphrase and salt data multiple
+ times. The total number of octets to be hashed is specified in the
+ encoded count in the S2K specifier. Note that the resulting count
+ value is an octet count of how many octets will be hashed, not an
+ iteration count.
+
+ Initially, one or more hash contexts are set up as with the other S2K
+ algorithms, depending on how many octets of key data are needed.
+ Then the salt, followed by the passphrase data, is repeatedly hashed
+ until the number of octets specified by the octet count has been
+ hashed. The one exception is that if the octet count is less than
+ the size of the salt plus passphrase, the full salt plus passphrase
+ will be hashed even though that is greater than the octet count.
+ After the hashing is done, the data is unloaded from the hash
+ context(s) as with the other S2K algorithms.
+ """
+ @sdproperty
+ def encalg(self):
+ return self._encalg
+
+ @encalg.register(int)
+ @encalg.register(SymmetricKeyAlgorithm)
+ def encalg_int(self, val):
+ self._encalg = SymmetricKeyAlgorithm(val)
+
+ @sdproperty
+ def specifier(self):
+ return self._specifier
+
+ @specifier.register(int)
+ @specifier.register(String2KeyType)
+ def specifier_int(self, val):
+ self._specifier = String2KeyType(val)
+
+ @sdproperty
+ def halg(self):
+ return self._halg
+
+ @halg.register(int)
+ @halg.register(HashAlgorithm)
+ def halg_int(self, val):
+ self._halg = HashAlgorithm(val)
+
+ @sdproperty
+ def count(self):
+ return (16 + (self._count & 15)) << ((self._count >> 4) + 6)
+
+ @count.register(int)
+ def count_int(self, val):
+ if val < 0 or val > 255: # pragma: no cover
+ raise ValueError("count must be between 0 and 256")
+ self._count = val
+
+ def __init__(self):
+ super(String2Key, self).__init__()
+ self.usage = 0
+ self.encalg = 0
+ self.specifier = 0
+ self.iv = None
+
+ # specifier-specific fields
+ # simple, salted, iterated
+ self.halg = 0
+
+ # salted, iterated
+ self.salt = bytearray()
+
+ # iterated
+ self.count = 0
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes.append(self.usage)
+ if bool(self):
+ _bytes.append(self.encalg)
+ _bytes.append(self.specifier)
+ if self.specifier >= String2KeyType.Simple:
+ _bytes.append(self.halg)
+ if self.specifier >= String2KeyType.Salted:
+ _bytes += self.salt
+ if self.specifier == String2KeyType.Iterated:
+ _bytes.append(self._count)
+ if self.iv is not None:
+ _bytes += self.iv
+ return _bytes
+
+ def __len__(self):
+ return len(self.__bytearray__())
+
+ def __bool__(self):
+ return self.usage in [254, 255]
+
+ def __nonzero__(self):
+ return self.__bool__()
+
+ def __copy__(self):
+ s2k = String2Key()
+ s2k.usage = self.usage
+ s2k.encalg = self.encalg
+ s2k.specifier = self.specifier
+ s2k.iv = self.iv
+ s2k.halg = self.halg
+ s2k.salt = copy.copy(self.salt)
+ s2k.count = self._count
+ return s2k
+
+ def parse(self, packet, iv=True):
+ self.usage = packet[0]
+ del packet[0]
+
+ if bool(self):
+ self.encalg = packet[0]
+ del packet[0]
+
+ self.specifier = packet[0]
+ del packet[0]
+
+ if self.specifier >= String2KeyType.Simple:
+ # this will always be true
+ self.halg = packet[0]
+ del packet[0]
+
+ if self.specifier >= String2KeyType.Salted:
+ self.salt = packet[:8]
+ del packet[:8]
+
+ if self.specifier == String2KeyType.Iterated:
+ self.count = packet[0]
+ del packet[0]
+
+ if iv:
+ self.iv = packet[:(self.encalg.block_size // 8)]
+ del packet[:(self.encalg.block_size // 8)]
+
+ def derive_key(self, passphrase):
+ ##TODO: raise an exception if self.usage is not 254 or 255
+ keylen = self.encalg.key_size
+ hashlen = self.halg.digest_size * 8
+
+ ctx = int(math.ceil((keylen / hashlen)))
+
+ # Simple S2K - always done
+ hsalt = b''
+ hpass = passphrase.encode('latin-1')
+
+ # salted, iterated S2K
+ if self.specifier >= String2KeyType.Salted:
+ hsalt = bytes(self.salt)
+
+ count = len(hsalt + hpass)
+ if self.specifier == String2KeyType.Iterated and self.count > len(hsalt + hpass):
+ count = self.count
+
+ hcount = (count // len(hsalt + hpass))
+ hleft = count - (hcount * len(hsalt + hpass))
+
+ hashdata = ((hsalt + hpass) * hcount) + (hsalt + hpass)[:hleft]
+
+ h = []
+ for i in range(0, ctx):
+ _h = self.halg.hasher
+ _h.update(b'\x00' * i)
+ _h.update(hashdata)
+ h.append(_h)
+
+ # GC some stuff
+ del hsalt
+ del hpass
+ del hashdata
+
+ # and return the key!
+ return b''.join(hc.digest() for hc in h)[:(keylen // 8)]
+
+
+class ECKDF(Field):
+ """
+ o a variable-length field containing KDF parameters,
+ formatted as follows:
+
+ - a one-octet size of the following fields; values 0 and
+ 0xff are reserved for future extensions
+
+ - a one-octet value 01, reserved for future extensions
+
+ - a one-octet hash function ID used with a KDF
+
+ - a one-octet algorithm ID for the symmetric algorithm
+ used to wrap the symmetric key used for the message
+ encryption; see Section 8 for details
+ """
+ @sdproperty
+ def halg(self):
+ return self._halg
+
+ @halg.register(int)
+ @halg.register(HashAlgorithm)
+ def halg_int(self, val):
+ self._halg = HashAlgorithm(val)
+
+ @sdproperty
+ def encalg(self):
+ return self._encalg
+
+ @encalg.register(int)
+ @encalg.register(SymmetricKeyAlgorithm)
+ def encalg_int(self, val):
+ self._encalg = SymmetricKeyAlgorithm(val)
+
+ def __init__(self):
+ super(ECKDF, self).__init__()
+ self.halg = 0
+ self.encalg = 0
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes.append(len(self) - 1)
+ _bytes.append(0x01)
+ _bytes.append(self.halg)
+ _bytes.append(self.encalg)
+ return _bytes
+
+ def __len__(self):
+ return 4
+
+ def parse(self, packet):
+ # packet[0] should always be 3
+ # packet[1] should always be 1
+ # TODO: this assert is likely not necessary, but we should raise some kind of exception
+ # if parsing fails due to these fields being incorrect
+ assert packet[:2] == b'\x03\x01'
+ del packet[:2]
+
+ self.halg = packet[0]
+ del packet[0]
+
+ self.encalg = packet[0]
+ del packet[0]
+
+ def derive_key(self, s, curve, pkalg, fingerprint):
+ # wrapper around the Concatenation KDF method provided by cryptography
+ # assemble the additional data as defined in RFC 6637:
+ # Param = curve_OID_len || curve_OID || public_key_alg_ID || 03 || 01 || KDF_hash_ID || KEK_alg_ID for AESKeyWrap || "Anonymous
+ data = bytearray()
+ data += encoder.encode(curve.value)[1:]
+ data.append(pkalg)
+ data += b'\x03\x01'
+ data.append(self.halg)
+ data.append(self.encalg)
+ data += b'Anonymous Sender '
+ data += binascii.unhexlify(fingerprint.replace(' ', ''))
+
+ ckdf = ConcatKDFHash(algorithm=getattr(hashes, self.halg.name)(), length=self.encalg.key_size // 8, otherinfo=bytes(data), backend=default_backend())
+ return ckdf.derive(s)
+
+
+class PrivKey(PubKey):
+ __privfields__ = ()
+
+ @property
+ def __mpis__(self):
+ for i in super(PrivKey, self).__mpis__:
+ yield i
+
+ for i in self.__privfields__:
+ yield i
+
+ def __init__(self):
+ super(PrivKey, self).__init__()
+
+ self.s2k = String2Key()
+ self.encbytes = bytearray()
+ self.chksum = bytearray()
+
+ for field in self.__privfields__:
+ setattr(self, field, MPI(0))
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes += super(PrivKey, self).__bytearray__()
+
+ _bytes += self.s2k.__bytearray__()
+ if self.s2k:
+ _bytes += self.encbytes
+
+ else:
+ for field in self.__privfields__:
+ _bytes += getattr(self, field).to_mpibytes()
+
+ if self.s2k.usage == 0:
+ _bytes += self.chksum
+
+ return _bytes
+
+ def __len__(self):
+ l = super(PrivKey, self).__len__() + len(self.s2k) + len(self.chksum)
+ if self.s2k:
+ l += len(self.encbytes)
+
+ else:
+ l += sum(len(getattr(self, i)) for i in self.__privfields__)
+
+ return l
+
+ def __copy__(self):
+ pk = super(PrivKey, self).__copy__()
+ pk.s2k = copy.copy(self.s2k)
+ pk.encbytes = copy.copy(self.encbytes)
+ pk.chksum = copy.copy(self.chksum)
+ return pk
+
+ @abc.abstractmethod
+ def __privkey__(self):
+ """return the requisite *PrivateKey class from the cryptography library"""
+
+ @abc.abstractmethod
+ def _generate(self, key_size):
+ """Generate a new PrivKey"""
+
+ def _compute_chksum(self):
+ chs = sum(sum(bytearray(c.to_mpibytes())) for c in self) % 65536
+ self.chksum = bytearray(self.int_to_bytes(chs, 2))
+
+ def publen(self):
+ return super(PrivKey, self).__len__()
+
+ def encrypt_keyblob(self, passphrase, enc_alg, hash_alg):
+ # PGPy will only ever use iterated and salted S2k mode
+ self.s2k.usage = 254
+ self.s2k.encalg = enc_alg
+ self.s2k.specifier = String2KeyType.Iterated
+ self.s2k.iv = enc_alg.gen_iv()
+ self.s2k.halg = hash_alg
+ self.s2k.salt = bytearray(os.urandom(8))
+ self.s2k.count = hash_alg.tuned_count
+
+ # now that String-to-Key is ready to go, derive sessionkey from passphrase
+ # and then unreference passphrase
+ sessionkey = self.s2k.derive_key(passphrase)
+ del passphrase
+
+ pt = bytearray()
+ for pf in self.__privfields__:
+ pt += getattr(self, pf).to_mpibytes()
+
+ # append a SHA-1 hash of the plaintext so far to the plaintext
+ pt += hashlib.new('sha1', pt).digest()
+
+ # encrypt
+ self.encbytes = bytearray(_encrypt(bytes(pt), bytes(sessionkey), enc_alg, bytes(self.s2k.iv)))
+
+ # delete pt and clear self
+ del pt
+ self.clear()
+
+ @abc.abstractmethod
+ def decrypt_keyblob(self, passphrase):
+ if not self.s2k: # pragma: no cover
+ # not encrypted
+ return
+
+ # Encryption/decryption of the secret data is done in CFB mode using
+ # the key created from the passphrase and the Initial Vector from the
+ # packet. A different mode is used with V3 keys (which are only RSA)
+ # than with other key formats. (...)
+ #
+ # With V4 keys, a simpler method is used. All secret MPI values are
+ # encrypted in CFB mode, including the MPI bitcount prefix.
+
+ # derive the session key from our passphrase, and then unreference passphrase
+ sessionkey = self.s2k.derive_key(passphrase)
+ del passphrase
+
+ # attempt to decrypt this key
+ pt = _decrypt(bytes(self.encbytes), bytes(sessionkey), self.s2k.encalg, bytes(self.s2k.iv))
+
+ # check the hash to see if we decrypted successfully or not
+ if self.s2k.usage == 254 and not pt[-20:] == hashlib.new('sha1', pt[:-20]).digest():
+ # if the usage byte is 254, key material is followed by a 20-octet sha-1 hash of the rest
+ # of the key material block
+ raise PGPDecryptionError("Passphrase was incorrect!")
+
+ if self.s2k.usage == 255 and not self.bytes_to_int(pt[-2:]) == (sum(bytearray(pt[:-2])) % 65536): # pragma: no cover
+ # if the usage byte is 255, key material is followed by a 2-octet checksum of the rest
+ # of the key material block
+ raise PGPDecryptionError("Passphrase was incorrect!")
+
+ return bytearray(pt)
+
+ def sign(self, sigdata, hash_alg):
+ return NotImplemented # pragma: no cover
+
+ def clear(self):
+ """delete and re-initialize all private components to zero"""
+ for field in self.__privfields__:
+ delattr(self, field)
+ setattr(self, field, MPI(0))
+
+
+class OpaquePrivKey(PrivKey, OpaquePubKey): # pragma: no cover
+ def __privkey__(self):
+ return NotImplemented
+
+ def _generate(self, key_size):
+ # return NotImplemented
+ raise NotImplementedError()
+
+ def decrypt_keyblob(self, passphrase):
+ return NotImplemented
+
+
+class RSAPriv(PrivKey, RSAPub):
+ __privfields__ = ('d', 'p', 'q', 'u')
+
+ def __privkey__(self):
+ return rsa.RSAPrivateNumbers(self.p, self.q, self.d,
+ rsa.rsa_crt_dmp1(self.d, self.p),
+ rsa.rsa_crt_dmq1(self.d, self.q),
+ rsa.rsa_crt_iqmp(self.p, self.q),
+ rsa.RSAPublicNumbers(self.e, self.n)).private_key(default_backend())
+
+ def _generate(self, key_size):
+ if any(c != 0 for c in self): # pragma: no cover
+ raise PGPError("key is already populated")
+
+ # generate some big numbers!
+ pk = rsa.generate_private_key(65537, key_size, default_backend())
+ pkn = pk.private_numbers()
+
+ self.n = MPI(pkn.public_numbers.n)
+ self.e = MPI(pkn.public_numbers.e)
+ self.d = MPI(pkn.d)
+ self.p = MPI(pkn.p)
+ self.q = MPI(pkn.q)
+ # from the RFC:
+ # "- MPI of u, the multiplicative inverse of p, mod q."
+ # or, simply, p^-1 mod p
+ # rsa.rsa_crt_iqmp(p, q) normally computes q^-1 mod p,
+ # so if we swap the values around we get the answer we want
+ self.u = MPI(rsa.rsa_crt_iqmp(pkn.q, pkn.p))
+
+ del pkn
+ del pk
+
+ self._compute_chksum()
+
+ def parse(self, packet):
+ super(RSAPriv, self).parse(packet)
+ self.s2k.parse(packet)
+
+ if not self.s2k:
+ self.d = MPI(packet)
+ self.p = MPI(packet)
+ self.q = MPI(packet)
+ self.u = MPI(packet)
+
+ if self.s2k.usage == 0:
+ self.chksum = packet[:2]
+ del packet[:2]
+
+ else:
+ ##TODO: this needs to be bounded to the length of the encrypted key material
+ self.encbytes = packet
+
+ def decrypt_keyblob(self, passphrase):
+ kb = super(RSAPriv, self).decrypt_keyblob(passphrase)
+ del passphrase
+
+ self.d = MPI(kb)
+ self.p = MPI(kb)
+ self.q = MPI(kb)
+ self.u = MPI(kb)
+
+ if self.s2k.usage in [254, 255]:
+ self.chksum = kb
+ del kb
+
+ def sign(self, sigdata, hash_alg):
+ signer = self.__privkey__().signer(padding.PKCS1v15(), hash_alg)
+ signer.update(sigdata)
+ return signer.finalize()
+
+
+class DSAPriv(PrivKey, DSAPub):
+ __privfields__ = ('x',)
+
+ def __privkey__(self):
+ params = dsa.DSAParameterNumbers(self.p, self.q, self.g)
+ pn = dsa.DSAPublicNumbers(self.y, params)
+ return dsa.DSAPrivateNumbers(self.x, pn).private_key(default_backend())
+
+ def _generate(self, key_size):
+ if any(c != 0 for c in self): # pragma: no cover
+ raise PGPError("key is already populated")
+
+ # generate some big numbers!
+ pk = dsa.generate_private_key(key_size, default_backend())
+ pkn = pk.private_numbers()
+
+ self.p = MPI(pkn.public_numbers.parameter_numbers.p)
+ self.q = MPI(pkn.public_numbers.parameter_numbers.q)
+ self.g = MPI(pkn.public_numbers.parameter_numbers.g)
+ self.y = MPI(pkn.public_numbers.y)
+ self.x = MPI(pkn.x)
+
+ del pkn
+ del pk
+
+ self._compute_chksum()
+
+ def parse(self, packet):
+ super(DSAPriv, self).parse(packet)
+ self.s2k.parse(packet)
+
+ if not self.s2k:
+ self.x = MPI(packet)
+
+ else:
+ self.encbytes = packet
+
+ if self.s2k.usage in [0, 255]:
+ self.chksum = packet[:2]
+ del packet[:2]
+
+ def decrypt_keyblob(self, passphrase):
+ kb = super(DSAPriv, self).decrypt_keyblob(passphrase)
+ del passphrase
+
+ self.x = MPI(kb)
+
+ if self.s2k.usage in [254, 255]:
+ self.chksum = kb
+ del kb
+
+ def sign(self, sigdata, hash_alg):
+ signer = self.__privkey__().signer(hash_alg)
+ signer.update(sigdata)
+ return signer.finalize()
+
+
+class ElGPriv(PrivKey, ElGPub):
+ __privfields__ = ('x', )
+
+ def __privkey__(self):
+ raise NotImplementedError()
+
+ def _generate(self, key_size):
+ raise NotImplementedError(PubKeyAlgorithm.ElGamal)
+
+ def parse(self, packet):
+ super(ElGPriv, self).parse(packet)
+ self.s2k.parse(packet)
+
+ if not self.s2k:
+ self.x = MPI(packet)
+
+ else:
+ self.encbytes = packet
+
+ if self.s2k.usage in [0, 255]:
+ self.chksum = packet[:2]
+ del packet[:2]
+
+ def decrypt_keyblob(self, passphrase):
+ kb = super(ElGPriv, self).decrypt_keyblob(passphrase)
+ del passphrase
+
+ self.x = MPI(kb)
+
+ if self.s2k.usage in [254, 255]:
+ self.chksum = kb
+ del kb
+
+
+class ECDSAPriv(PrivKey, ECDSAPub):
+ __privfields__ = ('s', )
+
+ def __privkey__(self):
+ ecp = ec.EllipticCurvePublicNumbers(self.x, self.y, self.oid.curve())
+ return ec.EllipticCurvePrivateNumbers(self.s, ecp).private_key(default_backend())
+
+ def _generate(self, oid):
+ if any(c != 0 for c in self): # pragma: no cover
+ raise PGPError("Key is already populated!")
+
+ self.oid = EllipticCurveOID(oid)
+
+ if not self.oid.can_gen:
+ raise ValueError("Curve not currently supported: {}".format(oid.name))
+
+ pk = ec.generate_private_key(self.oid.curve(), default_backend())
+ pubn = pk.public_key().public_numbers()
+ self.x = MPI(pubn.x)
+ self.y = MPI(pubn.y)
+ self.s = MPI(pk.private_numbers().private_value)
+
+ def parse(self, packet):
+ super(ECDSAPriv, self).parse(packet)
+ self.s2k.parse(packet)
+
+ if not self.s2k:
+ self.s = MPI(packet)
+
+ if self.s2k.usage == 0:
+ self.chksum = packet[:2]
+ del packet[:2]
+
+ else:
+ ##TODO: this needs to be bounded to the length of the encrypted key material
+ self.encbytes = packet
+
+ def decrypt_keyblob(self, passphrase):
+ kb = super(ECDSAPriv, self).decrypt_keyblob(passphrase)
+ del passphrase
+
+ self.s = MPI(kb)
+
+ def sign(self, sigdata, hash_alg):
+ signer = self.__privkey__().signer(ec.ECDSA(hash_alg))
+ signer.update(sigdata)
+ return signer.finalize()
+
+
+class ECDHPriv(ECDSAPriv, ECDHPub):
+ def __bytearray__(self):
+ _b = ECDHPub.__bytearray__(self)
+ _b += self.s2k.__bytearray__()
+ if not self.s2k:
+ _b += self.s.to_mpibytes()
+
+ if self.s2k.usage == 0:
+ _b += self.chksum
+
+ else:
+ _b += self.encbytes
+
+ return _b
+
+ def __len__(self):
+ # because of the inheritance used for this, ECDSAPub.__len__ is called instead of ECDHPub.__len__
+ # the only real difference is self.kdf, so we can just add that
+ return super(ECDHPriv, self).__len__() + len(self.kdf)
+
+ def _generate(self, oid):
+ ECDSAPriv._generate(self, oid)
+ self.kdf.halg = self.oid.kdf_halg
+ self.kdf.encalg = self.oid.kek_alg
+
+ def publen(self):
+ return ECDHPub.__len__(self)
+
+ def parse(self, packet):
+ ECDHPub.parse(self, packet)
+ self.s2k.parse(packet)
+
+ if not self.s2k:
+ self.s = MPI(packet)
+
+ if self.s2k.usage == 0:
+ self.chksum = packet[:2]
+ del packet[:2]
+
+ else:
+ ##TODO: this needs to be bounded to the length of the encrypted key material
+ self.encbytes = packet
+
+
+class CipherText(MPIs):
+ def __init__(self):
+ super(CipherText, self).__init__()
+ for i in self.__mpis__:
+ setattr(self, i, MPI(0))
+
+ @classmethod
+ @abc.abstractmethod
+ def encrypt(cls, encfn, *args):
+ """create and populate a concrete CipherText class instance"""
+
+ @abc.abstractmethod
+ def decrypt(self, decfn, *args):
+ """decrypt the ciphertext contained in this CipherText instance"""
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ for i in self:
+ _bytes += i.to_mpibytes()
+ return _bytes
+
+
+class RSACipherText(CipherText):
+ __mpis__ = ('me_mod_n', )
+
+ @classmethod
+ def encrypt(cls, encfn, *args):
+ ct = cls()
+ ct.me_mod_n = MPI(cls.bytes_to_int(encfn(*args)))
+ return ct
+
+ def decrypt(self, decfn, *args):
+ return decfn(*args)
+
+ def parse(self, packet):
+ self.me_mod_n = MPI(packet)
+
+
+class ElGCipherText(CipherText):
+ __mpis__ = ('gk_mod_p', 'myk_mod_p')
+
+ @classmethod
+ def encrypt(cls, encfn, *args):
+ raise NotImplementedError()
+
+ def decrypt(self, decfn, *args):
+ raise NotImplementedError()
+
+ def parse(self, packet):
+ self.gk_mod_p = MPI(packet)
+ self.myk_mod_p = MPI(packet)
+
+
+class ECDHCipherText(CipherText):
+ __mpis__ = ('vX', 'vY')
+
+ @classmethod
+ def encrypt(cls, pk, *args):
+ """
+ For convenience, the synopsis of the encoding method is given below;
+ however, this section, [NIST-SP800-56A], and [RFC3394] are the
+ normative sources of the definition.
+
+ Obtain the authenticated recipient public key R
+ Generate an ephemeral key pair {v, V=vG}
+ Compute the shared point S = vR;
+ m = symm_alg_ID || session key || checksum || pkcs5_padding;
+ curve_OID_len = (byte)len(curve_OID);
+ Param = curve_OID_len || curve_OID || public_key_alg_ID || 03
+ || 01 || KDF_hash_ID || KEK_alg_ID for AESKeyWrap || "Anonymous
+ Sender " || recipient_fingerprint;
+ Z_len = the key size for the KEK_alg_ID used with AESKeyWrap
+ Compute Z = KDF( S, Z_len, Param );
+ Compute C = AESKeyWrap( Z, m ) as per [RFC3394]
+ VB = convert point V to the octet string
+ Output (MPI(VB) || len(C) || C).
+
+ The decryption is the inverse of the method given. Note that the
+ recipient obtains the shared secret by calculating
+ """
+ # *args should be:
+ # - m
+ #
+ _m, = args
+
+ # m may need to be PKCS5-padded
+ padder = PKCS7(64).padder()
+ m = padder.update(_m) + padder.finalize()
+
+ km = pk.keymaterial
+
+ ct = cls()
+
+ # generate ephemeral key pair, then store it in ct
+ v = ec.generate_private_key(km.oid.curve(), default_backend())
+ ct.vX = MPI(v.public_key().public_numbers().x)
+ ct.vY = MPI(v.public_key().public_numbers().y)
+
+ # compute the shared point S
+ s = v.exchange(ec.ECDH(), km.__pubkey__())
+
+ # derive the wrapping key
+ z = km.kdf.derive_key(s, km.oid, PubKeyAlgorithm.ECDH, pk.fingerprint)
+
+ # compute C
+ ct.c = aes_key_wrap(z, m, default_backend())
+
+ return ct
+
+ def decrypt(self, pk, *args):
+ km = pk.keymaterial
+ # assemble the public component of ephemeral key v
+ v = ec.EllipticCurvePublicNumbers(self.vX, self.vY, km.oid.curve()).public_key(default_backend())
+
+ # compute s using the inverse of how it was derived during encryption
+ s = km.__privkey__().exchange(ec.ECDH(), v)
+
+ # derive the wrapping key
+ z = km.kdf.derive_key(s, km.oid, PubKeyAlgorithm.ECDH, pk.fingerprint)
+
+ # unwrap and unpad m
+ _m = aes_key_unwrap(z, self.c, default_backend())
+
+ padder = PKCS7(64).unpadder()
+ return padder.update(_m) + padder.finalize()
+
+ def __init__(self):
+ super(ECDHCipherText, self).__init__()
+ self.c = bytearray(0)
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _xy = b'\x04' + self.vX.to_mpibytes()[2:] + self.vY.to_mpibytes()[2:]
+ _bytes += MPI(self.bytes_to_int(_xy, 'big')).to_mpibytes()
+ _bytes.append(len(self.c))
+ _bytes += self.c
+
+ return _bytes
+
+ def parse(self, packet):
+ # self.v = MPI(packet)
+ xy = bytearray(MPI(packet).to_mpibytes()[2:])
+ del xy[:1]
+ xylen = len(xy)
+ x, y = xy[:xylen // 2], xy[xylen // 2:]
+ self.vX = MPI(self.bytes_to_int(x))
+ self.vY = MPI(self.bytes_to_int(y))
+
+ clen = packet[0]
+ del packet[0]
+
+ self.c += packet[:clen]
+ del packet[:clen]
diff --git a/src/leap/mx/vendor/pgpy/packet/packets.py b/src/leap/mx/vendor/pgpy/packet/packets.py
new file mode 100644
index 0000000..38a7200
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/packet/packets.py
@@ -0,0 +1,1605 @@
+""" packet.py
+"""
+import abc
+import binascii
+import calendar
+import copy
+import hashlib
+import os
+import re
+
+from datetime import datetime
+
+import six
+
+from cryptography.hazmat.primitives import constant_time
+from cryptography.hazmat.primitives.asymmetric import padding
+
+from .fields import DSAPriv, DSAPub, DSASignature
+from .fields import ECDSAPub, ECDSAPriv, ECDSASignature
+from .fields import ECDHPub, ECDHPriv, ECDHCipherText
+from .fields import ElGCipherText, ElGPriv, ElGPub
+from .fields import OpaquePubKey
+from .fields import OpaquePrivKey
+from .fields import RSACipherText, RSAPriv, RSAPub, RSASignature
+from .fields import String2Key
+from .fields import SubPackets
+from .fields import UserAttributeSubPackets
+
+from .types import Packet
+from .types import Primary
+from .types import Private
+from .types import Public
+from .types import Sub
+from .types import VersionedPacket
+
+from ..constants import CompressionAlgorithm
+from ..constants import HashAlgorithm
+from ..constants import PubKeyAlgorithm
+from ..constants import SignatureType
+from ..constants import SymmetricKeyAlgorithm
+from ..constants import TrustFlags
+from ..constants import TrustLevel
+
+from ..decorators import sdproperty
+
+from ..errors import PGPDecryptionError
+
+from ..symenc import _decrypt
+from ..symenc import _encrypt
+
+from ..types import Fingerprint
+
+__all__ = ['PKESessionKey',
+ 'PKESessionKeyV3',
+ 'Signature',
+ 'SignatureV4',
+ 'SKESessionKey',
+ 'SKESessionKeyV4',
+ 'OnePassSignature',
+ 'OnePassSignatureV3',
+ 'PrivKey',
+ 'PubKey',
+ 'PubKeyV4',
+ 'PrivKeyV4',
+ 'PrivSubKey',
+ 'PrivSubKeyV4',
+ 'CompressedData',
+ 'SKEData',
+ 'Marker',
+ 'LiteralData',
+ 'Trust',
+ 'UserID',
+ 'PubSubKey',
+ 'PubSubKeyV4',
+ 'UserAttribute',
+ 'IntegrityProtectedSKEData',
+ 'IntegrityProtectedSKEDataV1',
+ 'MDC']
+
+
+class PKESessionKey(VersionedPacket):
+ __typeid__ = 0x01
+ __ver__ = 0
+
+ @abc.abstractmethod
+ def decrypt_sk(self, pk):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def encrypt_sk(self, pk, symalg, symkey):
+ raise NotImplementedError()
+
+
+class PKESessionKeyV3(PKESessionKey):
+ """
+ 5.1. Public-Key Encrypted Session Key Packets (Tag 1)
+
+ A Public-Key Encrypted Session Key packet holds the session key used
+ to encrypt a message. Zero or more Public-Key Encrypted Session Key
+ packets and/or Symmetric-Key Encrypted Session Key packets may
+ precede a Symmetrically Encrypted Data Packet, which holds an
+ encrypted message. The message is encrypted with the session key,
+ and the session key is itself encrypted and stored in the Encrypted
+ Session Key packet(s). The Symmetrically Encrypted Data Packet is
+ preceded by one Public-Key Encrypted Session Key packet for each
+ OpenPGP key to which the message is encrypted. The recipient of the
+ message finds a session key that is encrypted to their public key,
+ decrypts the session key, and then uses the session key to decrypt
+ the message.
+
+ The body of this packet consists of:
+
+ - A one-octet number giving the version number of the packet type.
+ The currently defined value for packet version is 3.
+
+ - An eight-octet number that gives the Key ID of the public key to
+ which the session key is encrypted. If the session key is
+ encrypted to a subkey, then the Key ID of this subkey is used
+ here instead of the Key ID of the primary key.
+
+ - A one-octet number giving the public-key algorithm used.
+
+ - A string of octets that is the encrypted session key. This
+ string takes up the remainder of the packet, and its contents are
+ dependent on the public-key algorithm used.
+
+ Algorithm Specific Fields for RSA encryption
+
+ - multiprecision integer (MPI) of RSA encrypted value m**e mod n.
+
+ Algorithm Specific Fields for Elgamal encryption:
+
+ - MPI of Elgamal (Diffie-Hellman) value g**k mod p.
+
+ - MPI of Elgamal (Diffie-Hellman) value m * y**k mod p.
+
+ The value "m" in the above formulas is derived from the session key
+ as follows. First, the session key is prefixed with a one-octet
+ algorithm identifier that specifies the symmetric encryption
+ algorithm used to encrypt the following Symmetrically Encrypted Data
+ Packet. Then a two-octet checksum is appended, which is equal to the
+ sum of the preceding session key octets, not including the algorithm
+ identifier, modulo 65536. This value is then encoded as described in
+ PKCS#1 block encoding EME-PKCS1-v1_5 in Section 7.2.1 of [RFC3447] to
+ form the "m" value used in the formulas above. See Section 13.1 of
+ this document for notes on OpenPGP's use of PKCS#1.
+
+ Note that when an implementation forms several PKESKs with one
+ session key, forming a message that can be decrypted by several keys,
+ the implementation MUST make a new PKCS#1 encoding for each key.
+
+ An implementation MAY accept or use a Key ID of zero as a "wild card"
+ or "speculative" Key ID. In this case, the receiving implementation
+ would try all available private keys, checking for a valid decrypted
+ session key. This format helps reduce traffic analysis of messages.
+ """
+ __ver__ = 3
+
+ @sdproperty
+ def encrypter(self):
+ return self._encrypter
+
+ @encrypter.register(bytearray)
+ def encrypter_bin(self, val):
+ self._encrypter = binascii.hexlify(val).upper().decode('latin-1')
+
+ @sdproperty
+ def pkalg(self):
+ return self._pkalg
+
+ @pkalg.register(int)
+ @pkalg.register(PubKeyAlgorithm)
+ def pkalg_int(self, val):
+ self._pkalg = PubKeyAlgorithm(val)
+
+ _c = {PubKeyAlgorithm.RSAEncryptOrSign: RSACipherText,
+ PubKeyAlgorithm.RSAEncrypt: RSACipherText,
+ PubKeyAlgorithm.ElGamal: ElGCipherText,
+ PubKeyAlgorithm.FormerlyElGamalEncryptOrSign: ElGCipherText,
+ PubKeyAlgorithm.ECDH: ECDHCipherText}
+
+ ct = _c.get(self._pkalg, None)
+ self.ct = ct() if ct is not None else ct
+
+ def __init__(self):
+ super(PKESessionKeyV3, self).__init__()
+ self.encrypter = bytearray(8)
+ self.pkalg = 0
+ self.ct = None
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes += super(PKESessionKeyV3, self).__bytearray__()
+ _bytes += binascii.unhexlify(self.encrypter.encode())
+ _bytes += bytearray([self.pkalg])
+ _bytes += self.ct.__bytearray__() if self.ct is not None else b'\x00' * (self.header.length - 10)
+ return _bytes
+
+ def __copy__(self):
+ sk = self.__class__()
+ sk.header = copy.copy(self.header)
+ sk._encrypter = self._encrypter
+ sk.pkalg = self.pkalg
+ if self.ct is not None:
+ sk.ct = copy.copy(self.ct)
+
+ return sk
+
+ def decrypt_sk(self, pk):
+ if self.pkalg == PubKeyAlgorithm.RSAEncryptOrSign:
+ # pad up ct with null bytes if necessary
+ ct = self.ct.me_mod_n.to_mpibytes()[2:]
+ ct = b'\x00' * ((pk.keymaterial.__privkey__().key_size // 8) - len(ct)) + ct
+
+ decrypter = pk.keymaterial.__privkey__().decrypt
+ decargs = (ct, padding.PKCS1v15(),)
+
+ elif self.pkalg == PubKeyAlgorithm.ECDH:
+ decrypter = pk
+ decargs = ()
+
+ else:
+ raise NotImplementedError(self.pkalg)
+
+ m = bytearray(self.ct.decrypt(decrypter, *decargs))
+
+ """
+ The value "m" in the above formulas is derived from the session key
+ as follows. First, the session key is prefixed with a one-octet
+ algorithm identifier that specifies the symmetric encryption
+ algorithm used to encrypt the following Symmetrically Encrypted Data
+ Packet. Then a two-octet checksum is appended, which is equal to the
+ sum of the preceding session key octets, not including the algorithm
+ identifier, modulo 65536. This value is then encoded as described in
+ PKCS#1 block encoding EME-PKCS1-v1_5 in Section 7.2.1 of [RFC3447] to
+ form the "m" value used in the formulas above. See Section 13.1 of
+ this document for notes on OpenPGP's use of PKCS#1.
+ """
+
+ symalg = SymmetricKeyAlgorithm(m[0])
+ del m[0]
+
+ symkey = m[:symalg.key_size // 8]
+ del m[:symalg.key_size // 8]
+
+ checksum = self.bytes_to_int(m[:2])
+ del m[:2]
+
+ if not sum(symkey) % 65536 == checksum: # pragma: no cover
+ raise PGPDecryptionError("{:s} decryption failed".format(self.pkalg.name))
+
+ return (symalg, symkey)
+
+ def encrypt_sk(self, pk, symalg, symkey):
+ m = bytearray(self.int_to_bytes(symalg) + symkey)
+ m += self.int_to_bytes(sum(bytearray(symkey)) % 65536, 2)
+
+ if self.pkalg == PubKeyAlgorithm.RSAEncryptOrSign:
+ encrypter = pk.keymaterial.__pubkey__().encrypt
+ encargs = (bytes(m), padding.PKCS1v15(),)
+
+ elif self.pkalg == PubKeyAlgorithm.ECDH:
+ encrypter = pk
+ encargs = (bytes(m),)
+
+ else:
+ raise NotImplementedError(self.pkalg)
+
+ self.ct = self.ct.encrypt(encrypter, *encargs)
+ self.update_hlen()
+
+ def parse(self, packet):
+ super(PKESessionKeyV3, self).parse(packet)
+ self.encrypter = packet[:8]
+ del packet[:8]
+
+ self.pkalg = packet[0]
+ del packet[0]
+
+ if self.ct is not None:
+ self.ct.parse(packet)
+
+ else: # pragma: no cover
+ del packet[:(self.header.length - 18)]
+
+
+class Signature(VersionedPacket):
+ __typeid__ = 0x02
+ __ver__ = 0
+
+
+class SignatureV4(Signature):
+ """
+ 5.2.3. Version 4 Signature Packet Format
+
+ The body of a version 4 Signature packet contains:
+
+ - One-octet version number (4).
+
+ - One-octet signature type.
+
+ - One-octet public-key algorithm.
+
+ - One-octet hash algorithm.
+
+ - Two-octet scalar octet count for following hashed subpacket data.
+ Note that this is the length in octets of all of the hashed
+ subpackets; a pointer incremented by this number will skip over
+ the hashed subpackets.
+
+ - Hashed subpacket data set (zero or more subpackets).
+
+ - Two-octet scalar octet count for the following unhashed subpacket
+ data. Note that this is the length in octets of all of the
+ unhashed subpackets; a pointer incremented by this number will
+ skip over the unhashed subpackets.
+
+ - Unhashed subpacket data set (zero or more subpackets).
+
+ - Two-octet field holding the left 16 bits of the signed hash
+ value.
+
+ - One or more multiprecision integers comprising the signature.
+ This portion is algorithm specific, as described above.
+
+ The concatenation of the data being signed and the signature data
+ from the version number through the hashed subpacket data (inclusive)
+ is hashed. The resulting hash value is what is signed. The left 16
+ bits of the hash are included in the Signature packet to provide a
+ quick test to reject some invalid signatures.
+
+ There are two fields consisting of Signature subpackets. The first
+ field is hashed with the rest of the signature data, while the second
+ is unhashed. The second set of subpackets is not cryptographically
+ protected by the signature and should include only advisory
+ information.
+
+ The algorithms for converting the hash function result to a signature
+ are described in a section below.
+ """
+ __ver__ = 4
+
+ @sdproperty
+ def sigtype(self):
+ return self._sigtype
+
+ @sigtype.register(int)
+ @sigtype.register(SignatureType)
+ def sigtype_int(self, val):
+ self._sigtype = SignatureType(val)
+
+ @sdproperty
+ def pubalg(self):
+ return self._pubalg
+
+ @pubalg.register(int)
+ @pubalg.register(PubKeyAlgorithm)
+ def pubalg_int(self, val):
+ self._pubalg = PubKeyAlgorithm(val)
+
+ sigs = {PubKeyAlgorithm.RSAEncryptOrSign: RSASignature,
+ PubKeyAlgorithm.RSAEncrypt: RSASignature,
+ PubKeyAlgorithm.RSASign: RSASignature,
+ PubKeyAlgorithm.DSA: DSASignature,
+ PubKeyAlgorithm.ECDSA: ECDSASignature, }
+
+ if self.pubalg in sigs:
+ self.signature = sigs[self.pubalg]()
+
+ @sdproperty
+ def halg(self):
+ return self._halg
+
+ @halg.register(int)
+ @halg.register(HashAlgorithm)
+ def halg_int(self, val):
+ try:
+ self._halg = HashAlgorithm(val)
+
+ except ValueError: # pragma: no cover
+ self._halg = val
+
+ @property
+ def signature(self):
+ return self._signature
+
+ @signature.setter
+ def signature(self, val):
+ self._signature = val
+
+ @property
+ def signer(self):
+ return self.subpackets['Issuer'][-1].issuer
+
+ def __init__(self):
+ super(Signature, self).__init__()
+ self._sigtype = None
+ self._pubalg = None
+ self._halg = None
+ self.subpackets = SubPackets()
+ self.hash2 = bytearray(2)
+ self.signature = None
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes += super(Signature, self).__bytearray__()
+ _bytes += self.int_to_bytes(self.sigtype)
+ _bytes += self.int_to_bytes(self.pubalg)
+ _bytes += self.int_to_bytes(self.halg)
+ _bytes += self.subpackets.__bytearray__()
+ _bytes += self.hash2
+ _bytes += self.signature.__bytearray__()
+
+ return _bytes
+
+ def __copy__(self):
+ spkt = SignatureV4()
+ spkt.header = copy.copy(self.header)
+ spkt._sigtype = self._sigtype
+ spkt._pubalg = self._pubalg
+ spkt._halg = self._halg
+
+ spkt.subpackets = copy.copy(self.subpackets)
+ spkt.hash2 = copy.copy(self.hash2)
+ spkt.signature = copy.copy(self.signature)
+
+ return spkt
+
+ def update_hlen(self):
+ self.subpackets.update_hlen()
+ super(SignatureV4, self).update_hlen()
+
+ def parse(self, packet):
+ super(Signature, self).parse(packet)
+ self.sigtype = packet[0]
+ del packet[0]
+
+ self.pubalg = packet[0]
+ del packet[0]
+
+ self.halg = packet[0]
+ del packet[0]
+
+ self.subpackets.parse(packet)
+
+ self.hash2 = packet[:2]
+ del packet[:2]
+
+ self.signature.parse(packet)
+
+
+class SKESessionKey(VersionedPacket):
+ __typeid__ = 0x03
+ __ver__ = 0
+
+ @abc.abstractmethod
+ def decrypt_sk(self, passphrase):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def encrypt_sk(self, passphrase, sk):
+ raise NotImplementedError()
+
+
+class SKESessionKeyV4(SKESessionKey):
+ """
+ 5.3. Symmetric-Key Encrypted Session Key Packets (Tag 3)
+
+ The Symmetric-Key Encrypted Session Key packet holds the
+ symmetric-key encryption of a session key used to encrypt a message.
+ Zero or more Public-Key Encrypted Session Key packets and/or
+ Symmetric-Key Encrypted Session Key packets may precede a
+ Symmetrically Encrypted Data packet that holds an encrypted message.
+ The message is encrypted with a session key, and the session key is
+ itself encrypted and stored in the Encrypted Session Key packet or
+ the Symmetric-Key Encrypted Session Key packet.
+
+ If the Symmetrically Encrypted Data packet is preceded by one or
+ more Symmetric-Key Encrypted Session Key packets, each specifies a
+ passphrase that may be used to decrypt the message. This allows a
+ message to be encrypted to a number of public keys, and also to one
+ or more passphrases. This packet type is new and is not generated
+ by PGP 2.x or PGP 5.0.
+
+ The body of this packet consists of:
+
+ - A one-octet version number. The only currently defined version
+ is 4.
+
+ - A one-octet number describing the symmetric algorithm used.
+
+ - A string-to-key (S2K) specifier, length as defined above.
+
+ - Optionally, the encrypted session key itself, which is decrypted
+ with the string-to-key object.
+
+ If the encrypted session key is not present (which can be detected
+ on the basis of packet length and S2K specifier size), then the S2K
+ algorithm applied to the passphrase produces the session key for
+ decrypting the file, using the symmetric cipher algorithm from the
+ Symmetric-Key Encrypted Session Key packet.
+
+ If the encrypted session key is present, the result of applying the
+ S2K algorithm to the passphrase is used to decrypt just that
+ encrypted session key field, using CFB mode with an IV of all zeros.
+ The decryption result consists of a one-octet algorithm identifier
+ that specifies the symmetric-key encryption algorithm used to
+ encrypt the following Symmetrically Encrypted Data packet, followed
+ by the session key octets themselves.
+
+ Note: because an all-zero IV is used for this decryption, the S2K
+ specifier MUST use a salt value, either a Salted S2K or an
+ Iterated-Salted S2K. The salt value will ensure that the decryption
+ key is not repeated even if the passphrase is reused.
+ """
+ __ver__ = 4
+
+ @property
+ def symalg(self):
+ return self.s2k.encalg
+
+ def __init__(self):
+ super(SKESessionKeyV4, self).__init__()
+ self.s2k = String2Key()
+ self.ct = bytearray()
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes += super(SKESessionKeyV4, self).__bytearray__()
+ _bytes += self.s2k.__bytearray__()[1:]
+ _bytes += self.ct
+ return _bytes
+
+ def __copy__(self):
+ sk = self.__class__()
+ sk.header = copy.copy(self.header)
+ sk.s2k = copy.copy(self.s2k)
+ sk.ct = self.ct[:]
+
+ return sk
+
+ def parse(self, packet):
+ super(SKESessionKeyV4, self).parse(packet)
+ # prepend a valid usage identifier so this parses correctly
+ packet.insert(0, 255)
+ self.s2k.parse(packet, iv=False)
+
+ ctend = self.header.length - len(self.s2k)
+ self.ct = packet[:ctend]
+ del packet[:ctend]
+
+ def decrypt_sk(self, passphrase):
+ # derive the first session key from our passphrase
+ sk = self.s2k.derive_key(passphrase)
+ del passphrase
+
+ # if there is no ciphertext, then the first session key is the session key being used
+ if len(self.ct) == 0:
+ return self.symalg, sk
+
+ # otherwise, we now need to decrypt the encrypted session key
+ m = bytearray(_decrypt(bytes(self.ct), sk, self.symalg))
+ del sk
+
+ symalg = SymmetricKeyAlgorithm(m[0])
+ del m[0]
+
+ return symalg, bytes(m)
+
+ def encrypt_sk(self, passphrase, sk):
+ # generate the salt and derive the key to encrypt sk with from it
+ self.s2k.salt = bytearray(os.urandom(8))
+ esk = self.s2k.derive_key(passphrase)
+ del passphrase
+
+ self.ct = _encrypt(self.int_to_bytes(self.symalg) + sk, esk, self.symalg)
+
+ # update header length and return sk
+ self.update_hlen()
+
+
+class OnePassSignature(VersionedPacket):
+ __typeid__ = 0x04
+ __ver__ = 0
+
+
+class OnePassSignatureV3(OnePassSignature):
+ """
+ 5.4. One-Pass Signature Packets (Tag 4)
+
+ The One-Pass Signature packet precedes the signed data and contains
+ enough information to allow the receiver to begin calculating any
+ hashes needed to verify the signature. It allows the Signature
+ packet to be placed at the end of the message, so that the signer
+ can compute the entire signed message in one pass.
+
+ A One-Pass Signature does not interoperate with PGP 2.6.x or
+ earlier.
+
+ The body of this packet consists of:
+
+ - A one-octet version number. The current version is 3.
+
+ - A one-octet signature type. Signature types are described in
+ Section 5.2.1.
+
+ - A one-octet number describing the hash algorithm used.
+
+ - A one-octet number describing the public-key algorithm used.
+
+ - An eight-octet number holding the Key ID of the signing key.
+
+ - A one-octet number holding a flag showing whether the signature
+ is nested. A zero value indicates that the next packet is
+ another One-Pass Signature packet that describes another
+ signature to be applied to the same message data.
+
+ Note that if a message contains more than one one-pass signature,
+ then the Signature packets bracket the message; that is, the first
+ Signature packet after the message corresponds to the last one-pass
+ packet and the final Signature packet corresponds to the first
+ one-pass packet.
+ """
+ __ver__ = 3
+
+ @sdproperty
+ def sigtype(self):
+ return self._sigtype
+
+ @sigtype.register(int)
+ @sigtype.register(SignatureType)
+ def sigtype_int(self, val):
+ self._sigtype = SignatureType(val)
+
+ @sdproperty
+ def pubalg(self):
+ return self._pubalg
+
+ @pubalg.register(int)
+ @pubalg.register(PubKeyAlgorithm)
+ def pubalg_int(self, val):
+ self._pubalg = PubKeyAlgorithm(val)
+ if self._pubalg in [PubKeyAlgorithm.RSAEncryptOrSign, PubKeyAlgorithm.RSAEncrypt, PubKeyAlgorithm.RSASign]:
+ self.signature = RSASignature()
+
+ elif self._pubalg == PubKeyAlgorithm.DSA:
+ self.signature = DSASignature()
+
+ @sdproperty
+ def halg(self):
+ return self._halg
+
+ @halg.register(int)
+ @halg.register(HashAlgorithm)
+ def halg_int(self, val):
+ try:
+ self._halg = HashAlgorithm(val)
+
+ except ValueError: # pragma: no cover
+ self._halg = val
+
+ @sdproperty
+ def signer(self):
+ return self._signer
+
+ @signer.register(str)
+ @signer.register(six.text_type)
+ def signer_str(self, val):
+ self._signer = val
+
+ @signer.register(bytearray)
+ def signer_bin(self, val):
+ self._signer = binascii.hexlify(val).upper().decode('latin-1')
+
+ def __init__(self):
+ super(OnePassSignatureV3, self).__init__()
+ self._sigtype = None
+ self._halg = None
+ self._pubalg = None
+ self._signer = b'\x00' * 8
+ self.nested = False
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes += super(OnePassSignatureV3, self).__bytearray__()
+ _bytes += bytearray([self.sigtype])
+ _bytes += bytearray([self.halg])
+ _bytes += bytearray([self.pubalg])
+ _bytes += binascii.unhexlify(six.b(self.signer))
+ _bytes += bytearray([int(self.nested)])
+ return _bytes
+
+ def parse(self, packet):
+ super(OnePassSignatureV3, self).parse(packet)
+ self.sigtype = packet[0]
+ del packet[0]
+
+ self.halg = packet[0]
+ del packet[0]
+
+ self.pubalg = packet[0]
+ del packet[0]
+
+ self.signer = packet[:8]
+ del packet[:8]
+
+ self.nested = (packet[0] == 1)
+ del packet[0]
+
+
+class PrivKey(VersionedPacket, Primary, Private):
+ __typeid__ = 0x05
+ __ver__ = 0
+
+
+class PubKey(VersionedPacket, Primary, Public):
+ __typeid__ = 0x06
+ __ver__ = 0
+
+ @abc.abstractproperty
+ def fingerprint(self):
+ """compute and return the fingerprint of the key"""
+
+
+class PubKeyV4(PubKey):
+ __ver__ = 4
+
+ @sdproperty
+ def created(self):
+ return self._created
+
+ @created.register(datetime)
+ def created_datetime(self, val):
+ self._created = val
+
+ @created.register(int)
+ def created_int(self, val):
+ self.created = datetime.utcfromtimestamp(val)
+
+ @created.register(bytes)
+ @created.register(bytearray)
+ def created_bin(self, val):
+ self.created = self.bytes_to_int(val)
+
+ @sdproperty
+ def pkalg(self):
+ return self._pkalg
+
+ @pkalg.register(int)
+ @pkalg.register(PubKeyAlgorithm)
+ def pkalg_int(self, val):
+ self._pkalg = PubKeyAlgorithm(val)
+
+ _c = {
+ # True means public
+ (True, PubKeyAlgorithm.RSAEncryptOrSign): RSAPub,
+ (True, PubKeyAlgorithm.RSAEncrypt): RSAPub,
+ (True, PubKeyAlgorithm.RSASign): RSAPub,
+ (True, PubKeyAlgorithm.DSA): DSAPub,
+ (True, PubKeyAlgorithm.ElGamal): ElGPub,
+ (True, PubKeyAlgorithm.FormerlyElGamalEncryptOrSign): ElGPub,
+ (True, PubKeyAlgorithm.ECDSA): ECDSAPub,
+ (True, PubKeyAlgorithm.ECDH): ECDHPub,
+ # False means private
+ (False, PubKeyAlgorithm.RSAEncryptOrSign): RSAPriv,
+ (False, PubKeyAlgorithm.RSAEncrypt): RSAPriv,
+ (False, PubKeyAlgorithm.RSASign): RSAPriv,
+ (False, PubKeyAlgorithm.DSA): DSAPriv,
+ (False, PubKeyAlgorithm.ElGamal): ElGPriv,
+ (False, PubKeyAlgorithm.FormerlyElGamalEncryptOrSign): ElGPriv,
+ (False, PubKeyAlgorithm.ECDSA): ECDSAPriv,
+ (False, PubKeyAlgorithm.ECDH): ECDHPriv,
+ }
+
+ k = (self.public, self.pkalg)
+ km = _c.get(k, None)
+
+ self.keymaterial = (km or (OpaquePubKey if self.public else OpaquePrivKey))()
+
+ # km = _c.get(k, None)
+ # self.keymaterial = km() if km is not None else km
+
+ @property
+ def public(self):
+ return isinstance(self, PubKey) and not isinstance(self, PrivKey)
+
+ @property
+ def fingerprint(self):
+ # A V4 fingerprint is the 160-bit SHA-1 hash of the octet 0x99, followed by the two-octet packet length,
+ # followed by the entire Public-Key packet starting with the version field. The Key ID is the
+ # low-order 64 bits of the fingerprint.
+ fp = hashlib.new('sha1')
+
+ plen = self.keymaterial.publen()
+ bcde_len = self.int_to_bytes(6 + plen, 2)
+
+ # a.1) 0x99 (1 octet)
+ # a.2) high-order length octet
+ # a.3) low-order length octet
+ fp.update(b'\x99' + bcde_len[:1] + bcde_len[-1:])
+ # b) version number = 4 (1 octet);
+ fp.update(b'\x04')
+ # c) timestamp of key creation (4 octets);
+ fp.update(self.int_to_bytes(calendar.timegm(self.created.timetuple()), 4))
+ # d) algorithm (1 octet): 17 = DSA (example);
+ fp.update(self.int_to_bytes(self.pkalg))
+ # e) Algorithm-specific fields.
+ fp.update(self.keymaterial.__bytearray__()[:plen])
+
+ # and return the digest
+ return Fingerprint(fp.hexdigest().upper())
+
+ def __init__(self):
+ super(PubKeyV4, self).__init__()
+ self.created = datetime.utcnow()
+ self.pkalg = 0
+ self.keymaterial = None
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes += super(PubKeyV4, self).__bytearray__()
+ _bytes += self.int_to_bytes(calendar.timegm(self.created.timetuple()), 4)
+ _bytes += self.int_to_bytes(self.pkalg)
+ _bytes += self.keymaterial.__bytearray__()
+ return _bytes
+
+ def __copy__(self):
+ pk = self.__class__()
+ pk.header = copy.copy(self.header)
+ pk.created = self.created
+ pk.pkalg = self.pkalg
+ pk.keymaterial = copy.copy(self.keymaterial)
+
+ return pk
+
+ def verify(self, subj, sigbytes, hash_alg):
+ return self.keymaterial.verify(subj, sigbytes, hash_alg)
+
+ def parse(self, packet):
+ super(PubKeyV4, self).parse(packet)
+
+ self.created = packet[:4]
+ del packet[:4]
+
+ self.pkalg = packet[0]
+ del packet[0]
+
+ # bound keymaterial to the remaining length of the packet
+ pend = self.header.length - 6
+ self.keymaterial.parse(packet[:pend])
+ del packet[:pend]
+
+
+class PrivKeyV4(PrivKey, PubKeyV4):
+ __ver__ = 4
+
+ @classmethod
+ def new(cls, key_algorithm, key_size):
+ # build a key packet
+ pk = PrivKeyV4()
+ pk.pkalg = key_algorithm
+ if pk.keymaterial is None:
+ raise NotImplementedError(key_algorithm)
+ pk.keymaterial._generate(key_size)
+ pk.update_hlen()
+ return pk
+
+ def pubkey(self):
+ # return a copy of ourselves, but just the public half
+ pk = PubKeyV4() if not isinstance(self, PrivSubKeyV4) else PubSubKeyV4()
+ pk.created = self.created
+ pk.pkalg = self.pkalg
+
+ # copy over MPIs
+ for pm in self.keymaterial.__pubfields__:
+ setattr(pk.keymaterial, pm, copy.copy(getattr(self.keymaterial, pm)))
+
+ if self.pkalg == PubKeyAlgorithm.ECDSA:
+ pk.keymaterial.oid = self.keymaterial.oid
+
+ if self.pkalg == PubKeyAlgorithm.ECDH:
+ pk.keymaterial.oid = self.keymaterial.oid
+ pk.keymaterial.kdf = copy.copy(self.keymaterial.kdf)
+
+ pk.update_hlen()
+ return pk
+
+ @property
+ def protected(self):
+ return bool(self.keymaterial.s2k)
+
+ @property
+ def unlocked(self):
+ if self.protected:
+ return 0 not in list(self.keymaterial)
+ return True # pragma: no cover
+
+ def protect(self, passphrase, enc_alg, hash_alg):
+ self.keymaterial.encrypt_keyblob(passphrase, enc_alg, hash_alg)
+ del passphrase
+ self.update_hlen()
+
+ def unprotect(self, passphrase):
+ self.keymaterial.decrypt_keyblob(passphrase)
+ del passphrase
+
+ def sign(self, sigdata, hash_alg):
+ return self.keymaterial.sign(sigdata, hash_alg)
+
+
+class PrivSubKey(VersionedPacket, Sub, Private):
+ __typeid__ = 0x07
+ __ver__ = 0
+
+
+class PrivSubKeyV4(PrivSubKey, PrivKeyV4):
+ __ver__ = 4
+
+
+class CompressedData(Packet):
+ """
+ 5.6. Compressed Data Packet (Tag 8)
+
+ The Compressed Data packet contains compressed data. Typically, this
+ packet is found as the contents of an encrypted packet, or following
+ a Signature or One-Pass Signature packet, and contains a literal data
+ packet.
+
+ The body of this packet consists of:
+
+ - One octet that gives the algorithm used to compress the packet.
+
+ - Compressed data, which makes up the remainder of the packet.
+
+ A Compressed Data Packet's body contains an block that compresses
+ some set of packets. See section "Packet Composition" for details on
+ how messages are formed.
+
+ ZIP-compressed packets are compressed with raw RFC 1951 [RFC1951]
+ DEFLATE blocks. Note that PGP V2.6 uses 13 bits of compression. If
+ an implementation uses more bits of compression, PGP V2.6 cannot
+ decompress it.
+
+ ZLIB-compressed packets are compressed with RFC 1950 [RFC1950] ZLIB-
+ style blocks.
+
+ BZip2-compressed packets are compressed using the BZip2 [BZ2]
+ algorithm.
+ """
+ __typeid__ = 0x08
+
+ @sdproperty
+ def calg(self):
+ return self._calg
+
+ @calg.register(int)
+ @calg.register(CompressionAlgorithm)
+ def calg_int(self, val):
+ self._calg = CompressionAlgorithm(val)
+
+ def __init__(self):
+ super(CompressedData, self).__init__()
+ self._calg = None
+ self.packets = []
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes += super(CompressedData, self).__bytearray__()
+ _bytes += bytearray([self.calg])
+
+ _pb = bytearray()
+ for pkt in self.packets:
+ _pb += pkt.__bytearray__()
+ _bytes += self.calg.compress(bytes(_pb))
+
+ return _bytes
+
+ def parse(self, packet):
+ super(CompressedData, self).parse(packet)
+ self.calg = packet[0]
+ del packet[0]
+
+ cdata = bytearray(self.calg.decompress(packet[:self.header.length - 1]))
+ del packet[:self.header.length - 1]
+
+ while len(cdata) > 0:
+ self.packets.append(Packet(cdata))
+
+
+class SKEData(Packet):
+ """
+ 5.7. Symmetrically Encrypted Data Packet (Tag 9)
+
+ The Symmetrically Encrypted Data packet contains data encrypted with
+ a symmetric-key algorithm. When it has been decrypted, it contains
+ other packets (usually a literal data packet or compressed data
+ packet, but in theory other Symmetrically Encrypted Data packets or
+ sequences of packets that form whole OpenPGP messages).
+
+ The body of this packet consists of:
+
+ - Encrypted data, the output of the selected symmetric-key cipher
+ operating in OpenPGP's variant of Cipher Feedback (CFB) mode.
+
+ The symmetric cipher used may be specified in a Public-Key or
+ Symmetric-Key Encrypted Session Key packet that precedes the
+ Symmetrically Encrypted Data packet. In that case, the cipher
+ algorithm octet is prefixed to the session key before it is
+ encrypted. If no packets of these types precede the encrypted data,
+ the IDEA algorithm is used with the session key calculated as the MD5
+ hash of the passphrase, though this use is deprecated.
+
+ The data is encrypted in CFB mode, with a CFB shift size equal to the
+ cipher's block size. The Initial Vector (IV) is specified as all
+ zeros. Instead of using an IV, OpenPGP prefixes a string of length
+ equal to the block size of the cipher plus two to the data before it
+ is encrypted. The first block-size octets (for example, 8 octets for
+ a 64-bit block length) are random, and the following two octets are
+ copies of the last two octets of the IV. For example, in an 8-octet
+ block, octet 9 is a repeat of octet 7, and octet 10 is a repeat of
+ octet 8. In a cipher of length 16, octet 17 is a repeat of octet 15
+ and octet 18 is a repeat of octet 16. As a pedantic clarification,
+ in both these examples, we consider the first octet to be numbered 1.
+
+ After encrypting the first block-size-plus-two octets, the CFB state
+ is resynchronized. The last block-size octets of ciphertext are
+ passed through the cipher and the block boundary is reset.
+
+ The repetition of 16 bits in the random data prefixed to the message
+ allows the receiver to immediately check whether the session key is
+ incorrect. See the "Security Considerations" section for hints on
+ the proper use of this "quick check".
+ """
+ __typeid__ = 0x09
+
+ def __init__(self):
+ super(SKEData, self).__init__()
+ self.ct = bytearray()
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes += super(SKEData, self).__bytearray__()
+ _bytes += self.ct
+ return _bytes
+
+ def __copy__(self):
+ skd = self.__class__()
+ skd.ct = self.ct[:]
+ return skd
+
+ def parse(self, packet):
+ super(SKEData, self).parse(packet)
+ self.ct = packet[:self.header.length]
+ del packet[:self.header.length]
+
+ def decrypt(self, key, alg): # pragma: no cover
+ pt = _decrypt(bytes(self.ct), bytes(key), alg)
+
+ iv = bytes(pt[:alg.block_size // 8])
+ del pt[:alg.block_size // 8]
+
+ ivl2 = bytes(pt[:2])
+ del pt[:2]
+
+ if not constant_time.bytes_eq(iv[-2:], ivl2):
+ raise PGPDecryptionError("Decryption failed")
+
+ return pt
+
+
+class Marker(Packet):
+ __typeid__ = 0x0a
+
+ def __init__(self):
+ super(Marker, self).__init__()
+ self.data = b'PGP'
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes += super(Marker, self).__bytearray__()
+ _bytes += self.data
+ return _bytes
+
+ def parse(self, packet):
+ super(Marker, self).parse(packet)
+ self.data = packet[:self.header.length]
+ del packet[:self.header.length]
+
+
+class LiteralData(Packet):
+ """
+ 5.9. Literal Data Packet (Tag 11)
+
+ A Literal Data packet contains the body of a message; data that is
+ not to be further interpreted.
+
+ The body of this packet consists of:
+
+ - A one-octet field that describes how the data is formatted.
+
+ If it is a 'b' (0x62), then the Literal packet contains binary data.
+ If it is a 't' (0x74), then it contains text data, and thus may need
+ line ends converted to local form, or other text-mode changes. The
+ tag 'u' (0x75) means the same as 't', but also indicates that
+ implementation believes that the literal data contains UTF-8 text.
+
+ Early versions of PGP also defined a value of 'l' as a 'local' mode
+ for machine-local conversions. RFC 1991 [RFC1991] incorrectly stated
+ this local mode flag as '1' (ASCII numeral one). Both of these local
+ modes are deprecated.
+
+ - File name as a string (one-octet length, followed by a file
+ name). This may be a zero-length string. Commonly, if the
+ source of the encrypted data is a file, this will be the name of
+ the encrypted file. An implementation MAY consider the file name
+ in the Literal packet to be a more authoritative name than the
+ actual file name.
+
+ If the special name "_CONSOLE" is used, the message is considered to
+ be "for your eyes only". This advises that the message data is
+ unusually sensitive, and the receiving program should process it more
+ carefully, perhaps avoiding storing the received data to disk, for
+ example.
+
+ - A four-octet number that indicates a date associated with the
+ literal data. Commonly, the date might be the modification date
+ of a file, or the time the packet was created, or a zero that
+ indicates no specific time.
+
+ - The remainder of the packet is literal data.
+
+ Text data is stored with <CR><LF> text endings (i.e., network-
+ normal line endings). These should be converted to native line
+ endings by the receiving software.
+ """
+ __typeid__ = 0x0B
+
+ @sdproperty
+ def mtime(self):
+ return self._mtime
+
+ @mtime.register(datetime)
+ def mtime_datetime(self, val):
+ self._mtime = val
+
+ @mtime.register(int)
+ def mtime_int(self, val):
+ self.mtime = datetime.utcfromtimestamp(val)
+
+ @mtime.register(bytes)
+ @mtime.register(bytearray)
+ def mtime_bin(self, val):
+ self.mtime = self.bytes_to_int(val)
+
+ @property
+ def contents(self):
+ if self.format == 't':
+ return self._contents.decode('latin-1')
+
+ if self.format == 'u':
+ return self._contents.decode('utf-8')
+
+ return self._contents
+
+ def __init__(self):
+ super(LiteralData, self).__init__()
+ self.format = 'b'
+ self.filename = ''
+ self.mtime = datetime.utcnow()
+ self._contents = bytearray()
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes += super(LiteralData, self).__bytearray__()
+ _bytes += self.format.encode('latin-1')
+ _bytes += bytearray([len(self.filename)])
+ _bytes += self.filename.encode('latin-1')
+ _bytes += self.int_to_bytes(calendar.timegm(self.mtime.timetuple()), 4)
+ _bytes += self._contents
+ return _bytes
+
+ def __copy__(self):
+ pkt = LiteralData()
+ pkt.header = copy.copy(self.header)
+ pkt.format = self.format
+ pkt.filename = self.filename
+ pkt.mtime = self.mtime
+ pkt._contents = self._contents[:]
+
+ return pkt
+
+ def parse(self, packet):
+ super(LiteralData, self).parse(packet)
+ self.format = chr(packet[0])
+ del packet[0]
+
+ fnl = packet[0]
+ del packet[0]
+
+ self.filename = packet[:fnl].decode()
+ del packet[:fnl]
+
+ self.mtime = packet[:4]
+ del packet[:4]
+
+ self._contents = packet[:self.header.length - (6 + fnl)]
+ del packet[:self.header.length - (6 + fnl)]
+
+
+class Trust(Packet):
+ """
+ 5.10. Trust Packet (Tag 12)
+
+ The Trust packet is used only within keyrings and is not normally
+ exported. Trust packets contain data that record the user's
+ specifications of which key holders are trustworthy introducers,
+ along with other information that implementing software uses for
+ trust information. The format of Trust packets is defined by a given
+ implementation.
+
+ Trust packets SHOULD NOT be emitted to output streams that are
+ transferred to other users, and they SHOULD be ignored on any input
+ other than local keyring files.
+ """
+ __typeid__ = 0x0C
+
+ @sdproperty
+ def trustlevel(self):
+ return self._trustlevel
+
+ @trustlevel.register(int)
+ @trustlevel.register(TrustLevel)
+ def trustlevel_int(self, val):
+ self._trustlevel = TrustLevel(val & 0x0F)
+
+ @sdproperty
+ def trustflags(self):
+ return self._trustflags
+
+ @trustflags.register(list)
+ def trustflags_list(self, val):
+ self._trustflags = val
+
+ @trustflags.register(int)
+ def trustflags_int(self, val):
+ self._trustflags = TrustFlags & val
+
+ def __init__(self):
+ super(Trust, self).__init__()
+ self.trustlevel = TrustLevel.Unknown
+ self.trustflags = []
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes += super(Trust, self).__bytearray__()
+ _bytes += self.int_to_bytes(self.trustlevel + sum(self.trustflags), 2)
+ return _bytes
+
+ def parse(self, packet):
+ super(Trust, self).parse(packet)
+ # self.trustlevel = packet[0] & 0x1f
+ t = self.bytes_to_int(packet[:2])
+ del packet[:2]
+
+ self.trustlevel = t
+ self.trustflags = t
+
+
+class UserID(Packet):
+ """
+ 5.11. User ID Packet (Tag 13)
+
+ A User ID packet consists of UTF-8 text that is intended to represent
+ the name and email address of the key holder. By convention, it
+ includes an RFC 2822 [RFC2822] mail name-addr, but there are no
+ restrictions on its content. The packet length in the header
+ specifies the length of the User ID.
+ """
+ __typeid__ = 0x0D
+
+ def __init__(self):
+ super(UserID, self).__init__()
+ self.name = ""
+ self.comment = ""
+ self.email = ""
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes += super(UserID, self).__bytearray__()
+ _bytes += self.text_to_bytes(self.name)
+ if self.comment:
+ _bytes += b' (' + self.text_to_bytes(self.comment) + b')'
+
+ if self.email:
+ _bytes += b' <' + self.text_to_bytes(self.email) + b'>'
+
+ return _bytes
+
+ def __copy__(self):
+ uid = UserID()
+ uid.header = copy.copy(self.header)
+ uid.name = self.name
+ uid.comment = self.comment
+ uid.email = self.email
+ return uid
+
+ def parse(self, packet):
+ super(UserID, self).parse(packet)
+
+ uid_text = packet[:self.header.length].decode('latin-1')
+ del packet[:self.header.length]
+
+ # came across a UID packet with no payload. If that happens, don't bother trying to parse anything!
+ if self.header.length > 0:
+ uid = re.match(r"""^
+ # name should always match something
+ (?P<name>.+?)
+ # comment *optionally* matches text in parens following name
+ # this should never come after email and must be followed immediately by
+ # either the email field, or the end of the packet.
+ (\ \((?P<comment>.+?)\)(?=(\ <|$)))?
+ # email *optionally* matches text in angle brackets following name or comment
+ # this should never come before a comment, if comment exists,
+ # but can immediately follow name if comment does not exist
+ (\ <(?P<email>.+)>)?
+ $
+ """, uid_text, flags=re.VERBOSE).groupdict()
+
+ self.name = uid['name']
+ self.comment = uid['comment'] or ""
+ self.email = uid['email'] or ""
+
+
+class PubSubKey(VersionedPacket, Sub, Public):
+ __typeid__ = 0x0E
+ __ver__ = 0
+
+
+class PubSubKeyV4(PubSubKey, PubKeyV4):
+ __ver__ = 4
+
+
+class UserAttribute(Packet):
+ """
+ 5.12. User Attribute Packet (Tag 17)
+
+ The User Attribute packet is a variation of the User ID packet. It
+ is capable of storing more types of data than the User ID packet,
+ which is limited to text. Like the User ID packet, a User Attribute
+ packet may be certified by the key owner ("self-signed") or any other
+ key owner who cares to certify it. Except as noted, a User Attribute
+ packet may be used anywhere that a User ID packet may be used.
+
+ While User Attribute packets are not a required part of the OpenPGP
+ standard, implementations SHOULD provide at least enough
+ compatibility to properly handle a certification signature on the
+ User Attribute packet. A simple way to do this is by treating the
+ User Attribute packet as a User ID packet with opaque contents, but
+ an implementation may use any method desired.
+
+ The User Attribute packet is made up of one or more attribute
+ subpackets. Each subpacket consists of a subpacket header and a
+ body. The header consists of:
+
+ - the subpacket length (1, 2, or 5 octets)
+
+ - the subpacket type (1 octet)
+
+ and is followed by the subpacket specific data.
+
+ The only currently defined subpacket type is 1, signifying an image.
+ An implementation SHOULD ignore any subpacket of a type that it does
+ not recognize. Subpacket types 100 through 110 are reserved for
+ private or experimental use.
+ """
+ __typeid__ = 0x11
+
+ @property
+ def image(self):
+ if 'Image' not in self.subpackets:
+ self.subpackets.addnew('Image')
+ return next(iter(self.subpackets['Image']))
+
+ def __init__(self):
+ super(UserAttribute, self).__init__()
+ self.subpackets = UserAttributeSubPackets()
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes += super(UserAttribute, self).__bytearray__()
+ _bytes += self.subpackets.__bytearray__()
+ return _bytes
+
+ def parse(self, packet):
+ super(UserAttribute, self).parse(packet)
+
+ plen = len(packet)
+ while self.header.length > (plen - len(packet)):
+ self.subpackets.parse(packet)
+
+ def update_hlen(self):
+ self.subpackets.update_hlen()
+ super(UserAttribute, self).update_hlen()
+
+
+class IntegrityProtectedSKEData(VersionedPacket):
+ __typeid__ = 0x12
+ __ver__ = 0
+
+
+class IntegrityProtectedSKEDataV1(IntegrityProtectedSKEData):
+ """
+ 5.13. Sym. Encrypted Integrity Protected Data Packet (Tag 18)
+
+ The Symmetrically Encrypted Integrity Protected Data packet is a
+ variant of the Symmetrically Encrypted Data packet. It is a new
+ feature created for OpenPGP that addresses the problem of detecting a
+ modification to encrypted data. It is used in combination with a
+ Modification Detection Code packet.
+
+ There is a corresponding feature in the features Signature subpacket
+ that denotes that an implementation can properly use this packet
+ type. An implementation MUST support decrypting these packets and
+ SHOULD prefer generating them to the older Symmetrically Encrypted
+ Data packet when possible. Since this data packet protects against
+ modification attacks, this standard encourages its proliferation.
+ While blanket adoption of this data packet would create
+ interoperability problems, rapid adoption is nevertheless important.
+ An implementation SHOULD specifically denote support for this packet,
+ but it MAY infer it from other mechanisms.
+
+ For example, an implementation might infer from the use of a cipher
+ such as Advanced Encryption Standard (AES) or Twofish that a user
+ supports this feature. It might place in the unhashed portion of
+ another user's key signature a Features subpacket. It might also
+ present a user with an opportunity to regenerate their own self-
+ signature with a Features subpacket.
+
+ This packet contains data encrypted with a symmetric-key algorithm
+ and protected against modification by the SHA-1 hash algorithm. When
+ it has been decrypted, it will typically contain other packets (often
+ a Literal Data packet or Compressed Data packet). The last decrypted
+ packet in this packet's payload MUST be a Modification Detection Code
+ packet.
+
+ The body of this packet consists of:
+
+ - A one-octet version number. The only currently defined value is
+ 1.
+
+ - Encrypted data, the output of the selected symmetric-key cipher
+ operating in Cipher Feedback mode with shift amount equal to the
+ block size of the cipher (CFB-n where n is the block size).
+
+ The symmetric cipher used MUST be specified in a Public-Key or
+ Symmetric-Key Encrypted Session Key packet that precedes the
+ Symmetrically Encrypted Data packet. In either case, the cipher
+ algorithm octet is prefixed to the session key before it is
+ encrypted.
+
+ The data is encrypted in CFB mode, with a CFB shift size equal to the
+ cipher's block size. The Initial Vector (IV) is specified as all
+ zeros. Instead of using an IV, OpenPGP prefixes an octet string to
+ the data before it is encrypted. The length of the octet string
+ equals the block size of the cipher in octets, plus two. The first
+ octets in the group, of length equal to the block size of the cipher,
+ are random; the last two octets are each copies of their 2nd
+ preceding octet. For example, with a cipher whose block size is 128
+ bits or 16 octets, the prefix data will contain 16 random octets,
+ then two more octets, which are copies of the 15th and 16th octets,
+ respectively. Unlike the Symmetrically Encrypted Data Packet, no
+ special CFB resynchronization is done after encrypting this prefix
+ data. See "OpenPGP CFB Mode" below for more details.
+
+ The repetition of 16 bits in the random data prefixed to the message
+ allows the receiver to immediately check whether the session key is
+ incorrect.
+
+ The plaintext of the data to be encrypted is passed through the SHA-1
+ hash function, and the result of the hash is appended to the
+ plaintext in a Modification Detection Code packet. The input to the
+ hash function includes the prefix data described above; it includes
+ all of the plaintext, and then also includes two octets of values
+ 0xD3, 0x14. These represent the encoding of a Modification Detection
+ Code packet tag and length field of 20 octets.
+
+ The resulting hash value is stored in a Modification Detection Code
+ (MDC) packet, which MUST use the two octet encoding just given to
+ represent its tag and length field. The body of the MDC packet is
+ the 20-octet output of the SHA-1 hash.
+
+ The Modification Detection Code packet is appended to the plaintext
+ and encrypted along with the plaintext using the same CFB context.
+
+ During decryption, the plaintext data should be hashed with SHA-1,
+ including the prefix data as well as the packet tag and length field
+ of the Modification Detection Code packet. The body of the MDC
+ packet, upon decryption, is compared with the result of the SHA-1
+ hash.
+
+ Any failure of the MDC indicates that the message has been modified
+ and MUST be treated as a security problem. Failures include a
+ difference in the hash values, but also the absence of an MDC packet,
+ or an MDC packet in any position other than the end of the plaintext.
+ Any failure SHOULD be reported to the user.
+
+ Note: future designs of new versions of this packet should consider
+ rollback attacks since it will be possible for an attacker to change
+ the version back to 1.
+ """
+ __ver__ = 1
+
+ def __init__(self):
+ super(IntegrityProtectedSKEDataV1, self).__init__()
+ self.ct = bytearray()
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ _bytes += super(IntegrityProtectedSKEDataV1, self).__bytearray__()
+ _bytes += self.ct
+ return _bytes
+
+ def __copy__(self):
+ skd = self.__class__()
+ skd.ct = self.ct[:]
+ return skd
+
+ def parse(self, packet):
+ super(IntegrityProtectedSKEDataV1, self).parse(packet)
+ self.ct = packet[:self.header.length - 1]
+ del packet[:self.header.length - 1]
+
+ def encrypt(self, key, alg, data):
+ iv = alg.gen_iv()
+ data = iv + iv[-2:] + data
+
+ mdc = MDC()
+ mdc.mdc = binascii.hexlify(hashlib.new('SHA1', data + b'\xd3\x14').digest())
+ mdc.update_hlen()
+
+ data += mdc.__bytes__()
+ self.ct = _encrypt(data, key, alg)
+ self.update_hlen()
+
+ def decrypt(self, key, alg):
+ # iv, ivl2, pt = super(IntegrityProtectedSKEDataV1, self).decrypt(key, alg)
+ pt = _decrypt(bytes(self.ct), bytes(key), alg)
+
+ # do the MDC checks
+ _expected_mdcbytes = b'\xd3\x14' + hashlib.new('SHA1', pt[:-20]).digest()
+ if not constant_time.bytes_eq(bytes(pt[-22:]), _expected_mdcbytes):
+ raise PGPDecryptionError("Decryption failed") # pragma: no cover
+
+ iv = bytes(pt[:alg.block_size // 8])
+ del pt[:alg.block_size // 8]
+
+ ivl2 = bytes(pt[:2])
+ del pt[:2]
+
+ if not constant_time.bytes_eq(iv[-2:], ivl2):
+ raise PGPDecryptionError("Decryption failed") # pragma: no cover
+
+ return pt
+
+
+class MDC(Packet):
+ """
+ 5.14. Modification Detection Code Packet (Tag 19)
+
+ The Modification Detection Code packet contains a SHA-1 hash of
+ plaintext data, which is used to detect message modification. It is
+ only used with a Symmetrically Encrypted Integrity Protected Data
+ packet. The Modification Detection Code packet MUST be the last
+ packet in the plaintext data that is encrypted in the Symmetrically
+ Encrypted Integrity Protected Data packet, and MUST appear in no
+ other place.
+
+ A Modification Detection Code packet MUST have a length of 20 octets.
+ The body of this packet consists of:
+
+ - A 20-octet SHA-1 hash of the preceding plaintext data of the
+ Symmetrically Encrypted Integrity Protected Data packet,
+ including prefix data, the tag octet, and length octet of the
+ Modification Detection Code packet.
+
+ Note that the Modification Detection Code packet MUST always use a
+ new format encoding of the packet tag, and a one-octet encoding of
+ the packet length. The reason for this is that the hashing rules for
+ modification detection include a one-octet tag and one-octet length
+ in the data hash. While this is a bit restrictive, it reduces
+ complexity.
+ """
+ __typeid__ = 0x13
+
+ def __init__(self):
+ super(MDC, self).__init__()
+ self.mdc = ''
+
+ def __bytearray__(self):
+ return super(MDC, self).__bytearray__() + binascii.unhexlify(self.mdc)
+
+ def parse(self, packet):
+ super(MDC, self).parse(packet)
+ self.mdc = binascii.hexlify(packet[:20])
+ del packet[:20]
diff --git a/src/leap/mx/vendor/pgpy/packet/subpackets/__init__.py b/src/leap/mx/vendor/pgpy/packet/subpackets/__init__.py
new file mode 100644
index 0000000..b1e90a5
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/packet/subpackets/__init__.py
@@ -0,0 +1,7 @@
+from .types import Signature as Signature
+from .types import UserAttribute as UserAttribute
+
+from .signature import * # NOQA
+from .userattribute import * # NOQA
+
+__all__ = ['Signature', 'UserAttribute']
diff --git a/src/leap/mx/vendor/pgpy/packet/subpackets/signature.py b/src/leap/mx/vendor/pgpy/packet/subpackets/signature.py
new file mode 100644
index 0000000..8993009
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/packet/subpackets/signature.py
@@ -0,0 +1,887 @@
+""" signature.py
+
+Signature SubPackets
+"""
+import binascii
+import calendar
+
+from datetime import datetime
+from datetime import timedelta
+
+import six
+
+from .types import EmbeddedSignatureHeader
+from .types import Signature
+
+from ...constants import CompressionAlgorithm
+from ...constants import Features as _Features
+from ...constants import HashAlgorithm
+from ...constants import KeyFlags as _KeyFlags
+from ...constants import KeyServerPreferences as _KeyServerPreferences
+from ...constants import NotationDataFlags
+from ...constants import PubKeyAlgorithm
+from ...constants import RevocationKeyClass
+from ...constants import RevocationReason
+from ...constants import SymmetricKeyAlgorithm
+
+from ...decorators import sdproperty
+
+from ...types import Fingerprint
+
+
+__all__ = ['URI',
+ 'FlagList',
+ 'ByteFlag',
+ 'Boolean',
+ 'CreationTime',
+ 'SignatureExpirationTime',
+ 'ExportableCertification',
+ 'TrustSignature',
+ 'RegularExpression',
+ 'Revocable',
+ 'KeyExpirationTime',
+ 'PreferredSymmetricAlgorithms',
+ 'RevocationKey',
+ 'Issuer',
+ 'NotationData',
+ 'PreferredHashAlgorithms',
+ 'PreferredCompressionAlgorithms',
+ 'KeyServerPreferences',
+ 'PreferredKeyServer',
+ 'PrimaryUserID',
+ 'Policy',
+ 'KeyFlags',
+ 'SignersUserID',
+ 'ReasonForRevocation',
+ 'Features',
+ 'EmbeddedSignature']
+
+
+class URI(Signature):
+ @sdproperty
+ def uri(self):
+ return self._uri
+
+ @uri.register(str)
+ @uri.register(six.text_type)
+ def uri_str(self, val):
+ self._uri = val
+
+ @uri.register(bytearray)
+ def uri_bytearray(self, val):
+ self.uri = val.decode('latin-1')
+
+ def __init__(self):
+ super(URI, self).__init__()
+ self.uri = ""
+
+ def __bytearray__(self):
+ _bytes = super(URI, self).__bytearray__()
+ _bytes += self.uri.encode()
+ return _bytes
+
+ def parse(self, packet):
+ super(URI, self).parse(packet)
+ self.uri = packet[:(self.header.length - 1)]
+ del packet[:(self.header.length - 1)]
+
+
+class FlagList(Signature):
+ __flags__ = None
+
+ @sdproperty
+ def flags(self):
+ return self._flags
+
+ @flags.register(list)
+ @flags.register(tuple)
+ def flags_list(self, val):
+ self._flags = list(val)
+
+ @flags.register(int)
+ @flags.register(CompressionAlgorithm)
+ @flags.register(HashAlgorithm)
+ @flags.register(PubKeyAlgorithm)
+ @flags.register(SymmetricKeyAlgorithm)
+ def flags_int(self, val):
+ if self.__flags__ is None: # pragma: no cover
+ raise AttributeError("Error: __flags__ not set!")
+
+ self._flags.append(self.__flags__(val))
+
+ @flags.register(bytearray)
+ def flags_bytearray(self, val):
+ self.flags = self.bytes_to_int(val)
+
+ def __init__(self):
+ super(FlagList, self).__init__()
+ self.flags = []
+
+ def __bytearray__(self):
+ _bytes = super(FlagList, self).__bytearray__()
+ _bytes += b''.join(self.int_to_bytes(b) for b in self.flags)
+ return _bytes
+
+ def parse(self, packet):
+ super(FlagList, self).parse(packet)
+ for i in range(0, self.header.length - 1):
+ self.flags = packet[:1]
+ del packet[:1]
+
+
+class ByteFlag(Signature):
+ __flags__ = None
+
+ @sdproperty
+ def flags(self):
+ return self._flags
+
+ @flags.register(set)
+ @flags.register(list)
+ def flags_seq(self, val):
+ self._flags = set(val)
+
+ @flags.register(int)
+ @flags.register(_KeyFlags)
+ @flags.register(_Features)
+ def flags_int(self, val):
+ if self.__flags__ is None: # pragma: no cover
+ raise AttributeError("Error: __flags__ not set!")
+
+ self._flags |= (self.__flags__ & val)
+
+ @flags.register(bytearray)
+ def flags_bytearray(self, val):
+ self.flags = self.bytes_to_int(val)
+
+ def __init__(self):
+ super(ByteFlag, self).__init__()
+ self.flags = []
+
+ def __bytearray__(self):
+ _bytes = super(ByteFlag, self).__bytearray__()
+ _bytes += self.int_to_bytes(sum(self.flags))
+ # null-pad _bytes if they are not up to the end now
+ if len(_bytes) < len(self):
+ _bytes += b'\x00' * (len(self) - len(_bytes))
+ return _bytes
+
+ def parse(self, packet):
+ super(ByteFlag, self).parse(packet)
+ for i in range(0, self.header.length - 1):
+ self.flags = packet[:1]
+ del packet[:1]
+
+
+class Boolean(Signature):
+ @sdproperty
+ def bflag(self):
+ return self._bool
+
+ @bflag.register(bool)
+ def bflag_bool(self, val):
+ self._bool = val
+
+ @bflag.register(bytearray)
+ def bflag_bytearray(self, val):
+ self.bool = bool(self.bytes_to_int(val))
+
+ def __init__(self):
+ super(Boolean, self).__init__()
+ self.bflag = False
+
+ def __bytearray__(self):
+ _bytes = super(Boolean, self).__bytearray__()
+ _bytes += self.int_to_bytes(int(self.bflag))
+ return _bytes
+
+ def __bool__(self):
+ return self.bflag
+
+ def __nonzero__(self):
+ return self.__bool__()
+
+ def parse(self, packet):
+ super(Boolean, self).parse(packet)
+ self.bflag = packet[:1]
+ del packet[:1]
+
+
+class CreationTime(Signature):
+ """
+ 5.2.3.4. Signature Creation Time
+
+ (4-octet time field)
+
+ The time the signature was made.
+
+ MUST be present in the hashed area.
+ """
+ __typeid__ = 0x02
+
+ @sdproperty
+ def created(self):
+ return self._created
+
+ @created.register(datetime)
+ def created_datetime(self, val):
+ self._created = val
+
+ @created.register(int)
+ def created_int(self, val):
+ self.created = datetime.utcfromtimestamp(val)
+
+ @created.register(bytearray)
+ def created_bytearray(self, val):
+ self.created = self.bytes_to_int(val)
+
+ def __init__(self):
+ super(CreationTime, self).__init__()
+ self.created = datetime.utcnow()
+
+ def __bytearray__(self):
+ _bytes = super(CreationTime, self).__bytearray__()
+ _bytes += self.int_to_bytes(calendar.timegm(self.created.timetuple()), 4)
+ return _bytes
+
+ def parse(self, packet):
+ super(CreationTime, self).parse(packet)
+ self.created = packet[:4]
+ del packet[:4]
+
+
+class SignatureExpirationTime(Signature):
+ """
+ 5.2.3.10. Signature Expiration Time
+
+ (4-octet time field)
+
+ The validity period of the signature. This is the number of seconds
+ after the signature creation time that the signature expires. If
+ this is not present or has a value of zero, it never expires.
+ """
+ __typeid__ = 0x03
+
+ @sdproperty
+ def expires(self):
+ return self._expires
+
+ @expires.register(timedelta)
+ def expires_timedelta(self, val):
+ self._expires = val
+
+ @expires.register(int)
+ def expires_int(self, val):
+ self.expires = timedelta(seconds=val)
+
+ @expires.register(bytearray)
+ def expires_bytearray(self, val):
+ self.expires = self.bytes_to_int(val)
+
+ def __init__(self):
+ super(SignatureExpirationTime, self).__init__()
+ self.expires = 0
+
+ def __bytearray__(self):
+ _bytes = super(SignatureExpirationTime, self).__bytearray__()
+ _bytes += self.int_to_bytes(int(self.expires.total_seconds()), 4)
+ return _bytes
+
+ def parse(self, packet):
+ super(SignatureExpirationTime, self).parse(packet)
+ self.expires = packet[:4]
+ del packet[:4]
+
+
+class ExportableCertification(Boolean):
+ """
+ 5.2.3.11. Exportable Certification
+
+ (1 octet of exportability, 0 for not, 1 for exportable)
+
+ This subpacket denotes whether a certification signature is
+ "exportable", to be used by other users than the signature's issuer.
+ The packet body contains a Boolean flag indicating whether the
+ signature is exportable. If this packet is not present, the
+ certification is exportable; it is equivalent to a flag containing a
+ 1.
+
+ Non-exportable, or "local", certifications are signatures made by a
+ user to mark a key as valid within that user's implementation only.
+ Thus, when an implementation prepares a user's copy of a key for
+ transport to another user (this is the process of "exporting" the
+ key), any local certification signatures are deleted from the key.
+
+ The receiver of a transported key "imports" it, and likewise trims
+ any local certifications. In normal operation, there won't be any,
+ assuming the import is performed on an exported key. However, there
+ are instances where this can reasonably happen. For example, if an
+ implementation allows keys to be imported from a key database in
+ addition to an exported key, then this situation can arise.
+
+ Some implementations do not represent the interest of a single user
+ (for example, a key server). Such implementations always trim local
+ certifications from any key they handle.
+ """
+ __typeid__ = 0x04
+
+
+class TrustSignature(Signature):
+ """
+ 5.2.3.13. Trust Signature
+
+ (1 octet "level" (depth), 1 octet of trust amount)
+
+ Signer asserts that the key is not only valid but also trustworthy at
+ the specified level. Level 0 has the same meaning as an ordinary
+ validity signature. Level 1 means that the signed key is asserted to
+ be a valid trusted introducer, with the 2nd octet of the body
+ specifying the degree of trust. Level 2 means that the signed key is
+ asserted to be trusted to issue level 1 trust signatures, i.e., that
+ it is a "meta introducer". Generally, a level n trust signature
+ asserts that a key is trusted to issue level n-1 trust signatures.
+ The trust amount is in a range from 0-255, interpreted such that
+ values less than 120 indicate partial trust and values of 120 or
+ greater indicate complete trust. Implementations SHOULD emit values
+ of 60 for partial trust and 120 for complete trust.
+ """
+ __typeid__ = 0x05
+
+ @sdproperty
+ def level(self):
+ return self._level
+
+ @level.register(int)
+ def level_int(self, val):
+ self._level = val
+
+ @level.register(bytearray)
+ def level_bytearray(self, val):
+ self.level = self.bytes_to_int(val)
+
+ @sdproperty
+ def amount(self):
+ return self._amount
+
+ @amount.register(int)
+ def amount_int(self, val):
+ # clamp 'val' to the range 0-255
+ self._amount = max(0, min(val, 255))
+
+ @amount.register(bytearray)
+ def amount_bytearray(self, val):
+ self.amount = self.bytes_to_int(val)
+
+ def __init__(self):
+ super(TrustSignature, self).__init__()
+ self.level = 0
+ self.amount = 0
+
+ def __bytearray__(self):
+ _bytes = super(TrustSignature, self).__bytearray__()
+ _bytes += self.int_to_bytes(self.level)
+ _bytes += self.int_to_bytes(self.amount)
+ return _bytes
+
+ def parse(self, packet):
+ super(TrustSignature, self).parse(packet)
+ self.level = packet[:1]
+ del packet[:1]
+ self.amount = packet[:1]
+ del packet[:1]
+
+
+class RegularExpression(Signature):
+ """
+ 5.2.3.14. Regular Expression
+
+ (null-terminated regular expression)
+
+ Used in conjunction with trust Signature packets (of level > 0) to
+ limit the scope of trust that is extended. Only signatures by the
+ target key on User IDs that match the regular expression in the body
+ of this packet have trust extended by the trust Signature subpacket.
+ The regular expression uses the same syntax as the Henry Spencer's
+ "almost public domain" regular expression [REGEX] package. A
+ description of the syntax is found in Section 8 below.
+ """
+ __typeid__ = 0x06
+
+ @sdproperty
+ def regex(self):
+ return self._regex
+
+ @regex.register(str)
+ @regex.register(six.text_type)
+ def regex_str(self, val):
+ self._regex = val
+
+ @regex.register(bytearray)
+ def regex_bytearray(self, val):
+ self.regex = val.decode('latin-1')
+
+ def __init__(self):
+ super(RegularExpression, self).__init__()
+ self.regex = r''
+
+ def __bytearray__(self):
+ _bytes = super(RegularExpression, self).__bytearray__()
+ _bytes += self.regex.encode()
+ return _bytes
+
+ def parse(self, packet):
+ super(RegularExpression, self).parse(packet)
+ self.regex = packet[:(self.header.length - 1)]
+ del packet[:(self.header.length - 1)]
+
+
+class Revocable(Boolean):
+ """
+ 5.2.3.12. Revocable
+
+ (1 octet of revocability, 0 for not, 1 for revocable)
+
+ Signature's revocability status. The packet body contains a Boolean
+ flag indicating whether the signature is revocable. Signatures that
+ are not revocable have any later revocation signatures ignored. They
+ represent a commitment by the signer that he cannot revoke his
+ signature for the life of his key. If this packet is not present,
+ the signature is revocable.
+ """
+ __typeid__ = 0x07
+
+
+class KeyExpirationTime(SignatureExpirationTime):
+ """
+ 5.2.3.6. Key Expiration Time
+
+ (4-octet time field)
+
+ The validity period of the key. This is the number of seconds after
+ the key creation time that the key expires. If this is not present
+ or has a value of zero, the key never expires. This is found only on
+ a self-signature.
+ """
+ __typeid__ = 0x09
+
+
+class PreferredSymmetricAlgorithms(FlagList):
+ """
+ 5.2.3.7. Preferred Symmetric Algorithms
+
+ (array of one-octet values)
+
+ Symmetric algorithm numbers that indicate which algorithms the key
+ holder prefers to use. The subpacket body is an ordered list of
+ octets with the most preferred listed first. It is assumed that only
+ algorithms listed are supported by the recipient's software.
+ Algorithm numbers are in Section 9. This is only found on a self-
+ signature.
+ """
+ __typeid__ = 0x0B
+ __flags__ = SymmetricKeyAlgorithm
+
+
+class RevocationKey(Signature):
+ """
+ 5.2.3.15. Revocation Key
+
+ (1 octet of class, 1 octet of public-key algorithm ID, 20 octets of
+ fingerprint)
+
+ Authorizes the specified key to issue revocation signatures for this
+ key. Class octet must have bit 0x80 set. If the bit 0x40 is set,
+ then this means that the revocation information is sensitive. Other
+ bits are for future expansion to other kinds of authorizations. This
+ is found on a self-signature.
+
+ If the "sensitive" flag is set, the keyholder feels this subpacket
+ contains private trust information that describes a real-world
+ sensitive relationship. If this flag is set, implementations SHOULD
+ NOT export this signature to other users except in cases where the
+ data needs to be available: when the signature is being sent to the
+ designated revoker, or when it is accompanied by a revocation
+ signature from that revoker. Note that it may be appropriate to
+ isolate this subpacket within a separate signature so that it is not
+ combined with other subpackets that need to be exported.
+ """
+ __typeid__ = 0x0C
+
+ @sdproperty
+ def keyclass(self):
+ return self._keyclass
+
+ @keyclass.register(list)
+ def keyclass_list(self, val):
+ self._keyclass = val
+
+ @keyclass.register(int)
+ @keyclass.register(RevocationKeyClass)
+ def keyclass_int(self, val):
+ self._keyclass += RevocationKeyClass & val
+
+ @keyclass.register(bytearray)
+ def keyclass_bytearray(self, val):
+ self.keyclass = self.bytes_to_int(val)
+
+ @sdproperty
+ def algorithm(self):
+ return self._algorithm
+
+ @algorithm.register(int)
+ @algorithm.register(PubKeyAlgorithm)
+ def algorithm_int(self, val):
+ self._algorithm = PubKeyAlgorithm(val)
+
+ @algorithm.register(bytearray)
+ def algorithm_bytearray(self, val):
+ self.algorithm = self.bytes_to_int(val)
+
+ @sdproperty
+ def fingerprint(self):
+ return self._fingerprint
+
+ @fingerprint.register(str)
+ @fingerprint.register(six.text_type)
+ @fingerprint.register(Fingerprint)
+ def fingerprint_str(self, val):
+ self._fingerprint = Fingerprint(val)
+
+ @fingerprint.register(bytearray)
+ def fingerprint_bytearray(self, val):
+ self.fingerprint = ''.join('{:02x}'.format(c) for c in val).upper()
+
+ def __init__(self):
+ super(RevocationKey, self).__init__()
+ self.keyclass = []
+ self.algorithm = PubKeyAlgorithm.Invalid
+ self._fingerprint = ""
+
+ def __bytearray__(self):
+ _bytes = super(RevocationKey, self).__bytearray__()
+ _bytes += self.int_to_bytes(sum(self.keyclass))
+ _bytes += self.int_to_bytes(self.algorithm.value)
+ _bytes += self.fingerprint.__bytes__()
+ return _bytes
+
+ def parse(self, packet):
+ super(RevocationKey, self).parse(packet)
+ self.keyclass = packet[:1]
+ del packet[:1]
+ self.algorithm = packet[:1]
+ del packet[:1]
+ self.fingerprint = packet[:20]
+ del packet[:20]
+
+
+class Issuer(Signature):
+ __typeid__ = 0x10
+
+ @sdproperty
+ def issuer(self):
+ return self._issuer
+
+ @issuer.register(bytearray)
+ def issuer_bytearray(self, val):
+ self._issuer = binascii.hexlify(val).upper().decode('latin-1')
+
+ def __init__(self):
+ super(Issuer, self).__init__()
+ self.issuer = bytearray()
+
+ def __bytearray__(self):
+ _bytes = super(Issuer, self).__bytearray__()
+ _bytes += binascii.unhexlify(self._issuer.encode())
+ return _bytes
+
+ def parse(self, packet):
+ super(Issuer, self).parse(packet)
+ self.issuer = packet[:8]
+ del packet[:8]
+
+
+class NotationData(Signature):
+ __typeid__ = 0x14
+
+ @sdproperty
+ def flags(self):
+ return self._flags
+
+ @flags.register(list)
+ def flags_list(self, val):
+ self._flags = val
+
+ @flags.register(int)
+ @flags.register(NotationDataFlags)
+ def flags_int(self, val):
+ self.flags += NotationDataFlags & val
+
+ @flags.register(bytearray)
+ def flags_bytearray(self, val):
+ self.flags = self.bytes_to_int(val)
+
+ @sdproperty
+ def name(self):
+ return self._name
+
+ @name.register(str)
+ @name.register(six.text_type)
+ def name_str(self, val):
+ self._name = val
+
+ @name.register(bytearray)
+ def name_bytearray(self, val):
+ self.name = val.decode('latin-1')
+
+ @sdproperty
+ def value(self):
+ return self._value
+
+ @value.register(str)
+ @value.register(six.text_type)
+ def value_str(self, val):
+ self._value = val
+
+ @value.register(bytearray)
+ def value_bytearray(self, val):
+ if NotationDataFlags.HumanReadable in self.flags:
+ self.value = val.decode('latin-1')
+
+ else: # pragma: no cover
+ self._value = val
+
+ def __init__(self):
+ super(NotationData, self).__init__()
+ self.flags = [0, 0, 0, 0]
+ self.name = ""
+ self.value = ""
+
+ def __bytearray__(self):
+ _bytes = super(NotationData, self).__bytearray__()
+ _bytes += self.int_to_bytes(sum(self.flags)) + b'\x00\x00\x00'
+ _bytes += self.int_to_bytes(len(self.name), 2)
+ _bytes += self.int_to_bytes(len(self.value), 2)
+ _bytes += self.name.encode()
+ _bytes += self.value if isinstance(self.value, bytearray) else self.value.encode()
+ return bytes(_bytes)
+
+ def parse(self, packet):
+ super(NotationData, self).parse(packet)
+ self.flags = packet[:1]
+ del packet[:4]
+ nlen = self.bytes_to_int(packet[:2])
+ del packet[:2]
+ vlen = self.bytes_to_int(packet[:2])
+ del packet[:2]
+ self.name = packet[:nlen]
+ del packet[:nlen]
+ self.value = packet[:vlen]
+ del packet[:vlen]
+
+
+class PreferredHashAlgorithms(FlagList):
+ __typeid__ = 0x15
+ __flags__ = HashAlgorithm
+
+
+class PreferredCompressionAlgorithms(FlagList):
+ __typeid__ = 0x16
+ __flags__ = CompressionAlgorithm
+
+
+class KeyServerPreferences(FlagList):
+ __typeid__ = 0x17
+ __flags__ = _KeyServerPreferences
+
+
+class PreferredKeyServer(URI):
+ __typeid__ = 0x18
+
+
+class PrimaryUserID(Signature):
+ __typeid__ = 0x19
+
+ @sdproperty
+ def primary(self):
+ return self._primary
+
+ @primary.register(bool)
+ def primary_bool(self, val):
+ self._primary = val
+
+ @primary.register(bytearray)
+ def primary_byrearray(self, val):
+ self.primary = bool(self.bytes_to_int(val))
+
+ def __init__(self):
+ super(PrimaryUserID, self).__init__()
+ self.primary = True
+
+ def __bytearray__(self):
+ _bytes = super(PrimaryUserID, self).__bytearray__()
+ _bytes += self.int_to_bytes(int(self.primary))
+ return _bytes
+
+ def __bool__(self):
+ return self.primary
+
+ def __nonzero__(self):
+ return self.__bool__()
+
+ def parse(self, packet):
+ super(PrimaryUserID, self).parse(packet)
+ self.primary = packet[:1]
+ del packet[:1]
+
+
+class Policy(URI):
+ __typeid__ = 0x1a
+
+
+class KeyFlags(ByteFlag):
+ __typeid__ = 0x1B
+ __flags__ = _KeyFlags
+
+
+class SignersUserID(Signature):
+ __typeid__ = 0x1C
+
+ @sdproperty
+ def userid(self):
+ return self._userid
+
+ @userid.register(str)
+ @userid.register(six.text_type)
+ def userid_str(self, val):
+ self._userid = val
+
+ @userid.register(bytearray)
+ def userid_bytearray(self, val):
+ self.userid = val.decode('latin-1')
+
+ def __init__(self):
+ super(SignersUserID, self).__init__()
+ self.userid = ""
+
+ def __bytearray__(self):
+ _bytes = super(SignersUserID, self).__bytearray__()
+ _bytes += self.userid.encode()
+ return _bytes
+
+ def parse(self, packet):
+ super(SignersUserID, self).parse(packet)
+ self.userid = packet[:(self.header.length - 1)]
+ del packet[:(self.header.length - 1)]
+
+
+class ReasonForRevocation(Signature):
+ __typeid__ = 0x1D
+
+ @sdproperty
+ def code(self):
+ return self._code
+
+ @code.register(int)
+ @code.register(RevocationReason)
+ def code_int(self, val):
+ self._code = RevocationReason(val)
+
+ @code.register(bytearray)
+ def code_bytearray(self, val):
+ self.code = self.bytes_to_int(val)
+
+ @sdproperty
+ def string(self):
+ return self._string
+
+ @string.register(str)
+ @string.register(six.text_type)
+ def string_str(self, val):
+ self._string = val
+
+ @string.register(bytearray)
+ def string_bytearray(self, val):
+ self.string = val.decode('latin-1')
+
+ def __init__(self):
+ super(ReasonForRevocation, self).__init__()
+ self.code = 0x00
+ self.string = ""
+
+ def __bytearray__(self):
+ _bytes = super(ReasonForRevocation, self).__bytearray__()
+ _bytes += self.int_to_bytes(self.code)
+ _bytes += self.string.encode()
+ return _bytes
+
+ def parse(self, packet):
+ super(ReasonForRevocation, self).parse(packet)
+ self.code = packet[:1]
+ del packet[:1]
+ self.string = packet[:(self.header.length - 2)]
+ del packet[:(self.header.length - 2)]
+
+
+class Features(ByteFlag):
+ __typeid__ = 0x1E
+ __flags__ = _Features
+
+
+##TODO: obtain subpacket type 0x1F - Signature Target
+
+
+class EmbeddedSignature(Signature):
+ __typeid__ = 0x20
+
+ @sdproperty
+ def _sig(self):
+ return self._sigpkt
+
+ @_sig.setter
+ def _sig(self, val):
+ esh = EmbeddedSignatureHeader()
+ esh.version = val.header.version
+ val.header = esh
+ val.update_hlen()
+ self._sigpkt = val
+
+ @property
+ def sigtype(self):
+ return self._sig.sigtype
+
+ @property
+ def pubalg(self):
+ return self._sig.pubalg
+
+ @property
+ def halg(self):
+ return self._sig.halg
+
+ @property
+ def subpackets(self):
+ return self._sig.subpackets
+
+ @property
+ def hash2(self): # pragma: no cover
+ return self._sig.hash2
+
+ @property
+ def signature(self):
+ return self._sig.signature
+
+ @property
+ def signer(self):
+ return self._sig.signer
+
+ def __init__(self):
+ super(EmbeddedSignature, self).__init__()
+ from ..packets import SignatureV4
+ self._sigpkt = SignatureV4()
+ self._sigpkt.header = EmbeddedSignatureHeader()
+
+ def __bytearray__(self):
+ return super(EmbeddedSignature, self).__bytearray__() + self._sigpkt.__bytearray__()
+
+ def parse(self, packet):
+ super(EmbeddedSignature, self).parse(packet)
+ self._sig.parse(packet)
diff --git a/src/leap/mx/vendor/pgpy/packet/subpackets/types.py b/src/leap/mx/vendor/pgpy/packet/subpackets/types.py
new file mode 100644
index 0000000..7cf58d1
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/packet/subpackets/types.py
@@ -0,0 +1,136 @@
+""" subpacket.py
+"""
+import abc
+
+from ..types import VersionedHeader
+
+from ...decorators import sdproperty
+
+from ...types import Dispatchable
+from ...types import Header as _Header
+
+__all__ = ['Header',
+ 'EmbeddedSignatureHeader',
+ 'SubPacket',
+ 'Signature',
+ 'UserAttribute',
+ 'Opaque']
+
+
+class Header(_Header):
+ @sdproperty
+ def critical(self):
+ return self._critical
+
+ @critical.register(bool)
+ def critical_bool(self, val):
+ self._critical = val
+
+ @sdproperty
+ def typeid(self):
+ return self._typeid
+
+ @typeid.register(int)
+ def typeid_int(self, val):
+ self._typeid = val & 0x7f
+
+ @typeid.register(bytes)
+ @typeid.register(bytearray)
+ def typeid_bin(self, val):
+ v = self.bytes_to_int(val)
+ self.typeid = v
+ self.critical = bool(v & 0x80)
+
+ def __init__(self):
+ super(Header, self).__init__()
+ self.typeid = b'\x00'
+ self.critical = False
+
+ def parse(self, packet):
+ self.length = packet
+
+ self.typeid = packet[:1]
+ del packet[:1]
+
+ def __len__(self):
+ return self.llen + 1
+
+ def __bytearray__(self):
+ _bytes = bytearray(self.encode_length(self.length))
+ _bytes += self.int_to_bytes((int(self.critical) << 7) + self.typeid)
+ return _bytes
+
+
+class EmbeddedSignatureHeader(VersionedHeader):
+ def __bytearray__(self):
+ return bytearray([self.version])
+
+ def parse(self, packet):
+ self.tag = 2
+ super(EmbeddedSignatureHeader, self).parse(packet)
+
+
+class SubPacket(Dispatchable):
+ __headercls__ = Header
+
+ def __init__(self):
+ super(SubPacket, self).__init__()
+ self.header = Header()
+
+ # if self.__typeid__ not in [-1, None]:
+ if (self.header.typeid == 0x00 and
+ (not hasattr(self.__typeid__, '__abstractmethod__')) and
+ (self.__typeid__ not in [-1, None])):
+ self.header.typeid = self.__typeid__
+
+ def __bytearray__(self):
+ return self.header.__bytearray__()
+
+ def __len__(self):
+ return (self.header.llen + self.header.length)
+
+ def __repr__(self):
+ return "<{} [0x{:02x}] at 0x{:x}>".format(self.__class__.__name__, self.header.typeid, id(self))
+
+ def update_hlen(self):
+ self.header.length = (len(self.__bytearray__()) - len(self.header)) + 1
+
+ @abc.abstractmethod
+ def parse(self, packet): # pragma: no cover
+ if self.header._typeid == 0:
+ self.header.parse(packet)
+
+
+class Signature(SubPacket):
+ __typeid__ = -1
+
+
+class UserAttribute(SubPacket):
+ __typeid__ = -1
+
+
+class Opaque(Signature, UserAttribute):
+ __typeid__ = None
+
+ @sdproperty
+ def payload(self):
+ return self._payload
+
+ @payload.register(bytes)
+ @payload.register(bytearray)
+ def payload_bin(self, val):
+ self._payload = bytearray(val)
+
+ def __init__(self):
+ super(Opaque, self).__init__()
+ self.payload = b''
+
+ def __bytearray__(self):
+ _bytes = super(Opaque, self).__bytearray__()
+ _bytes += self.payload
+ return _bytes
+
+ def parse(self, packet):
+ super(Opaque, self).parse(packet)
+ self.payload = packet[:(self.header.length - 1)]
+ del packet[:(self.header.length - 1)]
diff --git a/src/leap/mx/vendor/pgpy/packet/subpackets/userattribute.py b/src/leap/mx/vendor/pgpy/packet/subpackets/userattribute.py
new file mode 100644
index 0000000..32f7410
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/packet/subpackets/userattribute.py
@@ -0,0 +1,109 @@
+""" userattribute.py
+"""
+import struct
+
+from .types import UserAttribute
+
+from ...constants import ImageEncoding
+
+from ...decorators import sdproperty
+
+from ...memoryview import memoryview
+
+
+__all__ = ('Image',)
+
+
+class Image(UserAttribute):
+ """
+ 5.12.1. The Image Attribute Subpacket
+
+ The Image Attribute subpacket is used to encode an image, presumably
+ (but not required to be) that of the key owner.
+
+ The Image Attribute subpacket begins with an image header. The first
+ two octets of the image header contain the length of the image
+ header. Note that unlike other multi-octet numerical values in this
+ document, due to a historical accident this value is encoded as a
+ little-endian number. The image header length is followed by a
+ single octet for the image header version. The only currently
+ defined version of the image header is 1, which is a 16-octet image
+ header. The first three octets of a version 1 image header are thus
+ 0x10, 0x00, 0x01.
+
+ The fourth octet of a version 1 image header designates the encoding
+ format of the image. The only currently defined encoding format is
+ the value 1 to indicate JPEG. Image format types 100 through 110 are
+ reserved for private or experimental use. The rest of the version 1
+ image header is made up of 12 reserved octets, all of which MUST be
+ set to 0.
+
+ The rest of the image subpacket contains the image itself. As the
+ only currently defined image type is JPEG, the image is encoded in
+ the JPEG File Interchange Format (JFIF), a standard file format for
+ JPEG images [JFIF].
+
+ An implementation MAY try to determine the type of an image by
+ examination of the image data if it is unable to handle a particular
+ version of the image header or if a specified encoding format value
+ is not recognized.
+ """
+ __typeid__ = 0x01
+
+ @sdproperty
+ def version(self):
+ return self._version
+
+ @version.register(int)
+ def version_int(self, val):
+ self._version = val
+
+ @sdproperty
+ def iencoding(self):
+ return self._iencoding
+
+ @iencoding.register(int)
+ @iencoding.register(ImageEncoding)
+ def iencoding_int(self, val):
+ try:
+ self._iencoding = ImageEncoding(val)
+
+ except ValueError: # pragma: no cover
+ self._iencoding = val
+
+ @sdproperty
+ def image(self):
+ return self._image
+
+ @image.register(bytes)
+ @image.register(bytearray)
+ def image_bin(self, val):
+ self._image = bytearray(val)
+
+ def __init__(self):
+ super(Image, self).__init__()
+ self.version = 1
+ self.iencoding = 1
+ self.image = bytearray()
+
+ def __bytearray__(self):
+ _bytes = super(Image, self).__bytearray__()
+
+ if self.version == 1:
+ # v1 image header length is always 16 bytes
+ # and stored little-endian due to an 'historical accident'
+ _bytes += struct.pack('<hbbiii', 16, self.version, self.iencoding, 0, 0, 0)
+
+ _bytes += self.image
+ return _bytes
+
+ def parse(self, packet):
+ super(Image, self).parse(packet)
+
+ # on Python 2, this will be the wrapper object from memoryview.py
+ with memoryview(packet) as _head:
+ _, self.version, self.iencoding, _, _, _ = struct.unpack_from('<hbbiii', _head[:16].tobytes())
+ del packet[:16]
+
+ self.image = packet[:(self.header.length - 17)]
+ del packet[:(self.header.length - 17)]
diff --git a/src/leap/mx/vendor/pgpy/packet/types.py b/src/leap/mx/vendor/pgpy/packet/types.py
new file mode 100644
index 0000000..e84ba67
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/packet/types.py
@@ -0,0 +1,288 @@
+""" types.py
+"""
+from __future__ import division
+
+import abc
+import copy
+
+import six
+
+from ..constants import PacketTag
+
+from ..decorators import sdproperty
+
+from ..types import Dispatchable
+from ..types import Field
+from ..types import Header as _Header
+
+__all__ = ['Header',
+ 'VersionedHeader',
+ 'Packet',
+ 'VersionedPacket',
+ 'Opaque',
+ 'Key',
+ 'Public',
+ 'Private',
+ 'Primary',
+ 'Sub',
+ 'MPI',
+ 'MPIs', ]
+
+
+class Header(_Header):
+ @sdproperty
+ def tag(self):
+ return self._tag
+
+ @tag.register(int)
+ @tag.register(PacketTag)
+ def tag_int(self, val):
+ _tag = (val & 0x3F) if self._lenfmt else ((val & 0x3C) >> 2)
+ try:
+ self._tag = PacketTag(_tag)
+
+ except ValueError: # pragma: no cover
+ self._tag = _tag
+
+ @property
+ def typeid(self):
+ return self.tag
+
+ def __init__(self):
+ super(Header, self).__init__()
+ self.tag = 0x00
+
+ def __bytearray__(self):
+ tag = 0x80 | (self._lenfmt << 6)
+ tag |= (self.tag) if self._lenfmt else ((self.tag << 2) | {1: 0, 2: 1, 4: 2, 0: 3}[self.llen])
+
+ _bytes = bytearray(self.int_to_bytes(tag))
+ _bytes += self.encode_length(self.length, self._lenfmt, self.llen)
+ return _bytes
+
+ def __len__(self):
+ return 1 + self.llen
+
+ def parse(self, packet):
+ """
+ There are two formats for headers
+
+ old style
+ ---------
+
+ Old style headers can be 1, 2, 3, or 6 octets long and are composed of a Tag and a Length.
+ If the header length is 1 octet (length_type == 3), then there is no Length field.
+
+ new style
+ ---------
+
+ New style headers can be 2, 3, or 6 octets long and are also composed of a Tag and a Length.
+
+
+ Packet Tag
+ ----------
+
+ The packet tag is the first byte, comprising the following fields:
+
+ +-------------+----------+---------------+---+---+---+---+----------+----------+
+ | byte | 1 |
+ +-------------+----------+---------------+---+---+---+---+----------+----------+
+ | bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
+ +-------------+----------+---------------+---+---+---+---+----------+----------+
+ | old-style | always 1 | packet format | packet tag | length type |
+ | description | | 0 = old-style | | 0 = 1 octet |
+ | | | 1 = new-style | | 1 = 2 octets |
+ | | | | | 2 = 5 octets |
+ | | | | | 3 = no length field |
+ +-------------+ + +---------------+---------------------+
+ | new-style | | | packet tag |
+ | description | | | |
+ +-------------+----------+---------------+-------------------------------------+
+
+ :param packet: raw packet bytes
+ """
+ self._lenfmt = ((packet[0] & 0x40) >> 6)
+ self.tag = packet[0]
+ if self._lenfmt == 0:
+ self.llen = (packet[0] & 0x03)
+ del packet[0]
+
+ if (self._lenfmt == 0 and self.llen > 0) or self._lenfmt == 1:
+ self.length = packet
+
+ else:
+ # indeterminate packet length
+ self.length = len(packet)
+
+
+class VersionedHeader(Header):
+ @sdproperty
+ def version(self):
+ return self._version
+
+ @version.register(int)
+ def version_int(self, val):
+ self._version = val
+
+ def __init__(self):
+ super(VersionedHeader, self).__init__()
+ self.version = 0
+
+ def __bytearray__(self):
+ _bytes = bytearray(super(VersionedHeader, self).__bytearray__())
+ _bytes += bytearray([self.version])
+ return _bytes
+
+ def parse(self, packet): # pragma: no cover
+ if self.tag == 0:
+ super(VersionedHeader, self).parse(packet)
+
+ if self.version == 0:
+ self.version = packet[0]
+ del packet[0]
+
+
+class Packet(Dispatchable):
+ __typeid__ = -1
+ __headercls__ = Header
+
+ def __init__(self):
+ super(Packet, self).__init__()
+ self.header = self.__headercls__()
+ if isinstance(self.__typeid__, six.integer_types):
+ self.header.tag = self.__typeid__
+
+ @abc.abstractmethod
+ def __bytearray__(self):
+ return self.header.__bytearray__()
+
+ def __len__(self):
+ return len(self.header) + self.header.length
+
+ def __repr__(self):
+ return "<{cls:s} [tag 0x{tag:02d}] at 0x{id:x}>".format(cls=self.__class__.__name__, tag=self.header.tag, id=id(self))
+
+ def update_hlen(self):
+ self.header.length = len(self.__bytearray__()) - len(self.header)
+
+ @abc.abstractmethod
+ def parse(self, packet):
+ if self.header.tag == 0:
+ self.header.parse(packet)
+
+
+class VersionedPacket(Packet):
+ __headercls__ = VersionedHeader
+
+ def __init__(self):
+ super(VersionedPacket, self).__init__()
+ if isinstance(self.__ver__, six.integer_types):
+ self.header.version = self.__ver__
+
+ def __repr__(self):
+ return "<{cls:s} [tag 0x{tag:02d}][v{ver:d}] at 0x{id:x}>".format(cls=self.__class__.__name__, tag=self.header.tag,
+ ver=self.header.version, id=id(self))
+
+
+class Opaque(Packet):
+ __typeid__ = None
+
+ @sdproperty
+ def payload(self):
+ return self._payload
+
+ @payload.register(bytearray)
+ @payload.register(bytes)
+ def payload_bin(self, val):
+ self._payload = val
+
+ def __init__(self):
+ super(Opaque, self).__init__()
+ self.payload = b''
+
+ def __bytearray__(self):
+ _bytes = super(Opaque, self).__bytearray__()
+ _bytes += self.payload
+ return _bytes
+
+ def parse(self, packet): # pragma: no cover
+ super(Opaque, self).parse(packet)
+ pend = self.header.length
+ if hasattr(self.header, 'version'):
+ pend -= 1
+
+ self.payload = packet[:pend]
+ del packet[:pend]
+
+
+# key marker classes for convenience
+class Key(object):
+ pass
+
+
+class Public(Key):
+ pass
+
+
+class Private(Key):
+ pass
+
+
+class Primary(Key):
+ pass
+
+
+class Sub(Key):
+ pass
+
+
+# This is required for class MPI to work in both Python 2 and 3
+if not six.PY2:
+ long = int
+
+
+class MPI(long):
+ def __new__(cls, num):
+ mpi = num
+
+ if isinstance(num, (bytes, bytearray)):
+ if isinstance(num, bytes): # pragma: no cover
+ num = bytearray(num)
+
+ fl = ((MPIs.bytes_to_int(num[:2]) + 7) // 8)
+ del num[:2]
+
+ mpi = MPIs.bytes_to_int(num[:fl])
+ del num[:fl]
+
+ return super(MPI, cls).__new__(cls, mpi)
+
+ def byte_length(self):
+ return ((self.bit_length() + 7) // 8)
+
+ def to_mpibytes(self):
+ return MPIs.int_to_bytes(self.bit_length(), 2) + MPIs.int_to_bytes(self, self.byte_length())
+
+ def __len__(self):
+ return self.byte_length() + 2
+
+
+class MPIs(Field):
+ # this differs from MPI in that it's subclasses hold/parse several MPI fields
+ # and, in the case of v4 private keys, also a String2Key specifier/information.
+ __mpis__ = ()
+
+ def __len__(self):
+ return sum(len(i) for i in self)
+
+ def __iter__(self):
+ """yield all components of an MPI so it can be iterated over"""
+ for i in self.__mpis__:
+ yield getattr(self, i)
+
+ def __copy__(self):
+ pk = self.__class__()
+ for m in self.__mpis__:
+ setattr(pk, m, copy.copy(getattr(self, m)))
+
+ return pk
diff --git a/src/leap/mx/vendor/pgpy/pgp.py b/src/leap/mx/vendor/pgpy/pgp.py
new file mode 100644
index 0000000..82b3a0b
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/pgp.py
@@ -0,0 +1,2527 @@
+""" pgp.py
+
+this is where the armorable PGP block objects live
+"""
+import binascii
+import calendar
+import collections
+import contextlib
+import copy
+import functools
+import itertools
+import operator
+import os
+import re
+import warnings
+import weakref
+
+import six
+
+from datetime import datetime
+
+from cryptography.hazmat.primitives import hashes
+
+from .constants import CompressionAlgorithm
+from .constants import Features
+from .constants import HashAlgorithm
+from .constants import ImageEncoding
+from .constants import KeyFlags
+from .constants import NotationDataFlags
+from .constants import PacketTag
+from .constants import PubKeyAlgorithm
+from .constants import RevocationKeyClass
+from .constants import RevocationReason
+from .constants import SignatureType
+from .constants import SymmetricKeyAlgorithm
+
+from .decorators import KeyAction
+
+from .errors import PGPDecryptionError
+from .errors import PGPError
+
+from .packet import Key
+from .packet import MDC
+from .packet import Packet
+from .packet import Primary
+from .packet import Private
+from .packet import PubKeyV4
+from .packet import PubSubKeyV4
+from .packet import PrivKeyV4
+from .packet import PrivSubKeyV4
+from .packet import Public
+from .packet import Sub
+from .packet import UserID
+from .packet import UserAttribute
+
+from .packet.packets import CompressedData
+from .packet.packets import IntegrityProtectedSKEData
+from .packet.packets import IntegrityProtectedSKEDataV1
+from .packet.packets import LiteralData
+from .packet.packets import OnePassSignature
+from .packet.packets import OnePassSignatureV3
+from .packet.packets import PKESessionKey
+from .packet.packets import PKESessionKeyV3
+from .packet.packets import Signature
+from .packet.packets import SignatureV4
+from .packet.packets import SKEData
+from .packet.packets import Marker
+from .packet.packets import SKESessionKey
+from .packet.packets import SKESessionKeyV4
+
+from .packet.types import Opaque
+
+from .types import Armorable
+from .types import Fingerprint
+from .types import ParentRef
+from .types import PGPObject
+from .types import SignatureVerification
+from .types import SorteDeque
+
+__all__ = ['PGPSignature',
+ 'PGPUID',
+ 'PGPMessage',
+ 'PGPKey',
+ 'PGPKeyring']
+
+
+class PGPSignature(Armorable, ParentRef, PGPObject):
+ @property
+ def __sig__(self):
+ return self._signature.signature.__sig__()
+
+ @property
+ def cipherprefs(self):
+ """
+ A ``list`` of preferred symmetric algorithms specified in this signature, if any. Otherwise, an empty ``list``.
+ """
+ if 'PreferredSymmetricAlgorithms' in self._signature.subpackets:
+ return next(iter(self._signature.subpackets['h_PreferredSymmetricAlgorithms'])).flags
+ return []
+
+ @property
+ def compprefs(self):
+ """
+ A ``list`` of preferred compression algorithms specified in this signature, if any. Otherwise, an empty ``list``.
+ """
+ if 'PreferredCompressionAlgorithms' in self._signature.subpackets:
+ return next(iter(self._signature.subpackets['h_PreferredCompressionAlgorithms'])).flags
+ return []
+
+ @property
+ def created(self):
+ """
+ A :py:obj:`~datetime.datetime` of when this signature was created.
+ """
+ return self._signature.subpackets['h_CreationTime'][-1].created
+
+ @property
+ def embedded(self):
+ return self.parent is not None
+
+ @property
+ def expires_at(self):
+ """
+ A :py:obj:`~datetime.datetime` of when this signature expires, if a signature expiration date is specified.
+ Otherwise, ``None``
+ """
+ if 'SignatureExpirationTime' in self._signature.subpackets:
+ expd = next(iter(self._signature.subpackets['SignatureExpirationTime'])).expires
+ return self.created + expd
+ return None
+
+ @property
+ def exportable(self):
+ """
+ ``False`` if this signature is marked as being not exportable. Otherwise, ``True``.
+ """
+ if 'ExportableCertification' in self._signature.subpackets:
+ return bool(next(iter(self._signature.subpackets['ExportableCertification'])))
+
+ return True
+
+ @property
+ def features(self):
+ """
+ A ``set`` of implementation features specified in this signature, if any. Otherwise, an empty ``set``.
+ """
+ if 'Features' in self._signature.subpackets:
+ return next(iter(self._signature.subpackets['Features'])).flags
+ return set()
+
+ @property
+ def hash2(self):
+ return self._signature.hash2
+
+ @property
+ def hashprefs(self):
+ """
+ A ``list`` of preferred hash algorithms specified in this signature, if any. Otherwise, an empty ``list``.
+ """
+ if 'PreferredHashAlgorithms' in self._signature.subpackets:
+ return next(iter(self._signature.subpackets['h_PreferredHashAlgorithms'])).flags
+ return []
+
+ @property
+ def hash_algorithm(self):
+ """
+ The :py:obj:`~constants.HashAlgorithm` used when computing this signature.
+ """
+ return self._signature.halg
+
+ @property
+ def is_expired(self):
+ """
+ ``True`` if the signature has an expiration date, and is expired. Otherwise, ``False``
+ """
+ expires_at = self.expires_at
+ if expires_at is not None and expires_at != self.created:
+ return expires_at < datetime.utcnow()
+
+ return False
+
+ @property
+ def key_algorithm(self):
+ """
+ The :py:obj:`~constants.PubKeyAlgorithm` of the key that generated this signature.
+ """
+ return self._signature.pubalg
+
+ @property
+ def key_expiration(self):
+ if 'KeyExpirationTime' in self._signature.subpackets:
+ return next(iter(self._signature.subpackets['KeyExpirationTime'])).expires
+ return None
+
+ @property
+ def key_flags(self):
+ """
+ A ``set`` of :py:obj:`~constants.KeyFlags` specified in this signature, if any. Otherwise, an empty ``set``.
+ """
+ if 'KeyFlags' in self._signature.subpackets:
+ return next(iter(self._signature.subpackets['h_KeyFlags'])).flags
+ return set()
+
+ @property
+ def keyserver(self):
+ """
+ The preferred key server specified in this signature, if any. Otherwise, an empty ``str``.
+ """
+ if 'PreferredKeyServer' in self._signature.subpackets:
+ return next(iter(self._signature.subpackets['h_PreferredKeyServer'])).uri
+ return ''
+
+ @property
+ def keyserverprefs(self):
+ """
+ A ``list`` of :py:obj:`~constants.KeyServerPreferences` in this signature, if any. Otherwise, an empty ``list``.
+ """
+ if 'KeyServerPreferences' in self._signature.subpackets:
+ return next(iter(self._signature.subpackets['h_KeyServerPreferences'])).flags
+ return []
+
+ @property
+ def magic(self):
+ return "SIGNATURE"
+
+ @property
+ def notation(self):
+ """
+ A ``dict`` of notation data in this signature, if any. Otherwise, an empty ``dict``.
+ """
+ return dict((nd.name, nd.value) for nd in self._signature.subpackets['NotationData'])
+
+ @property
+ def policy_uri(self):
+ """
+ The policy URI specified in this signature, if any. Otherwise, an empty ``str``.
+ """
+ if 'Policy' in self._signature.subpackets:
+ return next(iter(self._signature.subpackets['Policy'])).uri
+ return ''
+
+ @property
+ def revocable(self):
+ """
+ ``False`` if this signature is marked as being not revocable. Otherwise, ``True``.
+ """
+ if 'Revocable' in self._signature.subpackets:
+ return bool(next(iter(self._signature.subpackets['Revocable'])))
+ return True
+
+ @property
+ def revocation_key(self):
+ if 'RevocationKey' in self._signature.subpackets:
+ raise NotImplementedError()
+ return None
+
+ @property
+ def signer(self):
+ """
+ The 16-character Key ID of the key that generated this signature.
+ """
+ return self._signature.signer
+
+ @property
+ def target_signature(self):
+ return NotImplemented
+
+ @property
+ def type(self):
+ """
+ The :py:obj:`~constants.SignatureType` of this signature.
+ """
+ return self._signature.sigtype
+
+ @classmethod
+ def new(cls, sigtype, pkalg, halg, signer):
+ sig = PGPSignature()
+
+ sigpkt = SignatureV4()
+ sigpkt.header.tag = 2
+ sigpkt.header.version = 4
+ sigpkt.subpackets.addnew('CreationTime', hashed=True, created=datetime.utcnow())
+ sigpkt.subpackets.addnew('Issuer', _issuer=signer)
+
+ sigpkt.sigtype = sigtype
+ sigpkt.pubalg = pkalg
+
+ if halg is not None:
+ sigpkt.halg = halg
+
+ sig._signature = sigpkt
+ return sig
+
+ def __init__(self):
+ """
+ PGPSignature objects represent OpenPGP compliant signatures.
+
+ PGPSignature implements the ``__str__`` method, the output of which will be the signature object in
+ OpenPGP-compliant ASCII-armored format.
+
+ PGPSignature implements the ``__bytes__`` method, the output of which will be the signature object in
+ OpenPGP-compliant binary format.
+ """
+ super(PGPSignature, self).__init__()
+ self._signature = None
+
+ def __bytearray__(self):
+ return self._signature.__bytearray__()
+
+ def __repr__(self):
+ return "<PGPSignature [{:s}] object at 0x{:02x}>".format(self.type.name, id(self))
+
+ def __lt__(self, other):
+ return self.created < other.created
+
+ def __or__(self, other):
+ if isinstance(other, Signature):
+ if self._signature is None:
+ self._signature = other
+ return self
+
+ ##TODO: this is not a great way to do this
+ if other.__class__.__name__ == 'EmbeddedSignature':
+ self._signature = other
+ return self
+
+ raise TypeError
+
+ def __copy__(self):
+ # because the default shallow copy isn't actually all that useful,
+ # and deepcopy does too much work
+ sig = super(PGPSignature, self).__copy__()
+ # sig = PGPSignature()
+ # sig.ascii_headers = self.ascii_headers.copy()
+ sig |= copy.copy(self._signature)
+ return sig
+
+ def hashdata(self, subject):
+ _data = bytearray()
+
+ if isinstance(subject, six.string_types):
+ subject = subject.encode('latin-1')
+
+ """
+ All signatures are formed by producing a hash over the signature
+ data, and then using the resulting hash in the signature algorithm.
+ """
+
+ if self.type == SignatureType.BinaryDocument:
+ """
+ For binary document signatures (type 0x00), the document data is
+ hashed directly.
+ """
+ _data += bytearray(subject)
+
+ if self.type == SignatureType.CanonicalDocument:
+ """
+ For text document signatures (type 0x01), the
+ document is canonicalized by converting line endings to <CR><LF>,
+ and the resulting data is hashed.
+ """
+ _data += re.subn(br'\r?\n', b'\r\n', subject)[0]
+
+ if self.type in {SignatureType.Generic_Cert, SignatureType.Persona_Cert, SignatureType.Casual_Cert,
+ SignatureType.Positive_Cert, SignatureType.CertRevocation, SignatureType.Subkey_Binding,
+ SignatureType.PrimaryKey_Binding}:
+ """
+ When a signature is made over a key, the hash data starts with the
+ octet 0x99, followed by a two-octet length of the key, and then body
+ of the key packet. (Note that this is an old-style packet header for
+ a key packet with two-octet length.) ...
+ Key revocation signatures (types 0x20 and 0x28)
+ hash only the key being revoked.
+ """
+ _s = b''
+ if isinstance(subject, PGPUID):
+ _s = subject._parent.hashdata
+
+ elif isinstance(subject, PGPKey) and not subject.is_primary:
+ _s = subject._parent.hashdata
+
+ elif isinstance(subject, PGPKey) and subject.is_primary:
+ _s = subject.hashdata
+
+ if len(_s) > 0:
+ _data += b'\x99' + self.int_to_bytes(len(_s), 2) + _s
+
+ if self.type in {SignatureType.Subkey_Binding, SignatureType.PrimaryKey_Binding}:
+ """
+ A subkey binding signature
+ (type 0x18) or primary key binding signature (type 0x19) then hashes
+ the subkey using the same format as the main key (also using 0x99 as
+ the first octet).
+ """
+ if subject.is_primary:
+ _s = subject.subkeys[self.signer].hashdata
+
+ else:
+ _s = subject.hashdata
+
+ _data += b'\x99' + self.int_to_bytes(len(_s), 2) + _s
+
+ if self.type in {SignatureType.KeyRevocation, SignatureType.SubkeyRevocation, SignatureType.DirectlyOnKey}:
+ """
+ The signature is calculated directly on the key being revoked. A
+ revoked key is not to be used. Only revocation signatures by the
+ key being revoked, or by an authorized revocation key, should be
+ considered valid revocation signatures.
+
+ Subkey revocation signature
+ The signature is calculated directly on the subkey being revoked.
+ A revoked subkey is not to be used. Only revocation signatures
+ by the top-level signature key that is bound to this subkey, or
+ by an authorized revocation key, should be considered valid
+ revocation signatures.
+
+ Signature directly on a key
+ This signature is calculated directly on a key. It binds the
+ information in the Signature subpackets to the key, and is
+ appropriate to be used for subpackets that provide information
+ about the key, such as the Revocation Key subpacket. It is also
+ appropriate for statements that non-self certifiers want to make
+ about the key itself, rather than the binding between a key and a
+ name.
+ """
+ _s = subject.hashdata
+ _data += b'\x99' + self.int_to_bytes(len(_s), 2) + _s
+
+ if self.type in {SignatureType.Generic_Cert, SignatureType.Persona_Cert, SignatureType.Casual_Cert,
+ SignatureType.Positive_Cert, SignatureType.CertRevocation}:
+ """
+ A certification signature (type 0x10 through 0x13) hashes the User
+ ID being bound to the key into the hash context after the above
+ data. ... A V4 certification
+ hashes the constant 0xB4 for User ID certifications or the constant
+ 0xD1 for User Attribute certifications, followed by a four-octet
+ number giving the length of the User ID or User Attribute data, and
+ then the User ID or User Attribute data.
+
+ ...
+
+ The [certificate revocation] signature
+ is computed over the same data as the certificate that it
+ revokes, and should have a later creation date than that
+ certificate.
+ """
+
+ _s = subject.hashdata
+ if subject.is_uid:
+ _data += b'\xb4'
+
+ else:
+ _data += b'\xd1'
+
+ _data += self.int_to_bytes(len(_s), 4) + _s
+
+ # if this is a new signature, do update_hlen
+ if 0 in list(self._signature.signature):
+ self._signature.update_hlen()
+
+ """
+ Once the data body is hashed, then a trailer is hashed. (...)
+ A V4 signature hashes the packet body
+ starting from its first field, the version number, through the end
+ of the hashed subpacket data. Thus, the fields hashed are the
+ signature version, the signature type, the public-key algorithm, the
+ hash algorithm, the hashed subpacket length, and the hashed
+ subpacket body.
+
+ V4 signatures also hash in a final trailer of six octets: the
+ version of the Signature packet, i.e., 0x04; 0xFF; and a four-octet,
+ big-endian number that is the length of the hashed data from the
+ Signature packet (note that this number does not include these final
+ six octets).
+ """
+
+ hcontext = bytearray()
+ hcontext.append(self._signature.header.version if not self.embedded else self._signature._sig.header.version)
+ hcontext.append(self.type)
+ hcontext.append(self.key_algorithm)
+ hcontext.append(self.hash_algorithm)
+ hcontext += self._signature.subpackets.__hashbytearray__()
+ hlen = len(hcontext)
+ _data += hcontext
+ _data += b'\x04\xff'
+ _data += self.int_to_bytes(hlen, 4)
+ return bytes(_data)
+
+ def make_onepass(self):
+ onepass = OnePassSignatureV3()
+ onepass.sigtype = self.type
+ onepass.halg = self.hash_algorithm
+ onepass.pubalg = self.key_algorithm
+ onepass.signer = self.signer
+ onepass.update_hlen()
+ return onepass
+
+ def parse(self, packet):
+ unarmored = self.ascii_unarmor(packet)
+ data = unarmored['body']
+
+ if unarmored['magic'] is not None and unarmored['magic'] != 'SIGNATURE':
+ raise ValueError('Expected: SIGNATURE. Got: {}'.format(str(unarmored['magic'])))
+
+ if unarmored['headers'] is not None:
+ self.ascii_headers = unarmored['headers']
+
+ # load *one* packet from data
+ pkt = Packet(data)
+ if pkt.header.tag == PacketTag.Signature and not isinstance(pkt, Opaque):
+ self._signature = pkt
+
+ else:
+ raise ValueError('Expected: Signature. Got: {:s}'.format(pkt.__class__.__name__))
+
+
+class PGPUID(ParentRef):
+ @property
+ def __sig__(self):
+ return list(self._signatures)
+
+ @property
+ def name(self):
+ """If this is a User ID, the stored name. If this is not a User ID, this will be an empty string."""
+ return self._uid.name if isinstance(self._uid, UserID) else ""
+
+ @property
+ def comment(self):
+ """
+ If this is a User ID, this will be the stored comment. If this is not a User ID, or there is no stored comment,
+ this will be an empty string.,
+ """
+
+ return self._uid.comment if isinstance(self._uid, UserID) else ""
+
+ @property
+ def email(self):
+ """
+ If this is a User ID, this will be the stored email address. If this is not a User ID, or there is no stored
+ email address, this will be an empty string.
+ """
+ return self._uid.email if isinstance(self._uid, UserID) else ""
+
+ @property
+ def image(self):
+ """
+ If this is a User Attribute, this will be the stored image. If this is not a User Attribute, this will be ``None``.
+ """
+ return self._uid.image.image if isinstance(self._uid, UserAttribute) else None
+
+ @property
+ def is_primary(self):
+ """
+ If the most recent, valid self-signature specifies this as being primary, this will be True. Otherwise, Faqlse.
+ """
+ return bool(next(iter(self.selfsig._signature.subpackets['h_PrimaryUserID']), False))
+
+ @property
+ def is_uid(self):
+ """
+ ``True`` if this is a User ID, otherwise False.
+ """
+ return isinstance(self._uid, UserID)
+
+ @property
+ def is_ua(self):
+ """
+ ``True`` if this is a User Attribute, otherwise False.
+ """
+ return isinstance(self._uid, UserAttribute)
+
+ @property
+ def selfsig(self):
+ """
+ This will be the most recent, self-signature of this User ID or Attribute. If there isn't one, this will be ``None``.
+ """
+ if self.parent is not None:
+ return next((sig for sig in reversed(self._signatures) if sig.signer == self.parent.fingerprint.keyid), None)
+
+ @property
+ def signers(self):
+ """
+ This will be a set of all of the key ids which have signed this User ID or Attribute.
+ """
+ return set(s.signer for s in self.__sig__)
+
+ @property
+ def hashdata(self):
+ if self.is_uid:
+ return self._uid.__bytearray__()[len(self._uid.header):]
+
+ if self.is_ua:
+ return self._uid.subpackets.__bytearray__()
+
+ @classmethod
+ def new(cls, pn, comment="", email=""):
+ """
+ Create a new User ID or photo.
+
+ :param pn: User ID name, or photo. If this is a ``bytearray``, it will be loaded as a photo.
+ Otherwise, it will be used as the name field for a User ID.
+ :type pn: ``bytearray``, ``str``, ``unicode``
+ :param comment: The comment field for a User ID. Ignored if this is a photo.
+ :type comment: ``str``, ``unicode``
+ :param email: The email address field for a User ID. Ignored if this is a photo.
+ :type email: ``str``, ``unicode``
+ :returns: :py:obj:`PGPUID`
+ """
+ uid = PGPUID()
+ if isinstance(pn, bytearray):
+ uid._uid = UserAttribute()
+ uid._uid.image.image = pn
+ uid._uid.image.iencoding = ImageEncoding.encodingof(pn)
+ uid._uid.update_hlen()
+
+ else:
+ uid._uid = UserID()
+ uid._uid.name = pn
+ uid._uid.comment = comment
+ uid._uid.email = email
+ uid._uid.update_hlen()
+
+ return uid
+
+ def __init__(self):
+ """
+ PGPUID objects represent User IDs and User Attributes for keys.
+
+ PGPUID implements the ``__format__`` method for User IDs, returning a string in the format
+ 'name (comment) <email>', leaving out any comment or email fields that are not present.
+ """
+ super(PGPUID, self).__init__()
+ self._uid = None
+ self._signatures = SorteDeque()
+
+ def __repr__(self):
+ if self.selfsig is not None:
+ return "<PGPUID [{:s}][{}] at 0x{:02X}>".format(self._uid.__class__.__name__, self.selfsig.created, id(self))
+ return "<PGPUID [{:s}] at 0x{:02X}>".format(self._uid.__class__.__name__, id(self))
+
+ def __lt__(self, other): # pragma: no cover
+ if self.is_uid == other.is_uid:
+ if self.is_primary == other.is_primary:
+ return self.selfsig > other.selfsig
+
+ if self.is_primary:
+ return True
+
+ return False
+
+ if self.is_uid and other.is_ua:
+ return True
+
+ if self.is_ua and other.is_uid:
+ return False
+
+ def __or__(self, other):
+ if isinstance(other, PGPSignature):
+ self._signatures.insort(other)
+ if self.parent is not None and self in self.parent._uids:
+ self.parent._uids.resort(self)
+
+ return self
+
+ if isinstance(other, UserID) and self._uid is None:
+ self._uid = other
+ return self
+
+ if isinstance(other, UserAttribute) and self._uid is None:
+ self._uid = other
+ return self
+
+ raise TypeError("unsupported operand type(s) for |: '{:s}' and '{:s}'"
+ "".format(self.__class__.__name__, other.__class__.__name__))
+
+ def __copy__(self):
+ # because the default shallow copy isn't actually all that useful,
+ # and deepcopy does too much work
+ uid = PGPUID()
+ uid |= copy.copy(self._uid)
+ for sig in self._signatures:
+ uid |= copy.copy(sig)
+ return uid
+
+ def __format__(self, format_spec):
+ if self.is_uid:
+ comment = six.u("") if self.comment == "" else six.u(" ({:s})").format(self.comment)
+ email = six.u("") if self.email == "" else six.u(" <{:s}>").format(self.email)
+ return six.u("{:s}{:s}{:s}").format(self.name, comment, email)
+
+ raise NotImplementedError
+
+
+class PGPMessage(Armorable, PGPObject):
+ @staticmethod
+ def dash_unescape(text):
+ return re.subn(r'^- -', '-', text, flags=re.MULTILINE)[0]
+
+ @staticmethod
+ def dash_escape(text):
+ return re.subn(r'^-', '- -', text, flags=re.MULTILINE)[0]
+
+ @property
+ def encrypters(self):
+ """A ``set`` containing all key ids (if any) to which this message was encrypted."""
+ return set(m.encrypter for m in self._sessionkeys if isinstance(m, PKESessionKey))
+
+ @property
+ def filename(self):
+ """If applicable, returns the original filename of the message. Otherwise, returns an empty string."""
+ if self.type == 'literal':
+ return self._message.filename
+ return ''
+
+ @property
+ def is_compressed(self):
+ """``True`` if this message will be compressed when exported"""
+ return self._compression != CompressionAlgorithm.Uncompressed
+
+ @property
+ def is_encrypted(self):
+ """``True`` if this message is encrypted; otherwise, ``False``"""
+ return isinstance(self._message, (SKEData, IntegrityProtectedSKEData))
+
+ @property
+ def is_sensitive(self):
+ """``True`` if this message is marked sensitive; otherwise ``False``"""
+ return self.type == 'literal' and self._message.filename == '_CONSOLE'
+
+ @property
+ def is_signed(self):
+ """
+ ``True`` if this message is signed; otherwise, ``False``.
+ Should always be ``False`` if the message is encrypted.
+ """
+ return len(self._signatures) > 0
+
+ @property
+ def issuers(self):
+ """A ``set`` containing all key ids (if any) which have signed or encrypted this message."""
+ return self.encrypters | self.signers
+
+ @property
+ def magic(self):
+ if self.type == 'cleartext':
+ return "SIGNATURE"
+ return "MESSAGE"
+
+ @property
+ def message(self):
+ """The message contents"""
+ if self.type == 'cleartext':
+ return self.bytes_to_text(self._message)
+
+ if self.type == 'literal':
+ return self._message.contents
+
+ if self.type == 'encrypted':
+ return self._message
+
+ @property
+ def signatures(self):
+ """A ``set`` containing all key ids (if any) which have signed this message."""
+ return list(self._signatures)
+
+ @property
+ def signers(self):
+ """A ``set`` containing all key ids (if any) which have signed this message."""
+ return set(m.signer for m in self._signatures)
+
+ @property
+ def type(self):
+ ##TODO: it might be better to use an Enum for the output of this
+ if isinstance(self._message, (six.string_types, six.binary_type, bytearray)):
+ return 'cleartext'
+
+ if isinstance(self._message, LiteralData):
+ return 'literal'
+
+ if isinstance(self._message, (SKEData, IntegrityProtectedSKEData)):
+ return 'encrypted'
+
+ raise NotImplementedError
+
+ def __init__(self):
+ """
+ PGPMessage objects represent OpenPGP message compositions.
+
+ PGPMessage implements the `__str__` method, the output of which will be the message composition in
+ OpenPGP-compliant ASCII-armored format.
+
+ PGPMessage implements the `__bytes__` method, the output of which will be the message composition in
+ OpenPGP-compliant binary format.
+
+ Any signatures within the PGPMessage that are marked as being non-exportable will not be included in the output
+ of either of those methods.
+ """
+ super(PGPMessage, self).__init__()
+ self._compression = CompressionAlgorithm.Uncompressed
+ self._message = None
+ self._mdc = None
+ self._signatures = SorteDeque()
+ self._sessionkeys = []
+
+ def __bytearray__(self):
+ if self.is_compressed:
+ comp = CompressedData()
+ comp.calg = self._compression
+ comp.packets = [pkt for pkt in self]
+ comp.update_hlen()
+ return comp.__bytearray__()
+
+ _bytes = bytearray()
+ for pkt in self:
+ _bytes += pkt.__bytearray__()
+ return _bytes
+
+ def __str__(self):
+ if self.type == 'cleartext':
+ tmpl = u"-----BEGIN PGP SIGNED MESSAGE-----\n" \
+ u"{hhdr:s}\n" \
+ u"{cleartext:s}\n" \
+ u"{signature:s}"
+
+ # only add a Hash: header if we actually have at least one signature
+ hashes = set(s.hash_algorithm.name for s in self.signatures)
+ hhdr = 'Hash: {hashes:s}\n'.format(hashes=','.join(sorted(hashes))) if hashes else ''
+
+ return tmpl.format(hhdr=hhdr,
+ cleartext=self.dash_escape(self.bytes_to_text(self._message)),
+ signature=super(PGPMessage, self).__str__())
+
+ return super(PGPMessage, self).__str__()
+
+ def __iter__(self):
+ if self.type == 'cleartext':
+ for sig in self._signatures:
+ yield sig
+
+ elif self.is_encrypted:
+ for pkt in self._sessionkeys:
+ yield pkt
+ yield self.message
+
+ else:
+ ##TODO: is it worth coming up with a way of disabling one-pass signing?
+ for sig in self._signatures:
+ ops = sig.make_onepass()
+ if sig is not self._signatures[-1]:
+ ops.nested = True
+ yield ops
+
+ yield self._message
+ if self._mdc is not None: # pragma: no cover
+ yield self._mdc
+
+ for sig in self._signatures:
+ yield sig
+
+ def __or__(self, other):
+ if isinstance(other, Marker):
+ return self
+
+ if isinstance(other, CompressedData):
+ self._compression = other.calg
+ for pkt in other.packets:
+ self |= pkt
+ return self
+
+ if isinstance(other, (six.string_types, six.binary_type, bytearray)):
+ if self._message is None:
+ self._message = self.text_to_bytes(other)
+ return self
+
+ if isinstance(other, (LiteralData, SKEData, IntegrityProtectedSKEData)):
+ if self._message is None:
+ self._message = other
+ return self
+
+ if isinstance(other, MDC):
+ if self._mdc is None:
+ self._mdc = other
+ return self
+
+ if isinstance(other, OnePassSignature):
+ # these are "generated" on the fly during composition
+ return self
+
+ if isinstance(other, Signature):
+ other = PGPSignature() | other
+
+ if isinstance(other, PGPSignature):
+ self._signatures.insort(other)
+ return self
+
+ if isinstance(other, (PKESessionKey, SKESessionKey)):
+ self._sessionkeys.append(other)
+ return self
+
+ if isinstance(other, PGPMessage):
+ self._message = other._message
+ self._mdc = other._mdc
+ self._compression = other._compression
+ self._sessionkeys += other._sessionkeys
+ self._signatures += other._signatures
+ return self
+
+ raise NotImplementedError(str(type(other)))
+
+ def __copy__(self):
+ msg = super(PGPMessage, self).__copy__()
+ msg._compression = self._compression
+ msg._message = copy.copy(self._message)
+ msg._mdc = copy.copy(self._mdc)
+
+ for sig in self._signatures:
+ msg |= copy.copy(sig)
+
+ for sk in self._sessionkeys:
+ msg |= copy.copy(sk)
+
+ return msg
+
+ @classmethod
+ def new(cls, message, **kwargs):
+ """
+ Create a new PGPMessage object.
+
+ :param message: The message to be stored.
+ :type message: ``str``, ``unicode``, ``bytes``, ``bytearray``
+ :returns: :py:obj:`PGPMessage`
+
+ The following optional keyword arguments can be used with :py:meth:`PGPMessage.new`:
+
+ :keyword file: if True, ``message`` should be a path to a file. The contents of that file will be read and used
+ as the contents of the message.
+ :type file: ``bool``
+ :keyword cleartext: if True, the message will be cleartext with inline signatures.
+ :type cleartext: ``bool``
+ :keyword sensitive: if True, the filename will be set to '_CONSOLE' to signal other OpenPGP clients to treat
+ this message as being 'for your eyes only'. Ignored if cleartext is True.
+ :type sensitive: ``bool``
+ :keyword format: Set the message format identifier. Ignored if cleartext is True.
+ :type format: ``str``
+ :keyword compression: Set the compression algorithm for the new message.
+ Defaults to :py:obj:`CompressionAlgorithm.ZIP`. Ignored if cleartext is True.
+ :keyword encoding: Set the Charset header for the message.
+ :type encoding: ``str`` representing a valid codec in codecs
+ """
+ # TODO: have 'codecs' above (in :type encoding:) link to python documentation page on codecs
+ cleartext = kwargs.pop('cleartext', False)
+ format = kwargs.pop('format', None)
+ sensitive = kwargs.pop('sensitive', False)
+ compression = kwargs.pop('compression', CompressionAlgorithm.ZIP)
+ file = kwargs.pop('file', False)
+ charset = kwargs.pop('encoding', None)
+
+ filename = ''
+ mtime = datetime.utcnow()
+
+ msg = PGPMessage()
+
+ if charset:
+ msg.charset = charset
+
+ # if format in 'tu' and isinstance(message, (six.binary_type, bytearray)):
+ # # if message format is text or unicode and we got binary data, we'll need to transcode it to UTF-8
+ # message =
+
+ if file and os.path.isfile(message):
+ filename = message
+ message = bytearray(os.path.getsize(filename))
+ mtime = datetime.utcfromtimestamp(os.path.getmtime(filename))
+
+ with open(filename, 'rb') as mf:
+ mf.readinto(message)
+
+ # if format is None, we can try to detect it
+ if format is None:
+ if isinstance(message, six.text_type):
+ # message is definitely UTF-8 already
+ format = 'u'
+
+ elif cls.is_ascii(message):
+ # message is probably text
+ format = 't'
+
+ else:
+ # message is probably binary
+ format = 'b'
+
+ # if message is a binary type and we're building a textual message, we need to transcode the bytes to UTF-8
+ if isinstance(message, (six.binary_type, bytearray)) and (cleartext or format in 'tu'):
+ message = message.decode(charset or 'utf-8')
+
+ if cleartext:
+ msg |= message
+
+ else:
+ # load literal data
+ lit = LiteralData()
+ lit._contents = bytearray(msg.text_to_bytes(message))
+ lit.filename = '_CONSOLE' if sensitive else os.path.basename(filename)
+ lit.mtime = mtime
+ lit.format = format
+
+ # if cls.is_ascii(message):
+ # lit.format = 't'
+
+ lit.update_hlen()
+
+ msg |= lit
+ msg._compression = compression
+
+ return msg
+
+ def encrypt(self, passphrase, sessionkey=None, **prefs):
+ """
+ Encrypt the contents of this message using a passphrase.
+ :param passphrase: The passphrase to use for encrypting this message.
+ :type passphrase: ``str``, ``unicode``, ``bytes``
+
+ :optional param sessionkey: Provide a session key to use when encrypting something. Default is ``None``.
+ If ``None``, a session key of the appropriate length will be generated randomly.
+
+ .. warning::
+
+ Care should be taken when making use of this option! Session keys *absolutely need*
+ to be unpredictable! Use the ``gen_key()`` method on the desired
+ :py:obj:`~constants.SymmetricKeyAlgorithm` to generate the session key!
+
+ :type sessionkey: ``bytes``, ``str``
+ :raises: :py:exc:`~errors.PGPEncryptionError`
+ :returns: A new :py:obj:`PGPMessage` containing the encrypted contents of this message.
+ """
+ cipher_algo = prefs.pop('cipher', SymmetricKeyAlgorithm.AES256)
+ hash_algo = prefs.pop('hash', HashAlgorithm.SHA256)
+
+ # set up a new SKESessionKeyV4
+ skesk = SKESessionKeyV4()
+ skesk.s2k.usage = 255
+ skesk.s2k.specifier = 3
+ skesk.s2k.halg = hash_algo
+ skesk.s2k.encalg = cipher_algo
+ skesk.s2k.count = skesk.s2k.halg.tuned_count
+
+ if sessionkey is None:
+ sessionkey = cipher_algo.gen_key()
+ skesk.encrypt_sk(passphrase, sessionkey)
+ del passphrase
+
+ msg = PGPMessage() | skesk
+
+ if not self.is_encrypted:
+ skedata = IntegrityProtectedSKEDataV1()
+ skedata.encrypt(sessionkey, cipher_algo, self.__bytes__())
+ msg |= skedata
+
+ else:
+ msg |= self
+
+ return msg
+
+ def decrypt(self, passphrase):
+ """
+ Attempt to decrypt this message using a passphrase.
+
+ :param passphrase: The passphrase to use to attempt to decrypt this message.
+ :type passphrase: ``str``, ``unicode``, ``bytes``
+ :raises: :py:exc:`~errors.PGPDecryptionError` if decryption failed for any reason.
+ :returns: A new :py:obj:`PGPMessage` containing the decrypted contents of this message
+ """
+ if not self.is_encrypted:
+ raise PGPError("This message is not encrypted!")
+
+ for skesk in iter(sk for sk in self._sessionkeys if isinstance(sk, SKESessionKey)):
+ try:
+ symalg, key = skesk.decrypt_sk(passphrase)
+ decmsg = PGPMessage()
+ decmsg.parse(self.message.decrypt(key, symalg))
+
+ except (TypeError, ValueError, NotImplementedError, PGPDecryptionError):
+ continue
+
+ else:
+ del passphrase
+ break
+
+ else:
+ raise PGPDecryptionError("Decryption failed")
+
+ return decmsg
+
+ def parse(self, packet):
+ unarmored = self.ascii_unarmor(packet)
+ data = unarmored['body']
+
+ if unarmored['magic'] is not None and unarmored['magic'] not in ['MESSAGE', 'SIGNATURE']:
+ raise ValueError('Expected: MESSAGE. Got: {}'.format(str(unarmored['magic'])))
+
+ if unarmored['headers'] is not None:
+ self.ascii_headers = unarmored['headers']
+
+ # cleartext signature
+ if unarmored['magic'] == 'SIGNATURE':
+ # the composition for this will be the 'cleartext' as a str,
+ # followed by one or more signatures (each one loaded into a PGPSignature)
+ self |= self.dash_unescape(unarmored['cleartext'])
+ while len(data) > 0:
+ pkt = Packet(data)
+ if not isinstance(pkt, Signature): # pragma: no cover
+ warnings.warn("Discarded unexpected packet: {:s}".format(pkt.__class__.__name__), stacklevel=2)
+ continue
+ self |= PGPSignature() | pkt
+
+ else:
+ while len(data) > 0:
+ self |= Packet(data)
+
+
+class PGPKey(Armorable, ParentRef, PGPObject):
+ """
+ 11.1. Transferable Public Keys
+
+ OpenPGP users may transfer public keys. The essential elements of a
+ transferable public key are as follows:
+
+ - One Public-Key packet
+
+ - Zero or more revocation signatures
+ - One or more User ID packets
+
+ - After each User ID packet, zero or more Signature packets
+ (certifications)
+
+ - Zero or more User Attribute packets
+
+ - After each User Attribute packet, zero or more Signature packets
+ (certifications)
+
+ - Zero or more Subkey packets
+
+ - After each Subkey packet, one Signature packet, plus optionally a
+ revocation
+
+ The Public-Key packet occurs first. Each of the following User ID
+ packets provides the identity of the owner of this public key. If
+ there are multiple User ID packets, this corresponds to multiple
+ means of identifying the same unique individual user; for example, a
+ user may have more than one email address, and construct a User ID
+ for each one.
+
+ Immediately following each User ID packet, there are zero or more
+ Signature packets. Each Signature packet is calculated on the
+ immediately preceding User ID packet and the initial Public-Key
+ packet. The signature serves to certify the corresponding public key
+ and User ID. In effect, the signer is testifying to his or her
+ belief that this public key belongs to the user identified by this
+ User ID.
+
+ Within the same section as the User ID packets, there are zero or
+ more User Attribute packets. Like the User ID packets, a User
+ Attribute packet is followed by zero or more Signature packets
+ calculated on the immediately preceding User Attribute packet and the
+ initial Public-Key packet.
+
+ User Attribute packets and User ID packets may be freely intermixed
+ in this section, so long as the signatures that follow them are
+ maintained on the proper User Attribute or User ID packet.
+
+ After the User ID packet or Attribute packet, there may be zero or
+ more Subkey packets. In general, subkeys are provided in cases where
+ the top-level public key is a signature-only key. However, any V4
+ key may have subkeys, and the subkeys may be encryption-only keys,
+ signature-only keys, or general-purpose keys. V3 keys MUST NOT have
+ subkeys.
+
+ Each Subkey packet MUST be followed by one Signature packet, which
+ should be a subkey binding signature issued by the top-level key.
+ For subkeys that can issue signatures, the subkey binding signature
+ MUST contain an Embedded Signature subpacket with a primary key
+ binding signature (0x19) issued by the subkey on the top-level key.
+
+ Subkey and Key packets may each be followed by a revocation Signature
+ packet to indicate that the key is revoked. Revocation signatures
+ are only accepted if they are issued by the key itself, or by a key
+ that is authorized to issue revocations via a Revocation Key
+ subpacket in a self-signature by the top-level key.
+
+ Transferable public-key packet sequences may be concatenated to allow
+ transferring multiple public keys in one operation.
+
+ 11.2. Transferable Secret Keys
+
+ OpenPGP users may transfer secret keys. The format of a transferable
+ secret key is the same as a transferable public key except that
+ secret-key and secret-subkey packets are used instead of the public
+ key and public-subkey packets. Implementations SHOULD include self-
+ signatures on any user IDs and subkeys, as this allows for a complete
+ public key to be automatically extracted from the transferable secret
+ key. Implementations MAY choose to omit the self-signatures,
+ especially if a transferable public key accompanies the transferable
+ secret key.
+ """
+ @property
+ def __key__(self):
+ return self._key.keymaterial
+
+ @property
+ def __sig__(self):
+ return list(self._signatures)
+
+ @property
+ def created(self):
+ """A :py:obj:`~datetime.datetime` object of the creation date and time of the key, in UTC."""
+ return self._key.created
+
+ @property
+ def expires_at(self):
+ """A :py:obj:`~datetime.datetime` object of when this key is to be considered expired, if any. Otherwise, ``None``"""
+ try:
+ expires = min(sig.key_expiration for sig in itertools.chain(iter(uid.selfsig for uid in self.userids), self.self_signatures)
+ if sig.key_expiration is not None)
+
+ except ValueError:
+ return None
+
+ else:
+ return (self.created + expires)
+
+ @property
+ def fingerprint(self):
+ """The fingerprint of this key, as a :py:obj:`~pgpy.types.Fingerprint` object."""
+ if self._key:
+ return self._key.fingerprint
+
+ @property
+ def hashdata(self):
+ # when signing a key, only the public portion of the keys is hashed
+ # if this is a private key, the private components of the key material need to be left out
+ if self.is_public:
+ return self._key.__bytearray__()[len(self._key.header):]
+
+ pub = self._key.pubkey()
+ return pub.__bytearray__()[len(pub.header):]
+
+ @property
+ def is_expired(self):
+ """``True`` if this key is expired, otherwise ``False``"""
+ expires = self.expires_at
+ if expires is not None:
+ return expires <= datetime.utcnow()
+
+ return False
+
+ @property
+ def is_primary(self):
+ """``True`` if this is a primary key; ``False`` if this is a subkey"""
+ return isinstance(self._key, Primary) and not isinstance(self._key, Sub)
+
+ @property
+ def is_protected(self):
+ """``True`` if this is a private key that is protected with a passphrase, otherwise ``False``"""
+ if self.is_public:
+ return False
+
+ return self._key.protected
+
+ @property
+ def is_public(self):
+ """``True`` if this is a public key, otherwise ``False``"""
+ return isinstance(self._key, Public) and not isinstance(self._key, Private)
+
+ @property
+ def is_unlocked(self):
+ """``False`` if this is a private key that is protected with a passphrase and has not yet been unlocked, otherwise ``True``"""
+ if self.is_public:
+ return True
+
+ if not self.is_protected:
+ return True
+
+ return self._key.unlocked
+
+ @property
+ def key_algorithm(self):
+ """The :py:obj:`constants.PubKeyAlgorithm` pertaining to this key"""
+ return self._key.pkalg
+
+ @property
+ def key_size(self):
+ """*new in 0.4.1*
+ The size pertaining to this key. ``int`` for non-EC key algorithms; :py:obj:`constants.EllipticCurveOID` for EC keys.
+ """
+ if self.key_algorithm in {PubKeyAlgorithm.ECDSA, PubKeyAlgorithm.ECDH}:
+ return self._key.keymaterial.oid
+ return next(iter(self._key.keymaterial)).bit_length()
+
+ @property
+ def magic(self):
+ return '{:s} KEY BLOCK'.format('PUBLIC' if (isinstance(self._key, Public) and not isinstance(self._key, Private)) else
+ 'PRIVATE' if isinstance(self._key, Private) else '')
+
+ @property
+ def pubkey(self):
+ """If the :py:obj:`PGPKey` object is a private key, this method returns a corresponding public key object with
+ all the trimmings. Otherwise, returns ``None``
+ """
+ if not self.is_public:
+ if self._sibling is None or isinstance(self._sibling, weakref.ref):
+ # create a new key shell
+ pub = PGPKey()
+ pub.ascii_headers = self.ascii_headers.copy()
+
+ # get the public half of the primary key
+ pub._key = self._key.pubkey()
+
+ # get the public half of each subkey
+ for skid, subkey in self.subkeys.items():
+ pub.subkeys[skid] = subkey.pubkey
+
+ # copy user ids and user attributes
+ for uid in self._uids:
+ pub |= copy.copy(uid)
+
+ # copy signatures that weren't copied with uids
+ for sig in self._signatures:
+ if sig.parent is None:
+ pub |= copy.copy(sig)
+
+ # keep connect the two halves using a weak reference
+ self._sibling = weakref.ref(pub)
+ pub._sibling = weakref.ref(self)
+
+ return self._sibling()
+ return None
+
+ @pubkey.setter
+ def pubkey(self, pubkey):
+ if self.is_public:
+ raise TypeError("cannot add public sibling to pubkey")
+
+ if not pubkey.is_public:
+ raise TypeError("sibling must be public")
+
+ if self._sibling is not None and self._sibling() is not None:
+ raise ValueError("public key reference already set")
+
+ if pubkey.fingerprint != self.fingerprint:
+ raise ValueError("key fingerprint mismatch")
+
+ # TODO: sync packets with sibling
+ self._sibling = weakref.ref(pubkey)
+ pubkey._sibling = weakref.ref(self)
+
+ @property
+ def self_signatures(self):
+ keyid, keytype = (self.fingerprint.keyid, SignatureType.DirectlyOnKey) if self.is_primary \
+ else (self.parent.fingerprint.keyid, SignatureType.Subkey_Binding)
+
+ ##TODO: filter out revoked signatures as well
+ for sig in iter(sig for sig in self._signatures
+ if all([sig.type == keytype, sig.signer == keyid, not sig.is_expired])):
+ yield sig
+
+ @property
+ def signers(self):
+ """A ``set`` of key ids of keys that were used to sign this key"""
+ return {sig.signer for sig in self.__sig__}
+
+ @property
+ def subkeys(self):
+ """An :py:obj:`~collections.OrderedDict` of subkeys bound to this primary key, if applicable,
+ selected by 16-character keyid."""
+ return self._children
+
+ @property
+ def userids(self):
+ """A ``list`` of :py:obj:`PGPUID` objects containing User ID information about this key"""
+ return [ u for u in self._uids if u.is_uid ]
+
+ @property
+ def userattributes(self):
+ """A ``list`` of :py:obj:`PGPUID` objects containing one or more images associated with this key"""
+ return [u for u in self._uids if u.is_ua]
+
+ @classmethod
+ def new(cls, key_algorithm, key_size):
+ """
+ Generate a new PGP key
+
+ :param key_algorithm: Key algorithm to use.
+ :type key_algorithm: A :py:obj:`~constants.PubKeyAlgorithm`
+ :param key_size: Key size in bits, unless `key_algorithm` is :py:obj:`~constants.PubKeyAlgorithm.ECDSA` or
+ :py:obj:`~constants.PubKeyAlgorithm.ECDH`, in which case it should be the Curve OID to use.
+ :type key_size: ``int`` or :py:obj:`~constants.EllipticCurveOID`
+ :return: A newly generated :py:obj:`PGPKey`
+ """
+ # new private key shell first
+ key = PGPKey()
+
+ if key_algorithm in {PubKeyAlgorithm.RSAEncrypt, PubKeyAlgorithm.RSASign}: # pragma: no cover
+ warnings.warn('{:s} is deprecated - generating key using RSAEncryptOrSign'.format(key_algorithm.name))
+ key_algorithm = PubKeyAlgorithm.RSAEncryptOrSign
+
+ # generate some key data to match key_algorithm and key_size
+ key._key = PrivKeyV4.new(key_algorithm, key_size)
+
+ return key
+
+ def __init__(self):
+ """
+ PGPKey objects represent OpenPGP compliant keys along with all of their associated data.
+
+ PGPKey implements the `__str__` method, the output of which will be the key composition in
+ OpenPGP-compliant ASCII-armored format.
+
+ PGPKey implements the `__bytes__` method, the output of which will be the key composition in
+ OpenPGP-compliant binary format.
+
+ Any signatures within the PGPKey that are marked as being non-exportable will not be included in the output
+ of either of those methods.
+ """
+ super(PGPKey, self).__init__()
+ self._key = None
+ self._children = collections.OrderedDict()
+ self._signatures = SorteDeque()
+ self._uids = SorteDeque()
+ self._sibling = None
+
+ def __bytearray__(self):
+ _bytes = bytearray()
+ # us
+ _bytes += self._key.__bytearray__()
+ # our signatures; ignore embedded signatures
+ for sig in iter(s for s in self._signatures if not s.embedded and s.exportable):
+ _bytes += sig.__bytearray__()
+ # one or more User IDs, followed by their signatures
+ for uid in self._uids:
+ _bytes += uid._uid.__bytearray__()
+ for s in [s for s in uid._signatures if s.exportable]:
+ _bytes += s.__bytearray__()
+ # subkeys
+ for sk in self._children.values():
+ _bytes += sk.__bytearray__()
+
+ return _bytes
+
+ def __repr__(self):
+ if self._key is not None:
+ return "<PGPKey [{:s}][0x{:s}] at 0x{:02X}>" \
+ "".format(self._key.__class__.__name__, self.fingerprint.keyid, id(self))
+
+ return "<PGPKey [unknown] at 0x{:02X}>" \
+ "".format(id(self))
+
+ def __contains__(self, item):
+ if isinstance(item, PGPKey): # pragma: no cover
+ return item.fingerprint.keyid in self.subkeys
+
+ if isinstance(item, Fingerprint): # pragma: no cover
+ return item.keyid in self.subkeys
+
+ if isinstance(item, PGPUID):
+ return item in self._uids
+
+ if isinstance(item, PGPSignature):
+ return item in self._signatures
+
+ raise TypeError
+
+ def __or__(self, other, from_sib=False):
+ if isinstance(other, Key) and self._key is None:
+ self._key = other
+
+ elif isinstance(other, PGPKey) and not other.is_primary and other.is_public == self.is_public:
+ other._parent = self
+ self._children[other.fingerprint.keyid] = other
+
+ elif isinstance(other, PGPSignature):
+ self._signatures.insort(other)
+
+ # if this is a subkey binding signature that has embedded primary key binding signatures, add them to parent
+ if other.type == SignatureType.Subkey_Binding:
+ for es in iter(pkb for pkb in other._signature.subpackets['EmbeddedSignature']):
+ esig = PGPSignature() | es
+ esig._parent = other
+ self._signatures.insort(esig)
+
+ elif isinstance(other, PGPUID):
+ other._parent = weakref.ref(self)
+ self._uids.insort(other)
+
+ else:
+ raise TypeError("unsupported operand type(s) for |: '{:s}' and '{:s}'"
+ "".format(self.__class__.__name__, other.__class__.__name__))
+
+ if isinstance(self._sibling, weakref.ref) and not from_sib:
+ sib = self._sibling()
+ if sib is None:
+ self._sibling = None
+
+ else: # pragma: no cover
+ sib.__or__(copy.copy(other), True)
+
+ return self
+
+ def __copy__(self):
+ key = super(PGPKey, self).__copy__()
+ key._key = copy.copy(self._key)
+
+ for uid in self._uids:
+ key |= copy.copy(uid)
+
+ for id, subkey in self._children.items():
+ key |= copy.copy(subkey)
+
+ for sig in self._signatures:
+ if sig.embedded:
+ # embedded signatures don't need to be explicitly copied
+ continue
+
+ print(len(key._signatures))
+ key |= copy.copy(sig)
+
+ return key
+
+ def protect(self, passphrase, enc_alg, hash_alg):
+ """
+ Add a passphrase to a private key. If the key is already passphrase protected, it should be unlocked before
+ a new passphrase can be specified.
+
+ Has no effect on public keys.
+
+ :param passphrase: A passphrase to protect the key with
+ :type passphrase: ``str``, ``unicode``
+ :param enc_alg: Symmetric encryption algorithm to use to protect the key
+ :type enc_alg: :py:obj:`~constants.SymmetricKeyAlgorithm`
+ :param hash_alg: Hash algorithm to use in the String-to-Key specifier
+ :type hash_alg: :py:obj:`~constants.HashAlgorithm`
+ """
+ ##TODO: specify strong defaults for enc_alg and hash_alg
+ if self.is_public:
+ # we can't protect public keys because only private key material is ever protected
+ warnings.warn("Public keys cannot be passphrase-protected", stacklevel=2)
+ return
+
+ if self.is_protected and not self.is_unlocked:
+ # we can't protect a key that is already protected unless it is unlocked first
+ warnings.warn("This key is already protected with a passphrase - "
+ "please unlock it before attempting to specify a new passphrase", stacklevel=2)
+ return
+
+ for sk in itertools.chain([self], self.subkeys.values()):
+ sk._key.protect(passphrase, enc_alg, hash_alg)
+
+ del passphrase
+
+ @contextlib.contextmanager
+ def unlock(self, passphrase):
+ """
+ Context manager method for unlocking passphrase-protected private keys. Has no effect if the key is not both
+ private and passphrase-protected.
+
+ When the context managed block is exited, the unprotected private key material is removed.
+
+ Example::
+
+ privkey = PGPKey()
+ privkey.parse(keytext)
+
+ assert privkey.is_protected
+ assert privkey.is_unlocked is False
+ # privkey.sign("some text") <- this would raise an exception
+
+ with privkey.unlock("TheCorrectPassphrase"):
+ # privkey is now unlocked
+ assert privkey.is_unlocked
+ # so you can do things with it
+ sig = privkey.sign("some text")
+
+ # privkey is no longer unlocked
+ assert privkey.is_unlocked is False
+
+ Emits a :py:obj:`~warnings.UserWarning` if the key is public or not passphrase protected.
+
+ :param str passphrase: The passphrase to be used to unlock this key.
+ :raises: :py:exc:`~pgpy.errors.PGPDecryptionError` if the passphrase is incorrect
+ """
+ if self.is_public:
+ # we can't unprotect public keys because only private key material is ever protected
+ warnings.warn("Public keys cannot be passphrase-protected", stacklevel=3)
+ yield self
+ return
+
+ if not self.is_protected:
+ # we can't unprotect private keys that are not protected, because there is no ciphertext to decrypt
+ warnings.warn("This key is not protected with a passphrase", stacklevel=3)
+ yield self
+ return
+
+ try:
+ for sk in itertools.chain([self], self.subkeys.values()):
+ sk._key.unprotect(passphrase)
+ del passphrase
+ yield self
+
+ finally:
+ # clean up here by deleting the previously decrypted secret key material
+ for sk in itertools.chain([self], self.subkeys.values()):
+ sk._key.keymaterial.clear()
+
+ def add_uid(self, uid, selfsign=True, **prefs):
+ """
+ Add a User ID to this key.
+
+ :param uid: The user id to add
+ :type uid: :py:obj:`~pgpy.PGPUID`
+ :param selfsign: Whether or not to self-sign the user id before adding it
+ :type selfsign: ``bool``
+
+ Valid optional keyword arguments are identical to those of self-signatures for :py:meth:`PGPKey.certify`.
+ Any such keyword arguments are ignored if selfsign is ``False``
+ """
+ uid._parent = self
+ if selfsign:
+ uid |= self.certify(uid, SignatureType.Positive_Cert, **prefs)
+
+ self |= uid
+
+ def get_uid(self, search):
+ """
+ Find and return a User ID that matches the search string given.
+
+ :param search: A text string to match name, comment, or email address against
+ :type search: ``str``, ``unicode``
+ :return: The first matching :py:obj:`~pgpy.PGPUID`, or ``None`` if no matches were found.
+ """
+ if self.is_primary:
+ return next((u for u in self._uids if search in filter(lambda a: a is not None, (u.name, u.comment, u.email))), None)
+ return self.parent.get_uid(search)
+
+ def del_uid(self, search):
+ """
+ Find and remove a user id that matches the search string given. This method does not modify the corresponding
+ :py:obj:`~pgpy.PGPUID` object; it only removes it from the list of user ids on the key.
+
+ :param search: A text string to match name, comment, or email address against
+ :type search: ``str``, ``unicode``
+ """
+ u = self.get_uid(search)
+
+ if u is None:
+ raise KeyError("uid '{:s}' not found".format(search))
+
+ u._parent = None
+ self._uids.remove(u)
+
+ def add_subkey(self, key, **prefs):
+ """
+ Add a key as a subkey to this key.
+ :param key: A private :py:obj:`~pgpy.PGPKey` that does not have any subkeys of its own
+
+ :keyword usage: A ``set`` of key usage flags, as :py:obj:`~constants.KeyFlags` for the subkey to be added.
+ :type usage: ``set``
+
+ Other valid optional keyword arguments are identical to those of self-signatures for :py:meth:`PGPKey.certify`
+ """
+ if self.is_public:
+ raise PGPError("Cannot add a subkey to a public key. Add the subkey to the private component first!")
+
+ if key.is_public:
+ raise PGPError("Cannot add a public key as a subkey to this key")
+
+ if key.is_primary:
+ if len(key._children) > 0:
+ raise PGPError("Cannot add a key that already has subkeys as a subkey!")
+
+ # convert key into a subkey
+ npk = PrivSubKeyV4()
+ npk.pkalg = key._key.pkalg
+ npk.created = key._key.created
+ npk.keymaterial = key._key.keymaterial
+ key._key = npk
+ key._key.update_hlen()
+
+ self._children[key.fingerprint.keyid] = key
+ key._parent = self
+
+ ##TODO: skip this step if the key already has a subkey binding signature
+ bsig = self.bind(key, **prefs)
+ key |= bsig
+
+ def _get_key_flags(self, user=None):
+ if self.is_primary:
+ if user is not None:
+ user = self.get_uid(user)
+
+ elif len(self._uids) == 0:
+ return {KeyFlags.Certify}
+
+ else:
+ user = next(iter(self.userids))
+
+ # RFC 4880 says that primary keys *must* be capable of certification
+ return {KeyFlags.Certify} | user.selfsig.key_flags
+
+ return next(self.self_signatures).key_flags
+
+ def _sign(self, subject, sig, **prefs):
+ """
+ The actual signing magic happens here.
+ :param subject: The subject to sign
+ :param sig: The :py:obj:`PGPSignature` object the new signature is to be encapsulated within
+ :returns: ``sig``, after the signature is added to it.
+ """
+ user = prefs.pop('user', None)
+ uid = None
+ if user is not None:
+ uid = self.get_uid(user)
+
+ else:
+ uid = next(iter(self.userids), None)
+ if uid is None and self.parent is not None:
+ uid = next(iter(self.parent.userids), None)
+
+ if sig.hash_algorithm is None:
+ sig._signature.halg = uid.selfsig.hashprefs[0]
+
+ if uid is not None and sig.hash_algorithm not in uid.selfsig.hashprefs:
+ warnings.warn("Selected hash algorithm not in key preferences", stacklevel=4)
+
+ # signature options that can be applied at any level
+ expires = prefs.pop('expires', None)
+ notation = prefs.pop('notation', None)
+ revocable = prefs.pop('revocable', True)
+ policy_uri = prefs.pop('policy_uri', None)
+
+ if expires is not None:
+ # expires should be a timedelta, so if it's a datetime, turn it into a timedelta
+ if isinstance(expires, datetime):
+ expires = expires - self.created
+
+ sig._signature.subpackets.addnew('SignatureExpirationTime', hashed=True, expires=expires)
+
+ if revocable is False:
+ sig._signature.subpackets.addnew('Revocable', hashed=True, bflag=revocable)
+
+ if notation is not None:
+ for name, value in notation.items():
+ # mark all notations as human readable unless value is a bytearray
+ flags = NotationDataFlags.HumanReadable
+ if isinstance(value, bytearray):
+ flags = 0x00
+
+ sig._signature.subpackets.addnew('NotationData', hashed=True, flags=flags, name=name, value=value)
+
+ if policy_uri is not None:
+ sig._signature.subpackets.addnew('Policy', hashed=True, uri=policy_uri)
+
+ if user is not None and uid is not None:
+ signers_uid = "{:s}".format(uid)
+ sig._signature.subpackets.addnew('SignersUserID', hashed=True, userid=signers_uid)
+
+ # handle an edge case for timestamp signatures vs standalone signatures
+ if sig.type == SignatureType.Timestamp and len(sig._signature.subpackets._hashed_sp) > 1:
+ sig._signature.sigtype = SignatureType.Standalone
+
+ sigdata = sig.hashdata(subject)
+ h2 = sig.hash_algorithm.hasher
+ h2.update(sigdata)
+ sig._signature.hash2 = bytearray(h2.digest()[:2])
+
+ _sig = self._key.sign(sigdata, getattr(hashes, sig.hash_algorithm.name)())
+ if _sig is NotImplemented:
+ raise NotImplementedError(self.key_algorithm)
+
+ sig._signature.signature.from_signer(_sig)
+ sig._signature.update_hlen()
+
+ return sig
+
+ @KeyAction(KeyFlags.Sign, is_unlocked=True, is_public=False)
+ def sign(self, subject, **prefs):
+ """
+ Sign text, a message, or a timestamp using this key.
+
+ :param subject: The text to be signed
+ :type subject: ``str``, :py:obj:`~pgpy.PGPMessage`, ``None``
+ :raises: :py:exc:`~pgpy.errors.PGPError` if the key is passphrase-protected and has not been unlocked
+ :raises: :py:exc:`~pgpy.errors.PGPError` if the key is public
+ :returns: :py:obj:`PGPSignature`
+
+ The following optional keyword arguments can be used with :py:meth:`PGPKey.sign`, as well as
+ :py:meth:`PGPKey.certify`, :py:meth:`PGPKey.revoke`, and :py:meth:`PGPKey.bind`:
+
+ :keyword expires: Set an expiration date for this signature
+ :type expires: :py:obj:`~datetime.datetime`, :py:obj:`~datetime.timedelta`
+ :keyword notation: Add arbitrary notation data to this signature.
+ :type notation: ``dict``
+ :keyword policy_uri: Add a URI to the signature that should describe the policy under which the signature
+ was issued.
+ :type policy_uri: ``str``
+ :keyword revocable: If ``False``, this signature will be marked non-revocable
+ :type revocable: ``bool``
+ :keyword user: Specify which User ID to use when creating this signature. Also adds a "Signer's User ID"
+ to the signature.
+ :type user: ``str``
+ """
+ sig_type = SignatureType.BinaryDocument
+ hash_algo = prefs.pop('hash', None)
+
+ if subject is None:
+ sig_type = SignatureType.Timestamp
+
+ if isinstance(subject, PGPMessage):
+ if subject.type == 'cleartext':
+ sig_type = SignatureType.CanonicalDocument
+
+ subject = subject.message
+
+ sig = PGPSignature.new(sig_type, self.key_algorithm, hash_algo, self.fingerprint.keyid)
+
+ return self._sign(subject, sig, **prefs)
+
+ @KeyAction(KeyFlags.Certify, is_unlocked=True, is_public=False)
+ def certify(self, subject, level=SignatureType.Generic_Cert, **prefs):
+ """
+ Sign a key or a user id within a key.
+
+ :param subject: The user id or key to be certified.
+ :type subject: :py:obj:`PGPKey`, :py:obj:`PGPUID`
+ :param level: :py:obj:`~constants.SignatureType.Generic_Cert`, :py:obj:`~constants.SignatureType.Persona_Cert`,
+ :py:obj:`~constants.SignatureType.Casual_Cert`, or :py:obj:`~constants.SignatureType.Positive_Cert`.
+ Only used if subject is a :py:obj:`PGPUID`; otherwise, it is ignored.
+ :raises: :py:exc:`~pgpy.errors.PGPError` if the key is passphrase-protected and has not been unlocked
+ :raises: :py:exc:`~pgpy.errors.PGPError` if the key is public
+ :returns: :py:obj:`PGPSignature`
+
+ In addition to the optional keyword arguments accepted by :py:meth:`PGPKey.sign`, the following optional
+ keyword arguments can be used with :py:meth:`PGPKey.certify`.
+
+ These optional keywords only make sense, and thus only have an effect, when self-signing a key or User ID:
+
+ :keyword usage: A ``set`` of key usage flags, as :py:obj:`~constants.KeyFlags`.
+ This keyword is ignored for non-self-certifications.
+ :type usage: ``set``
+ :keyword ciphers: A list of preferred symmetric ciphers, as :py:obj:`~constants.SymmetricKeyAlgorithm`.
+ This keyword is ignored for non-self-certifications.
+ :type ciphers: ``list``
+ :keyword hashes: A list of preferred hash algorithms, as :py:obj:`~constants.HashAlgorithm`.
+ This keyword is ignored for non-self-certifications.
+ :type hashes: ``list``
+ :keyword compression: A list of preferred compression algorithms, as :py:obj:`~constants.CompressionAlgorithm`.
+ This keyword is ignored for non-self-certifications.
+ :type compression: ``list``
+ :keyword key_expiration: Specify a key expiration date for when this key should expire, or a
+ :py:obj:`~datetime.timedelta` of how long after the key was created it should expire.
+ This keyword is ignored for non-self-certifications.
+ :type key_expiration: :py:obj:`datetime.datetime`, :py:obj:`datetime.timedelta`
+ :keyword keyserver: Specify the URI of the preferred key server of the user.
+ This keyword is ignored for non-self-certifications.
+ :type keyserver: ``str``, ``unicode``, ``bytes``
+ :keyword primary: Whether or not to consider the certified User ID as the primary one.
+ This keyword is ignored for non-self-certifications, and any certifications directly on keys.
+ :type primary: ``bool``
+
+ These optional keywords only make sense, and thus only have an effect, when signing another key or User ID:
+
+ :keyword trust: Specify the level and amount of trust to assert when certifying a public key. Should be a tuple
+ of two ``int`` s, specifying the trust level and trust amount. See
+ `RFC 4880 Section 5.2.3.13. Trust Signature <http://tools.ietf.org/html/rfc4880#section-5.2.3.13>`_
+ for more on what these values mean.
+ :type trust: ``tuple`` of two ``int`` s
+ :keyword regex: Specify a regular expression to constrain the specified trust signature in the resulting signature.
+ Symbolically signifies that the specified trust signature only applies to User IDs which match
+ this regular expression.
+ This is meaningless without also specifying trust level and amount.
+ :type regex: ``str``
+ """
+ hash_algo = prefs.pop('hash', None)
+ sig_type = level
+ if isinstance(subject, PGPKey):
+ sig_type = SignatureType.DirectlyOnKey
+
+ sig = PGPSignature.new(sig_type, self.key_algorithm, hash_algo, self.fingerprint.keyid)
+
+ # signature options that only make sense in certifications
+ usage = prefs.pop('usage', None)
+ exportable = prefs.pop('exportable', None)
+
+ if usage is not None:
+ sig._signature.subpackets.addnew('KeyFlags', hashed=True, flags=usage)
+
+ if exportable is not None:
+ sig._signature.subpackets.addnew('ExportableCertification', hashed=True, bflag=exportable)
+
+ keyfp = self.fingerprint
+ if isinstance(subject, PGPKey):
+ keyfp = subject.fingerprint
+ if isinstance(subject, PGPUID) and subject._parent is not None:
+ keyfp = subject._parent.fingerprint
+
+ if keyfp == self.fingerprint:
+ # signature options that only make sense in self-certifications
+ cipher_prefs = prefs.pop('ciphers', None)
+ hash_prefs = prefs.pop('hashes', None)
+ compression_prefs = prefs.pop('compression', None)
+ key_expires = prefs.pop('key_expiration', None)
+ keyserver_flags = prefs.pop('keyserver_flags', None)
+ keyserver = prefs.pop('keyserver', None)
+ primary_uid = prefs.pop('primary', None)
+
+ if key_expires is not None:
+ # key expires should be a timedelta, so if it's a datetime, turn it into a timedelta
+ if isinstance(key_expires, datetime):
+ key_expires = key_expires - self.created
+
+ sig._signature.subpackets.addnew('KeyExpirationTime', hashed=True, expires=key_expires)
+
+ if cipher_prefs is not None:
+ sig._signature.subpackets.addnew('PreferredSymmetricAlgorithms', hashed=True, flags=cipher_prefs)
+
+ if hash_prefs is not None:
+ sig._signature.subpackets.addnew('PreferredHashAlgorithms', hashed=True, flags=hash_prefs)
+ if sig.hash_algorithm is None:
+ sig._signature.halg = hash_prefs[0]
+
+ if compression_prefs is not None:
+ sig._signature.subpackets.addnew('PreferredCompressionAlgorithms', hashed=True, flags=compression_prefs)
+
+ if keyserver_flags is not None:
+ sig._signature.subpackets.addnew('KeyServerPreferences', hashed=True, flags=keyserver_flags)
+
+ if keyserver is not None:
+ sig._signature.subpackets.addnew('PreferredKeyServer', hashed=True, uri=keyserver)
+
+ if primary_uid is not None:
+ sig._signature.subpackets.addnew('PrimaryUserID', hashed=True, primary=primary_uid)
+
+ # Features is always set on self-signatures
+ sig._signature.subpackets.addnew('Features', hashed=True, flags=Features.pgpy_features)
+
+ else:
+ # signature options that only make sense in non-self-certifications
+ trust = prefs.pop('trust', None)
+ regex = prefs.pop('regex', None)
+
+ if trust is not None:
+ sig._signature.subpackets.addnew('TrustSignature', hashed=True, level=trust[0], amount=trust[1])
+
+ if regex is not None:
+ sig._signature.subpackets.addnew('RegularExpression', hashed=True, regex=regex)
+
+ return self._sign(subject, sig, **prefs)
+
+ @KeyAction(KeyFlags.Certify, is_unlocked=True, is_public=False)
+ def revoke(self, target, **prefs):
+ """
+ Revoke a key, a subkey, or all current certification signatures of a User ID that were generated by this key so far.
+
+ :param target: The key to revoke
+ :type target: :py:obj:`PGPKey`, :py:obj:`PGPUID`
+ :raises: :py:exc:`~pgpy.errors.PGPError` if the key is passphrase-protected and has not been unlocked
+ :raises: :py:exc:`~pgpy.errors.PGPError` if the key is public
+ :returns: :py:obj:`PGPSignature`
+
+ In addition to the optional keyword arguments accepted by :py:meth:`PGPKey.sign`, the following optional
+ keyword arguments can be used with :py:meth:`PGPKey.revoke`.
+
+ :keyword reason: Defaults to :py:obj:`constants.RevocationReason.NotSpecified`
+ :type reason: One of :py:obj:`constants.RevocationReason`.
+ :keyword comment: Defaults to an empty string.
+ :type comment: ``str``
+ """
+ hash_algo = prefs.pop('hash', None)
+ if isinstance(target, PGPUID):
+ sig_type = SignatureType.CertRevocation
+
+ elif isinstance(target, PGPKey):
+ ##TODO: check to make sure that the key that is being revoked:
+ # - is this key
+ # - is one of this key's subkeys
+ # - specifies this key as its revocation key
+ if target.is_primary:
+ sig_type = SignatureType.KeyRevocation
+
+ else:
+ sig_type = SignatureType.SubkeyRevocation
+
+ else: # pragma: no cover
+ raise TypeError
+
+ sig = PGPSignature.new(sig_type, self.key_algorithm, hash_algo, self.fingerprint.keyid)
+
+ # signature options that only make sense when revoking
+ reason = prefs.pop('reason', RevocationReason.NotSpecified)
+ comment = prefs.pop('comment', "")
+ sig._signature.subpackets.addnew('ReasonForRevocation', hashed=True, code=reason, string=comment)
+
+ return self._sign(target, sig, **prefs)
+
+ @KeyAction(is_unlocked=True, is_public=False)
+ def revoker(self, revoker, **prefs):
+ """
+ Generate a signature that specifies another key as being valid for revoking this key.
+
+ :param revoker: The :py:obj:`PGPKey` to specify as a valid revocation key.
+ :type revoker: :py:obj:`PGPKey`
+ :raises: :py:exc:`~pgpy.errors.PGPError` if the key is passphrase-protected and has not been unlocked
+ :raises: :py:exc:`~pgpy.errors.PGPError` if the key is public
+ :returns: :py:obj:`PGPSignature`
+
+ In addition to the optional keyword arguments accepted by :py:meth:`PGPKey.sign`, the following optional
+ keyword arguments can be used with :py:meth:`PGPKey.revoker`.
+
+ :keyword sensitive: If ``True``, this sets the sensitive flag on the RevocationKey subpacket. Currently,
+ this has no other effect.
+ :type sensitive: ``bool``
+ """
+ hash_algo = prefs.pop('hash', None)
+
+ sig = PGPSignature.new(SignatureType.DirectlyOnKey, self.key_algorithm, hash_algo, self.fingerprint.keyid)
+
+ # signature options that only make sense when adding a revocation key
+ sensitive = prefs.pop('sensitive', False)
+ keyclass = RevocationKeyClass.Normal | (RevocationKeyClass.Sensitive if sensitive else 0x00)
+
+ sig._signature.subpackets.addnew('RevocationKey',
+ hashed=True,
+ algorithm=revoker.key_algorithm,
+ fingerprint=revoker.fingerprint,
+ keyclass=keyclass)
+
+ # revocation keys should really not be revocable themselves
+ prefs['revocable'] = False
+ return self._sign(self, sig, **prefs)
+
+ @KeyAction(is_unlocked=True, is_public=False)
+ def bind(self, key, **prefs):
+ """
+ Bind a subkey to this key.
+
+ Valid optional keyword arguments are identical to those of self-signatures for :py:meth:`PGPkey.certify`
+ """
+ hash_algo = prefs.pop('hash', None)
+
+ if self.is_primary and not key.is_primary:
+ sig_type = SignatureType.Subkey_Binding
+
+ elif key.is_primary and not self.is_primary:
+ sig_type = SignatureType.PrimaryKey_Binding
+
+ else: # pragma: no cover
+ raise PGPError
+
+ sig = PGPSignature.new(sig_type, self.key_algorithm, hash_algo, self.fingerprint.keyid)
+
+ if sig_type == SignatureType.Subkey_Binding:
+ # signature options that only make sense in subkey binding signatures
+ usage = prefs.pop('usage', None)
+
+ if usage is not None:
+ sig._signature.subpackets.addnew('KeyFlags', hashed=True, flags=usage)
+
+ # if possible, have the subkey create a primary key binding signature
+ if key.key_algorithm.can_sign:
+ subkeyid = key.fingerprint.keyid
+ esig = None
+
+ if not key.is_public:
+ esig = key.bind(self)
+
+ elif subkeyid in self.subkeys: # pragma: no cover
+ esig = self.subkeys[subkeyid].bind(self)
+
+ if esig is not None:
+ sig._signature.subpackets.addnew('EmbeddedSignature', hashed=False, _sig=esig._signature)
+
+ return self._sign(key, sig, **prefs)
+
+ def verify(self, subject, signature=None):
+ """
+ Verify a subject with a signature using this key.
+
+ :param subject: The subject to verify
+ :type subject: ``str``, ``unicode``, ``None``, :py:obj:`PGPMessage`, :py:obj:`PGPKey`, :py:obj:`PGPUID`
+ :param signature: If the signature is detached, it should be specified here.
+ :type signature: :py:obj:`PGPSignature`
+ :returns: :py:obj:`~pgpy.types.SignatureVerification`
+ """
+ sspairs = []
+
+ # some type checking
+ if not isinstance(subject, (type(None), PGPMessage, PGPKey, PGPUID, PGPSignature, six.string_types, bytes, bytearray)):
+ raise TypeError("Unexpected subject value: {:s}".format(str(type(subject))))
+ if not isinstance(signature, (type(None), PGPSignature)):
+ raise TypeError("Unexpected signature value: {:s}".format(str(type(signature))))
+
+ def _filter_sigs(sigs):
+ _ids = {self.fingerprint.keyid} | set(self.subkeys)
+ return [ sig for sig in sigs if sig.signer in _ids ]
+
+ # collect signature(s)
+ if signature is None:
+ if isinstance(subject, PGPMessage):
+ sspairs += [ (sig, subject.message) for sig in _filter_sigs(subject.signatures) ]
+
+ if isinstance(subject, (PGPUID, PGPKey)):
+ sspairs += [ (sig, subject) for sig in _filter_sigs(subject.__sig__) ]
+
+ if isinstance(subject, PGPKey):
+ # user ids
+ sspairs += [ (sig, uid) for uid in subject.userids for sig in _filter_sigs(uid.__sig__) ]
+ # user attributes
+ sspairs += [ (sig, ua) for ua in subject.userattributes for sig in _filter_sigs(ua.__sig__) ]
+ # subkey binding signatures
+ sspairs += [ (sig, subkey) for subkey in subject.subkeys.values() for sig in _filter_sigs(subkey.__sig__) ]
+
+ elif signature.signer in {self.fingerprint.keyid} | set(self.subkeys):
+ sspairs += [(signature, subject)]
+
+ if len(sspairs) == 0:
+ raise PGPError("No signatures to verify")
+
+ # finally, start verifying signatures
+ sigv = SignatureVerification()
+ for sig, subj in sspairs:
+ if self.fingerprint.keyid != sig.signer and sig.signer in self.subkeys:
+ warnings.warn("Signature was signed with this key's subkey: {:s}. "
+ "Verifying with subkey...".format(sig.signer),
+ stacklevel=2)
+ sigv &= self.subkeys[sig.signer].verify(subj, sig)
+
+ else:
+ verified = self._key.verify(sig.hashdata(subj), sig.__sig__, getattr(hashes, sig.hash_algorithm.name)())
+ if verified is NotImplemented:
+ raise NotImplementedError(sig.key_algorithm)
+
+ sigv.add_sigsubj(sig, self.fingerprint.keyid, subj, verified)
+
+ return sigv
+
+ @KeyAction(KeyFlags.EncryptCommunications, KeyFlags.EncryptStorage, is_public=True)
+ def encrypt(self, message, sessionkey=None, **prefs):
+ """
+ Encrypt a PGPMessage using this key.
+
+ :param message: The message to encrypt.
+ :type message: :py:obj:`PGPMessage`
+ :optional param sessionkey: Provide a session key to use when encrypting something. Default is ``None``.
+ If ``None``, a session key of the appropriate length will be generated randomly.
+
+ .. warning::
+
+ Care should be taken when making use of this option! Session keys *absolutely need*
+ to be unpredictable! Use the ``gen_key()`` method on the desired
+ :py:obj:`~constants.SymmetricKeyAlgorithm` to generate the session key!
+ :type sessionkey: ``bytes``, ``str``
+
+ :raises: :py:exc:`~errors.PGPEncryptionError` if encryption failed for any reason.
+ :returns: A new :py:obj:`PGPMessage` with the encrypted contents of ``message``
+
+ The following optional keyword arguments can be used with :py:meth:`PGPKey.encrypt`:
+
+ :keyword cipher: Specifies the symmetric block cipher to use when encrypting the message.
+ :type cipher: :py:obj:`~constants.SymmetricKeyAlgorithm`
+ :keyword user: Specifies the User ID to use as the recipient for this encryption operation, for the purposes of
+ preference defaults and selection validation.
+ :type user: ``str``, ``unicode``
+ """
+ user = prefs.pop('user', None)
+ uid = None
+ if user is not None:
+ uid = self.get_uid(user)
+ else:
+ uid = next(iter(self.userids), None)
+ if uid is None and self.parent is not None:
+ uid = next(iter(self.parent.userids), None)
+ cipher_algo = prefs.pop('cipher', uid.selfsig.cipherprefs[0])
+
+ if cipher_algo not in uid.selfsig.cipherprefs:
+ warnings.warn("Selected symmetric algorithm not in key preferences", stacklevel=3)
+
+ if message.is_compressed and message._compression not in uid.selfsig.compprefs:
+ warnings.warn("Selected compression algorithm not in key preferences", stacklevel=3)
+
+ if sessionkey is None:
+ sessionkey = cipher_algo.gen_key()
+
+ # set up a new PKESessionKeyV3
+ pkesk = PKESessionKeyV3()
+ pkesk.encrypter = bytearray(binascii.unhexlify(self.fingerprint.keyid.encode('latin-1')))
+ pkesk.pkalg = self.key_algorithm
+ # pkesk.encrypt_sk(self.__key__, cipher_algo, sessionkey)
+ pkesk.encrypt_sk(self._key, cipher_algo, sessionkey)
+
+ if message.is_encrypted: # pragma: no cover
+ _m = message
+
+ else:
+ _m = PGPMessage()
+ skedata = IntegrityProtectedSKEDataV1()
+ skedata.encrypt(sessionkey, cipher_algo, message.__bytes__())
+ _m |= skedata
+
+ _m |= pkesk
+
+ return _m
+
+ def decrypt(self, message):
+ """
+ Decrypt a PGPMessage using this key.
+
+ :param message: An encrypted :py:obj:`PGPMessage`
+ :returns: A new :py:obj:`PGPMessage` with the decrypted contents of ``message``
+ """
+ if not message.is_encrypted:
+ warnings.warn("This message is not encrypted", stacklevel=2)
+ return message
+
+ if self.fingerprint.keyid not in message.issuers:
+ sks = set(self.subkeys)
+ mis = set(message.issuers)
+ if sks & mis:
+ skid = list(sks & mis)[0]
+ warnings.warn("Message was encrypted with this key's subkey: {:s}. "
+ "Decrypting with that...".format(skid),
+ stacklevel=2)
+ return self.subkeys[skid].decrypt(message)
+
+ raise PGPError("Cannot decrypt the provided message with this key")
+
+ pkesk = next(pk for pk in message._sessionkeys if pk.pkalg == self.key_algorithm and pk.encrypter == self.fingerprint.keyid)
+ alg, key = pkesk.decrypt_sk(self._key)
+
+ # now that we have the symmetric cipher used and the key, we can decrypt the actual message
+ decmsg = PGPMessage()
+ decmsg.parse(message.message.decrypt(key, alg))
+
+ return decmsg
+
+ def parse(self, data):
+ unarmored = self.ascii_unarmor(data)
+ data = unarmored['body']
+
+ if unarmored['magic'] is not None and 'KEY' not in unarmored['magic']:
+ raise ValueError('Expected: KEY. Got: {}'.format(str(unarmored['magic'])))
+
+ if unarmored['headers'] is not None:
+ self.ascii_headers = unarmored['headers']
+
+ # parse packets
+ # keys will hold other keys parsed here
+ keys = collections.OrderedDict()
+ # orphaned will hold all non-opaque orphaned packets
+ orphaned = []
+ # last holds the last non-signature thing processed
+
+ ##TODO: see issue #141 and fix this better
+ getpkt = lambda d: Packet(d) if len(d) > 0 else None # flake8: noqa
+ # some packets are filtered out
+ getpkt = filter(lambda p: p.header.tag != PacketTag.Trust, iter(functools.partial(getpkt, data), None))
+
+ def pktgrouper():
+ class PktGrouper(object):
+ def __init__(self):
+ self.last = None
+
+ def __call__(self, pkt):
+ if pkt.header.tag != PacketTag.Signature:
+ self.last = '{:02X}_{:s}'.format(id(pkt), pkt.__class__.__name__)
+ return self.last
+ return PktGrouper()
+
+ while True:
+ # print(type(p) for p in getpkt)
+ for group in iter(group for _, group in itertools.groupby(getpkt, key=pktgrouper()) if not _.endswith('Opaque')):
+ pkt = next(group)
+
+ # deal with pkt first
+ if isinstance(pkt, Key):
+ pgpobj = (self if self._key is None else PGPKey()) | pkt
+
+ elif isinstance(pkt, (UserID, UserAttribute)):
+ pgpobj = PGPUID() | pkt
+
+ else: # pragma: no cover
+ break
+
+ # add signatures to whatever we got
+ [ operator.ior(pgpobj, PGPSignature() | sig) for sig in group if not isinstance(sig, Opaque) ]
+
+ # and file away pgpobj
+ if isinstance(pgpobj, PGPKey):
+ if pgpobj.is_primary:
+ keys[(pgpobj.fingerprint.keyid, pgpobj.is_public)] = pgpobj
+
+ else:
+ keys[next(reversed(keys))] |= pgpobj
+
+ elif isinstance(pgpobj, PGPUID):
+ # parent is likely the most recently parsed primary key
+ keys[next(reversed(keys))] |= pgpobj
+
+ else: # pragma: no cover
+ break
+ else:
+ # finished normally
+ break
+
+ # this will only be reached called if the inner loop hit a break
+ warnings.warn("Warning: Orphaned packet detected! {:s}".format(repr(pkt)), stacklevel=2) # pragma: no cover
+ orphaned.append(pkt) # pragma: no cover
+ for pkt in group: # pragma: no cover
+ orphaned.append(pkt)
+
+ # remove the reference to self from keys
+ [ keys.pop((getattr(self, 'fingerprint.keyid', '~'), None), t) for t in (True, False) ]
+ # return {'keys': keys, 'orphaned': orphaned}
+ return keys
+
+
+class PGPKeyring(collections.Container, collections.Iterable, collections.Sized):
+ def __init__(self, *args):
+ """
+ PGPKeyring objects represent in-memory keyrings that can contain any combination of supported private and public
+ keys. It can not currently be conveniently exported to a format that can be understood by GnuPG.
+ """
+ super(PGPKeyring, self).__init__()
+ self._keys = {}
+ self._pubkeys = collections.deque()
+ self._privkeys = collections.deque()
+ self._aliases = collections.deque([{}])
+ self.load(*args)
+
+ def __contains__(self, alias):
+ aliases = set().union(*self._aliases)
+
+ if isinstance(alias, six.string_types):
+ return alias in aliases or alias.replace(' ', '') in aliases
+
+ return alias in aliases # pragma: no cover
+
+ def __len__(self):
+ return len(self._keys)
+
+ def __iter__(self): # pragma: no cover
+ for pgpkey in itertools.chain(self._pubkeys, self._privkeys):
+ yield pgpkey
+
+ def _get_key(self, alias):
+ for m in self._aliases:
+ if alias in m:
+ return self._keys[m[alias]]
+
+ if alias.replace(' ', '') in m:
+ return self._keys[m[alias.replace(' ', '')]]
+
+ raise KeyError(alias)
+
+ def _get_keys(self, alias):
+ return [self._keys[m[alias]] for m in self._aliases if alias in m]
+
+ def _sort_alias(self, alias):
+ # remove alias from all levels of _aliases, and sort by created time and key half
+ # so the order of _aliases from left to right:
+ # - newer keys come before older ones
+ # - private keys come before public ones
+ #
+ # this list is sorted in the opposite direction from that, because they will be placed into self._aliases
+ # from right to left.
+ pkids = sorted(list(set().union(m.pop(alias) for m in self._aliases if alias in m)),
+ key=lambda pkid: (self._keys[pkid].created, self._keys[pkid].is_public))
+
+ # drop the now-sorted aliases into place
+ for depth, pkid in enumerate(pkids):
+ self._aliases[depth][alias] = pkid
+
+ # finally, remove any empty dicts left over
+ while {} in self._aliases: # pragma: no cover
+ self._aliases.remove({})
+
+ def _add_alias(self, alias, pkid):
+ # brand new alias never seen before!
+ if alias not in self:
+ self._aliases[-1][alias] = pkid
+
+ # this is a duplicate alias->key link; ignore it
+ elif alias in self and pkid in set(m[alias] for m in self._aliases if alias in m):
+ pass # pragma: no cover
+
+ # this is an alias that already exists, but points to a key that is not already referenced by it
+ else:
+ adepth = len(self._aliases) - len([None for m in self._aliases if alias in m]) - 1
+ # all alias maps have this alias, so increase total depth by 1
+ if adepth == -1:
+ self._aliases.appendleft({})
+ adepth = 0
+
+ self._aliases[adepth][alias] = pkid
+ self._sort_alias(alias)
+
+ def _add_key(self, pgpkey):
+ pkid = id(pgpkey)
+ if pkid not in self._keys:
+ self._keys[pkid] = pgpkey
+
+ # add to _{pub,priv}keys if this is either a primary key, or a subkey without one
+ if pgpkey.parent is None:
+ if pgpkey.is_public:
+ self._pubkeys.append(pkid)
+
+ else:
+ self._privkeys.append(pkid)
+
+ # aliases
+ self._add_alias(pgpkey.fingerprint, pkid)
+ self._add_alias(pgpkey.fingerprint.keyid, pkid)
+ self._add_alias(pgpkey.fingerprint.shortid, pkid)
+ for uid in pgpkey.userids:
+ self._add_alias(uid.name, pkid)
+ if uid.comment:
+ self._add_alias(uid.comment, pkid)
+
+ if uid.email:
+ self._add_alias(uid.email, pkid)
+
+ # subkeys
+ for subkey in pgpkey.subkeys.values():
+ self._add_key(subkey)
+
+ def load(self, *args):
+ """
+ Load all keys provided into this keyring object.
+
+ :param \*args: Each arg in ``args`` can be any of the formats supported by :py:meth:`PGPKey.from_path` and
+ :py:meth:`PGPKey.from_blob`, or a ``list`` or ``tuple`` of these.
+ :type \*args: ``list``, ``tuple``, ``str``, ``unicode``, ``bytes``, ``bytearray``
+ :returns: a ``set`` containing the unique fingerprints of all of the keys that were loaded during this operation.
+ """
+ def _preiter(first, iterable):
+ yield first
+ for item in iterable:
+ yield item
+
+ loaded = set()
+ for key in iter(item for ilist in iter(ilist if isinstance(ilist, (tuple, list)) else [ilist] for ilist in args)
+ for item in ilist):
+ if os.path.isfile(key):
+ _key, keys = PGPKey.from_file(key)
+
+ else:
+ _key, keys = PGPKey.from_blob(key)
+
+ for ik in _preiter(_key, keys.values()):
+ self._add_key(ik)
+ loaded |= {ik.fingerprint} | {isk.fingerprint for isk in ik.subkeys.values()}
+
+ return list(loaded)
+
+ @contextlib.contextmanager
+ def key(self, identifier):
+ """
+ A context-manager method. Yields the first :py:obj:`PGPKey` object that matches the provided identifier.
+
+ :param identifier: The identifier to use to select a loaded key.
+ :type identifier: :py:exc:`PGPMessage`, :py:exc:`PGPSignature`, ``str``
+ :raises: :py:exc:`KeyError` if there is no loaded key that satisfies the identifier.
+ """
+ if isinstance(identifier, PGPMessage):
+ for issuer in identifier.issuers:
+ if issuer in self:
+ identifier = issuer
+ break
+
+ if isinstance(identifier, PGPSignature):
+ identifier = identifier.signer
+
+ yield self._get_key(identifier)
+
+ def fingerprints(self, keyhalf='any', keytype='any'):
+ """
+ List loaded fingerprints with some optional filtering.
+
+ :param str keyhalf: Can be 'any', 'public', or 'private'. If 'public', or 'private', the fingerprints of keys of the
+ the other type will not be included in the results.
+ :param str keytype: Can be 'any', 'primary', or 'sub'. If 'primary' or 'sub', the fingerprints of keys of the
+ the other type will not be included in the results.
+ :returns: a ``set`` of fingerprints of keys matching the filters specified.
+ """
+ return {pk.fingerprint for pk in self._keys.values()
+ if pk.is_primary in [True if keytype in ['primary', 'any'] else None,
+ False if keytype in ['sub', 'any'] else None]
+ if pk.is_public in [True if keyhalf in ['public', 'any'] else None,
+ False if keyhalf in ['private', 'any'] else None]}
+
+ def unload(self, key):
+ """
+ Unload a loaded key and its subkeys.
+
+ The easiest way to do this is to select a key using :py:meth:`PGPKeyring.key` first::
+
+ with keyring.key("DSA von TestKey") as key:
+ keyring.unload(key)
+
+ :param key: The key to unload.
+ :type key: :py:obj:`PGPKey`
+ """
+ assert isinstance(key, PGPKey)
+ pkid = id(key)
+ if pkid in self._keys:
+ # remove references
+ [ kd.remove(pkid) for kd in [self._pubkeys, self._privkeys] if pkid in kd ]
+ # remove the key
+ self._keys.pop(pkid)
+
+ # remove aliases
+ for m, a in [ (m, a) for m in self._aliases for a, p in m.items() if p == pkid ]:
+ m.pop(a)
+ # do a re-sort of this alias if it was not unique
+ if a in self:
+ self._sort_alias(a)
+
+ # if key is a primary key, unload its subkeys as well
+ if key.is_primary:
+ [ self.unload(sk) for sk in key.subkeys.values() ]
diff --git a/src/leap/mx/vendor/pgpy/symenc.py b/src/leap/mx/vendor/pgpy/symenc.py
new file mode 100644
index 0000000..3d81a7c
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/symenc.py
@@ -0,0 +1,58 @@
+""" symenc.py
+"""
+import six
+
+from cryptography.exceptions import UnsupportedAlgorithm
+
+from cryptography.hazmat.backends import default_backend
+
+from cryptography.hazmat.primitives.ciphers import Cipher
+from cryptography.hazmat.primitives.ciphers import modes
+
+from .errors import PGPDecryptionError
+from .errors import PGPEncryptionError
+from .errors import PGPInsecureCipher
+
+__all__ = ['_encrypt',
+ '_decrypt']
+
+
+def _encrypt(pt, key, alg, iv=None):
+ if iv is None:
+ iv = b'\x00' * (alg.block_size // 8)
+
+ if alg.is_insecure:
+ raise PGPInsecureCipher("{:s} is not secure. Do not use it for encryption!".format(alg.name))
+
+ if not callable(alg.cipher):
+ raise PGPEncryptionError("Cipher {:s} not supported".format(alg.name))
+
+ try:
+ encryptor = Cipher(alg.cipher(key), modes.CFB(iv), default_backend()).encryptor()
+
+ except UnsupportedAlgorithm as ex: # pragma: no cover
+ six.raise_from(PGPEncryptionError, ex)
+
+ else:
+ return bytearray(encryptor.update(pt) + encryptor.finalize())
+
+
+def _decrypt(ct, key, alg, iv=None):
+ if iv is None:
+ """
+ Instead of using an IV, OpenPGP prefixes a string of length
+ equal to the block size of the cipher plus two to the data before it
+ is encrypted. The first block-size octets (for example, 8 octets for
+ a 64-bit block length) are random, and the following two octets are
+ copies of the last two octets of the IV.
+ """
+ iv = b'\x00' * (alg.block_size // 8)
+
+ try:
+ decryptor = Cipher(alg.cipher(key), modes.CFB(iv), default_backend()).decryptor()
+
+ except UnsupportedAlgorithm as ex: # pragma: no cover
+ six.raise_from(PGPDecryptionError, ex)
+
+ else:
+ return bytearray(decryptor.update(ct) + decryptor.finalize())
diff --git a/src/leap/mx/vendor/pgpy/types.py b/src/leap/mx/vendor/pgpy/types.py
new file mode 100644
index 0000000..b549434
--- /dev/null
+++ b/src/leap/mx/vendor/pgpy/types.py
@@ -0,0 +1,729 @@
+""" types.py
+"""
+from __future__ import division
+
+import abc
+import base64
+import binascii
+import bisect
+import codecs
+import collections
+import operator
+import os
+import re
+import warnings
+import weakref
+
+from enum import EnumMeta
+from enum import IntEnum
+
+import six
+
+from ._author import __version__
+
+from .decorators import sdproperty
+
+from .errors import PGPError
+
+__all__ = ['Armorable',
+ 'ParentRef',
+ 'PGPObject',
+ 'Field',
+ 'Header',
+ 'MetaDispatchable',
+ 'Dispatchable',
+ 'SignatureVerification',
+ 'FlagEnumMeta',
+ 'FlagEnum',
+ 'Fingerprint',
+ 'SorteDeque']
+
+if six.PY2:
+ FileNotFoundError = IOError
+ re.ASCII = 0
+
+
+class Armorable(six.with_metaclass(abc.ABCMeta)):
+ __crc24_init__ = 0x0B704CE
+ __crc24_poly__ = 0x1864CFB
+
+ __armor_fmt__ = '-----BEGIN PGP {block_type}-----\n' \
+ '{headers}\n' \
+ '{packet}\n' \
+ '={crc}\n' \
+ '-----END PGP {block_type}-----\n'
+
+ @property
+ def charset(self):
+ return self.ascii_headers.get('Charset', 'utf-8')
+
+ @charset.setter
+ def charset(self, encoding):
+ self.ascii_headers['Charset'] = codecs.lookup(encoding).name
+
+ @staticmethod
+ def is_ascii(text):
+ if isinstance(text, six.string_types):
+ return bool(re.match(r'^[ -~\r\n]+$', text, flags=re.ASCII))
+
+ if isinstance(text, (bytes, bytearray)):
+ return bool(re.match(br'^[ -~\r\n]+$', text, flags=re.ASCII))
+
+ raise TypeError("Expected: ASCII input of type str, bytes, or bytearray") # pragma: no cover
+
+ @staticmethod
+ def ascii_unarmor(text):
+ """
+ Takes an ASCII-armored PGP block and returns the decoded byte value.
+
+ :param text: An ASCII-armored PGP block, to un-armor.
+ :raises: :py:exc:`ValueError` if ``text`` did not contain an ASCII-armored PGP block.
+ :raises: :py:exc:`TypeError` if ``text`` is not a ``str``, ``bytes``, or ``bytearray``
+ :returns: A ``dict`` containing information from ``text``, including the de-armored data.
+ """
+ m = {'magic': None, 'headers': None, 'body': bytearray(), 'crc': None}
+ if not Armorable.is_ascii(text):
+ m['body'] = bytearray(text)
+ return m
+
+ if isinstance(text, (bytes, bytearray)): # pragma: no cover
+ text = text.decode('latin-1')
+
+ # the re.VERBOSE flag allows for:
+ # - whitespace is ignored except when in a character class or escaped
+ # - anything after a '#' that is not escaped or in a character class is ignored, allowing for comments
+ m = re.search(r"""# This capture group is optional because it will only be present in signed cleartext messages
+ (^-{5}BEGIN\ PGP\ SIGNED\ MESSAGE-{5}(?:\r?\n)
+ (Hash:\ (?P<hashes>[A-Za-z0-9\-,]+)(?:\r?\n){2})?
+ (?P<cleartext>(.*\n)+)(?:\r?\n)
+ )?
+ # armor header line; capture the variable part of the magic text
+ ^-{5}BEGIN\ PGP\ (?P<magic>[A-Z0-9 ,]+)-{5}(?:\r?\n)
+ # try to capture all the headers into one capture group
+ # if this doesn't match, m['headers'] will be None
+ (?P<headers>(^.+:\ .+(?:\r?\n))+)?(?:\r?\n)?
+ # capture all lines of the body, up to 76 characters long,
+ # including the newline, and the pad character(s)
+ (?P<body>([A-Za-z0-9+/]{1,75}={,2}(?:\r?\n))+)
+ # capture the armored CRC24 value
+ ^=(?P<crc>[A-Za-z0-9+/]{4})(?:\r?\n)
+ # finally, capture the armor tail line, which must match the armor header line
+ ^-{5}END\ PGP\ (?P=magic)-{5}(?:\r?\n)?
+ """, text, flags=re.MULTILINE | re.VERBOSE)
+
+ if m is None: # pragma: no cover
+ raise ValueError("Expected: ASCII-armored PGP data")
+
+ m = m.groupdict()
+
+ if m['hashes'] is not None:
+ m['hashes'] = m['hashes'].split(',')
+
+ if m['headers'] is not None:
+ m['headers'] = collections.OrderedDict(re.findall('^(?P<key>.+): (?P<value>.+)$\n?', m['headers'], flags=re.MULTILINE))
+
+ if m['body'] is not None:
+ try:
+ m['body'] = bytearray(base64.b64decode(m['body'].encode()))
+
+ except (binascii.Error, TypeError) as ex:
+ six.raise_from(PGPError, ex)
+
+ if m['crc'] is not None:
+ m['crc'] = Header.bytes_to_int(base64.b64decode(m['crc'].encode()))
+ if Armorable.crc24(m['body']) != m['crc']:
+ warnings.warn('Incorrect crc24', stacklevel=3)
+
+ return m
+
+ @staticmethod
+ def crc24(data):
+ # CRC24 computation, as described in the RFC 4880 section on Radix-64 Conversions
+ #
+ # The checksum is a 24-bit Cyclic Redundancy Check (CRC) converted to
+ # four characters of radix-64 encoding by the same MIME base64
+ # transformation, preceded by an equal sign (=). The CRC is computed
+ # by using the generator 0x864CFB and an initialization of 0xB704CE.
+ # The accumulation is done on the data before it is converted to
+ # radix-64, rather than on the converted data.
+ crc = Armorable.__crc24_init__
+
+ if not isinstance(data, bytearray):
+ data = six.iterbytes(data)
+
+ for b in data:
+ crc ^= b << 16
+
+ for i in range(8):
+ crc <<= 1
+ if crc & 0x1000000:
+ crc ^= Armorable.__crc24_poly__
+
+ return crc & 0xFFFFFF
+
+ @abc.abstractproperty
+ def magic(self):
+ """The magic string identifier for the current PGP type"""
+
+ @classmethod
+ def from_file(cls, filename):
+ with open(filename, 'rb') as file:
+ obj = cls()
+ data = bytearray(os.path.getsize(filename))
+ file.readinto(data)
+
+ po = obj.parse(data)
+
+ if po is not None:
+ return (obj, po)
+
+ return obj # pragma: no cover
+
+ @classmethod
+ def from_blob(cls, blob):
+ obj = cls()
+ if (not isinstance(blob, six.binary_type)) and (not isinstance(blob, bytearray)):
+ po = obj.parse(bytearray(blob, 'latin-1'))
+
+ else:
+ po = obj.parse(bytearray(blob))
+
+ if po is not None:
+ return (obj, po)
+
+ return obj # pragma: no cover
+
+ def __init__(self):
+ super(Armorable, self).__init__()
+ self.ascii_headers = collections.OrderedDict()
+ self.ascii_headers['Version'] = 'PGPy v' + __version__ # Default value
+
+ def __str__(self):
+ payload = base64.b64encode(self.__bytes__()).decode('latin-1')
+ payload = '\n'.join(payload[i:(i + 64)] for i in range(0, len(payload), 64))
+
+ return self.__armor_fmt__.format(
+ block_type=self.magic,
+ headers=''.join('{key}: {val}\n'.format(key=key, val=val) for key, val in self.ascii_headers.items()),
+ packet=payload,
+ crc=base64.b64encode(PGPObject.int_to_bytes(self.crc24(self.__bytes__()), 3)).decode('latin-1')
+ )
+
+ def __copy__(self):
+ obj = self.__class__()
+ obj.ascii_headers = self.ascii_headers.copy()
+
+ return obj
+
+
+class ParentRef(object):
+ # mixin class to handle weak-referencing a parent object
+ @property
+ def _parent(self):
+ if isinstance(self.__parent, weakref.ref):
+ return self.__parent()
+ return self.__parent
+
+ @_parent.setter
+ def _parent(self, parent):
+ try:
+ self.__parent = weakref.ref(parent)
+
+ except TypeError:
+ self.__parent = parent
+
+ @property
+ def parent(self):
+ return self._parent
+
+ def __init__(self):
+ super(ParentRef, self).__init__()
+ self._parent = None
+
+
+class PGPObject(six.with_metaclass(abc.ABCMeta, object)):
+ __metaclass__ = abc.ABCMeta
+
+ @staticmethod
+ def int_byte_len(i):
+ return (i.bit_length() + 7) // 8
+
+ @staticmethod
+ def bytes_to_int(b, order='big'): # pragma: no cover
+ """convert bytes to integer"""
+ if six.PY2:
+ # save the original type of b without having to copy any data
+ _b = b.__class__()
+ if order != 'little':
+ b = reversed(b)
+
+ if not isinstance(_b, bytearray):
+ b = six.iterbytes(b)
+
+ return sum(c << (i * 8) for i, c in enumerate(b))
+
+ return int.from_bytes(b, order)
+
+ @staticmethod
+ def int_to_bytes(i, minlen=1, order='big'): # pragma: no cover
+ """convert integer to bytes"""
+ blen = max(minlen, PGPObject.int_byte_len(i), 1)
+
+ if six.PY2:
+ r = iter(_ * 8 for _ in (range(blen) if order == 'little' else range(blen - 1, -1, -1)))
+ return bytes(bytearray((i >> c) & 0xff for c in r))
+
+ return i.to_bytes(blen, order)
+
+ @staticmethod
+ def text_to_bytes(text):
+ if text is None:
+ return text
+
+ # if we got bytes, just return it
+ if isinstance(text, (bytearray, six.binary_type)):
+ return text
+
+ # if we were given a unicode string, or if we translated the string into utf-8,
+ # we know that Python already has it in utf-8 encoding, so we can now just encode it to bytes
+ return text.encode('utf-8')
+
+ @staticmethod
+ def bytes_to_text(text):
+ if text is None or isinstance(text, six.text_type):
+ return text
+
+ return text.decode('utf-8')
+
+ @abc.abstractmethod
+ def parse(self, packet):
+ """this method is too abstract to understand"""
+
+ @abc.abstractmethod
+ def __bytearray__(self):
+ """
+ Returns the contents of concrete subclasses in a binary format that can be understood by other OpenPGP
+ implementations
+ """
+
+ def __bytes__(self):
+ """
+ Return the contents of concrete subclasses in a binary format that can be understood by other OpenPGP
+ implementations
+ """
+ # this is what all subclasses will do anyway, so doing this here we can reduce code duplication significantly
+ return bytes(self.__bytearray__())
+
+
+class Field(PGPObject):
+ @abc.abstractmethod
+ def __len__(self):
+ """Return the length of the output of __bytes__"""
+
+
+class Header(Field):
+ @staticmethod
+ def encode_length(l, nhf=True, llen=1):
+ def _new_length(l):
+ if 192 > l:
+ return Header.int_to_bytes(l)
+
+ elif 8384 > l:
+ elen = ((l & 0xFF00) + (192 << 8)) + ((l & 0xFF) - 192)
+ return Header.int_to_bytes(elen, 2)
+
+ return b'\xFF' + Header.int_to_bytes(l, 4)
+
+ def _old_length(l, llen):
+ return Header.int_to_bytes(l, llen) if llen > 0 else b''
+
+ return _new_length(l) if nhf else _old_length(l, llen)
+
+ @sdproperty
+ def length(self):
+ return self._len
+
+ @length.register(int)
+ def length_int(self, val):
+ self._len = val
+
+ @length.register(six.binary_type)
+ @length.register(bytearray)
+ def length_bin(self, val):
+ def _new_len(b):
+ fo = b[0]
+
+ if 192 > fo:
+ self._len = self.bytes_to_int(b[:1])
+ del b[:1]
+
+ elif 224 > fo: # >= 192 is implied
+ dlen = self.bytes_to_int(b[:2])
+ self._len = ((dlen - (192 << 8)) & 0xFF00) + ((dlen & 0xFF) + 192)
+ del b[:2]
+
+ elif 255 > fo: # pragma: no cover
+ # not testable until partial body lengths actually work
+ # >= 224 is implied
+ # this is a partial-length header
+ self._partial = True
+ self._len = 1 << (fo & 0x1f)
+
+ elif 255 == fo:
+ self._len = self.bytes_to_int(b[1:5])
+ del b[:5]
+
+ else: # pragma: no cover
+ raise ValueError("Malformed length: 0x{:02x}".format(fo))
+
+ def _old_len(b):
+ if self.llen > 0:
+ self._len = self.bytes_to_int(b[:self.llen])
+ del b[:self.llen]
+
+ else: # pragma: no cover
+ self._len = 0
+
+ _new_len(val) if self._lenfmt == 1 else _old_len(val)
+
+ @sdproperty
+ def llen(self):
+ l = self.length
+ lf = self._lenfmt
+
+ if lf == 1:
+ # new-format length
+ if 192 > l:
+ return 1
+
+ elif 8384 > self.length: # >= 192 is implied
+ return 2
+
+ else:
+ return 5
+
+ else:
+ # old-format length
+ ##TODO: what if _llen needs to be (re)computed?
+ return self._llen
+
+ @llen.register(int)
+ def llen_int(self, val):
+ if self._lenfmt == 0:
+ self._llen = {0: 1, 1: 2, 2: 4, 3: 0}[val]
+
+ def __init__(self):
+ super(Header, self).__init__()
+ self._len = 1
+ self._llen = 1
+ self._lenfmt = 1
+ self._partial = False
+
+
+class MetaDispatchable(abc.ABCMeta):
+ """
+ MetaDispatchable is a metaclass for objects that subclass Dispatchable
+ """
+
+ _roots = set()
+ """
+ _roots is a set of all currently registered RootClass class objects
+
+ A RootClass is successfully registered if the following things are true:
+ - it inherits (directly or indirectly) from Dispatchable
+ - __typeid__ == -1
+ """
+ _registry = {}
+ """
+ _registry is the Dispatchable class registry. It uses the following format:
+
+ { (RootClass, None): OpaqueClass }:
+ denotes the default ("opaque") for a given RootClass.
+
+ An OpaqueClass is successfully registered as such provided the following conditions are met:
+ - it inherits directly from a RootClass
+ - __typeid__ is None
+
+ { (RootClass, TypeID): SubClass }:
+ denotes the class that handles the type given in TypeID
+
+ a SubClass is successfully registered as such provided the following conditions are met:
+ - it inherits (directly or indirectly) from a RootClass
+ - __typeid__ is a positive int
+ - the given typeid is not already registered
+
+ { (RootClass, TypeID): VerSubClass }:
+ denotes that a given TypeID has multiple versions, and that this is class' subclasses handle those.
+ A VerSubClass is registered identically to a normal SubClass.
+
+ { (RootClass, TypeID, Ver): VerSubClass }:
+ denotes the class that handles the type given in TypeID and the version of that type given in Ver
+
+ a Versioned SubClass is successfully registered as such provided the following conditions are met:
+ - it inherits from a VerSubClass
+ - __ver__ > 0
+ - the given typeid/ver combination is not already registered
+ """
+
+ def __new__(mcs, name, bases, attrs): # NOQA
+ ncls = super(MetaDispatchable, mcs).__new__(mcs, name, bases, attrs)
+
+ if not hasattr(ncls.__typeid__, '__isabstractmethod__'):
+ if ncls.__typeid__ == -1 and not issubclass(ncls, tuple(MetaDispatchable._roots)):
+ # this is a root class
+ MetaDispatchable._roots.add(ncls)
+
+ elif issubclass(ncls, tuple(MetaDispatchable._roots)) and ncls.__typeid__ != -1:
+ for rcls in [ root for root in MetaDispatchable._roots if issubclass(ncls, root) ]:
+ if (rcls, ncls.__typeid__) not in MetaDispatchable._registry:
+ MetaDispatchable._registry[(rcls, ncls.__typeid__)] = ncls
+
+ if (ncls.__ver__ is not None and ncls.__ver__ > 0 and
+ (rcls, ncls.__typeid__, ncls.__ver__) not in MetaDispatchable._registry):
+ MetaDispatchable._registry[(rcls, ncls.__typeid__, ncls.__ver__)] = ncls
+
+ # finally, return the new class object
+ return ncls
+
+ def __call__(cls, packet=None): # NOQA
+ def _makeobj(cls):
+ obj = object.__new__(cls)
+ obj.__init__()
+ return obj
+
+ if packet is not None:
+ if cls in MetaDispatchable._roots:
+ rcls = cls
+
+ elif issubclass(cls, tuple(MetaDispatchable._roots)): # pragma: no cover
+ rcls = next(root for root in MetaDispatchable._roots if issubclass(cls, root))
+
+ ##TODO: else raise an exception of some kind, but this should never happen
+
+ header = rcls.__headercls__()
+ header.parse(packet)
+
+ ncls = None
+ if (rcls, header.typeid) in MetaDispatchable._registry:
+ ncls = MetaDispatchable._registry[(rcls, header.typeid)]
+
+ if ncls.__ver__ == 0:
+ if header.__class__ != ncls.__headercls__:
+ nh = ncls.__headercls__()
+ nh.__dict__.update(header.__dict__)
+ try:
+ nh.parse(packet)
+
+ except Exception as ex:
+ six.raise_from(PGPError, ex)
+
+ header = nh
+
+ if (rcls, header.typeid, header.version) in MetaDispatchable._registry:
+ ncls = MetaDispatchable._registry[(rcls, header.typeid, header.version)]
+
+ else: # pragma: no cover
+ ncls = None
+
+ if ncls is None:
+ ncls = MetaDispatchable._registry[(rcls, None)]
+
+ obj = _makeobj(ncls)
+ obj.header = header
+
+ try:
+ obj.parse(packet)
+
+ except Exception as ex:
+ six.raise_from(PGPError, ex)
+
+ else:
+ obj = _makeobj(cls)
+
+ return obj
+
+
+class Dispatchable(six.with_metaclass(MetaDispatchable, PGPObject)):
+ __metaclass__ = MetaDispatchable
+
+ @abc.abstractproperty
+ def __headercls__(self): # pragma: no cover
+ return False
+
+ @abc.abstractproperty
+ def __typeid__(self): # pragma: no cover
+ return False
+
+ __ver__ = None
+
+
+class SignatureVerification(object):
+ _sigsubj = collections.namedtuple('sigsubj', ['verified', 'by', 'signature', 'subject'])
+
+ @property
+ def good_signatures(self):
+ """
+ A generator yielding namedtuples of all signatures that were successfully verified
+ in the operation that returned this instance. The namedtuple has the following attributes:
+
+ ``sigsubj.verified`` - ``bool`` of whether the signature verified successfully or not.
+
+ ``sigsubj.by`` - the :py:obj:`~pgpy.PGPKey` that was used in this verify operation.
+
+ ``sigsubj.signature`` - the :py:obj:`~pgpy.PGPSignature` that was verified.
+
+ ``sigsubj.subject`` - the subject that was verified using the signature.
+ """
+ for s in [ i for i in self._subjects if i.verified ]:
+ yield s
+
+ @property
+ def bad_signatures(self): # pragma: no cover
+ """
+ A generator yielding namedtuples of all signatures that were not verified
+ in the operation that returned this instance. The namedtuple has the following attributes:
+
+ ``sigsubj.verified`` - ``bool`` of whether the signature verified successfully or not.
+
+ ``sigsubj.by`` - the :py:obj:`~pgpy.PGPKey` that was used in this verify operation.
+
+ ``sigsubj.signature`` - the :py:obj:`~pgpy.PGPSignature` that was verified.
+
+ ``sigsubj.subject`` - the subject that was verified using the signature.
+ """
+ for s in [ i for i in self._subjects if not i.verified ]:
+ yield s
+
+ def __init__(self):
+ """
+ Returned by :py:meth:`PGPKey.verify`
+
+ Can be compared directly as a boolean to determine whether or not the specified signature verified.
+ """
+ super(SignatureVerification, self).__init__()
+ self._subjects = []
+
+ def __contains__(self, item):
+ return item in {ii for i in self._subjects for ii in [i.signature, i.subject]}
+
+ def __len__(self):
+ return len(self._subjects)
+
+ def __bool__(self):
+ return all(s.verified for s in self._subjects)
+
+ def __nonzero__(self):
+ return self.__bool__()
+
+ def __and__(self, other):
+ if not isinstance(other, SignatureVerification):
+ raise TypeError(type(other))
+
+ self._subjects += other._subjects
+ return self
+
+ def __repr__(self):
+ return "<SignatureVerification({verified})>".format(verified=str(bool(self)))
+
+ def add_sigsubj(self, signature, by, subject=None, verified=False):
+ self._subjects.append(self._sigsubj(verified, by, signature, subject))
+
+
+class FlagEnumMeta(EnumMeta):
+ def __and__(self, other):
+ return { f for f in iter(self) if f.value & other }
+
+ def __rand__(self, other): # pragma: no cover
+ return self & other
+
+
+if six.PY2:
+ class FlagEnum(IntEnum):
+ __metaclass__ = FlagEnumMeta
+
+else:
+ namespace = FlagEnumMeta.__prepare__('FlagEnum', (IntEnum,))
+ FlagEnum = FlagEnumMeta('FlagEnum', (IntEnum,), namespace)
+
+
+class Fingerprint(str):
+ """
+ A subclass of ``str``. Can be compared using == and != to ``str``, ``unicode``, and other :py:obj:`Fingerprint` instances.
+
+ Primarily used as a key for internal dictionaries, so it ignores spaces when comparing and hashing
+ """
+ @property
+ def keyid(self):
+ return str(self).replace(' ', '')[-16:]
+
+ @property
+ def shortid(self):
+ return str(self).replace(' ', '')[-8:]
+
+ def __new__(cls, content):
+ if isinstance(content, Fingerprint):
+ return content
+
+ # validate input before continuing: this should be a string of 40 hex digits
+ content = content.upper().replace(' ', '')
+ if not bool(re.match(r'^[A-F0-9]{40}$', content)):
+ raise ValueError("Expected: String of 40 hex digits")
+
+ # store in the format: "AAAA BBBB CCCC DDDD EEEE FFFF 0000 1111 2222 3333"
+ # ^^ note 2 spaces here
+ spaces = [ ' ' if i != 4 else ' ' for i in range(10) ]
+ chunks = [ ''.join(g) for g in six.moves.zip_longest(*[iter(content)] * 4) ]
+ content = ''.join(j for i in six.moves.zip_longest(chunks, spaces, fillvalue='') for j in i).strip()
+
+ return str.__new__(cls, content)
+
+ def __eq__(self, other):
+ if isinstance(other, Fingerprint):
+ return str(self) == str(other)
+
+ if isinstance(other, (six.text_type, bytes, bytearray)):
+ if isinstance(other, (bytes, bytearray)): # pragma: no cover
+ other = other.decode('latin-1')
+
+ other = str(other).replace(' ', '')
+ return any([self.replace(' ', '') == other,
+ self.keyid == other,
+ self.shortid == other])
+
+ return False # pragma: no cover
+
+ def __ne__(self, other):
+ return not (self == other)
+
+ def __hash__(self):
+ return hash(str(self.replace(' ', '')))
+
+ def __bytes__(self):
+ return binascii.unhexlify(six.b(self.replace(' ', '')))
+
+
+class SorteDeque(collections.deque):
+ """A deque subclass that tries to maintain sorted ordering using bisect"""
+ def insort(self, item):
+ i = bisect.bisect_left(self, item)
+ self.rotate(- i)
+ self.appendleft(item)
+ self.rotate(i)
+
+ def resort(self, item): # pragma: no cover
+ if item in self:
+ # if item is already in self, see if it is still in sorted order.
+ # if not, re-sort it by removing it and then inserting it into its sorted order
+ i = bisect.bisect_left(self, item)
+ if i == len(self) or self[i] is not item:
+ self.remove(item)
+ self.insort(item)
+
+ else:
+ # if item is not in self, just insert it in sorted order
+ self.insort(item)
+
+ def check(self): # pragma: no cover
+ """re-sort any items in self that are not sorted"""
+ for unsorted in iter(self[i] for i in range(len(self) - 2) if not operator.le(self[i], self[i + 1])):
+ self.resort(unsorted)