diff options
Diffstat (limited to 'src/leap/mx/vendor/pgpy/types.py')
-rw-r--r-- | src/leap/mx/vendor/pgpy/types.py | 729 |
1 files changed, 729 insertions, 0 deletions
diff --git a/src/leap/mx/vendor/pgpy/types.py b/src/leap/mx/vendor/pgpy/types.py new file mode 100644 index 0000000..b549434 --- /dev/null +++ b/src/leap/mx/vendor/pgpy/types.py @@ -0,0 +1,729 @@ +""" types.py +""" +from __future__ import division + +import abc +import base64 +import binascii +import bisect +import codecs +import collections +import operator +import os +import re +import warnings +import weakref + +from enum import EnumMeta +from enum import IntEnum + +import six + +from ._author import __version__ + +from .decorators import sdproperty + +from .errors import PGPError + +__all__ = ['Armorable', + 'ParentRef', + 'PGPObject', + 'Field', + 'Header', + 'MetaDispatchable', + 'Dispatchable', + 'SignatureVerification', + 'FlagEnumMeta', + 'FlagEnum', + 'Fingerprint', + 'SorteDeque'] + +if six.PY2: + FileNotFoundError = IOError + re.ASCII = 0 + + +class Armorable(six.with_metaclass(abc.ABCMeta)): + __crc24_init__ = 0x0B704CE + __crc24_poly__ = 0x1864CFB + + __armor_fmt__ = '-----BEGIN PGP {block_type}-----\n' \ + '{headers}\n' \ + '{packet}\n' \ + '={crc}\n' \ + '-----END PGP {block_type}-----\n' + + @property + def charset(self): + return self.ascii_headers.get('Charset', 'utf-8') + + @charset.setter + def charset(self, encoding): + self.ascii_headers['Charset'] = codecs.lookup(encoding).name + + @staticmethod + def is_ascii(text): + if isinstance(text, six.string_types): + return bool(re.match(r'^[ -~\r\n]+$', text, flags=re.ASCII)) + + if isinstance(text, (bytes, bytearray)): + return bool(re.match(br'^[ -~\r\n]+$', text, flags=re.ASCII)) + + raise TypeError("Expected: ASCII input of type str, bytes, or bytearray") # pragma: no cover + + @staticmethod + def ascii_unarmor(text): + """ + Takes an ASCII-armored PGP block and returns the decoded byte value. + + :param text: An ASCII-armored PGP block, to un-armor. + :raises: :py:exc:`ValueError` if ``text`` did not contain an ASCII-armored PGP block. + :raises: :py:exc:`TypeError` if ``text`` is not a ``str``, ``bytes``, or ``bytearray`` + :returns: A ``dict`` containing information from ``text``, including the de-armored data. + """ + m = {'magic': None, 'headers': None, 'body': bytearray(), 'crc': None} + if not Armorable.is_ascii(text): + m['body'] = bytearray(text) + return m + + if isinstance(text, (bytes, bytearray)): # pragma: no cover + text = text.decode('latin-1') + + # the re.VERBOSE flag allows for: + # - whitespace is ignored except when in a character class or escaped + # - anything after a '#' that is not escaped or in a character class is ignored, allowing for comments + m = re.search(r"""# This capture group is optional because it will only be present in signed cleartext messages + (^-{5}BEGIN\ PGP\ SIGNED\ MESSAGE-{5}(?:\r?\n) + (Hash:\ (?P<hashes>[A-Za-z0-9\-,]+)(?:\r?\n){2})? + (?P<cleartext>(.*\n)+)(?:\r?\n) + )? + # armor header line; capture the variable part of the magic text + ^-{5}BEGIN\ PGP\ (?P<magic>[A-Z0-9 ,]+)-{5}(?:\r?\n) + # try to capture all the headers into one capture group + # if this doesn't match, m['headers'] will be None + (?P<headers>(^.+:\ .+(?:\r?\n))+)?(?:\r?\n)? + # capture all lines of the body, up to 76 characters long, + # including the newline, and the pad character(s) + (?P<body>([A-Za-z0-9+/]{1,75}={,2}(?:\r?\n))+) + # capture the armored CRC24 value + ^=(?P<crc>[A-Za-z0-9+/]{4})(?:\r?\n) + # finally, capture the armor tail line, which must match the armor header line + ^-{5}END\ PGP\ (?P=magic)-{5}(?:\r?\n)? + """, text, flags=re.MULTILINE | re.VERBOSE) + + if m is None: # pragma: no cover + raise ValueError("Expected: ASCII-armored PGP data") + + m = m.groupdict() + + if m['hashes'] is not None: + m['hashes'] = m['hashes'].split(',') + + if m['headers'] is not None: + m['headers'] = collections.OrderedDict(re.findall('^(?P<key>.+): (?P<value>.+)$\n?', m['headers'], flags=re.MULTILINE)) + + if m['body'] is not None: + try: + m['body'] = bytearray(base64.b64decode(m['body'].encode())) + + except (binascii.Error, TypeError) as ex: + six.raise_from(PGPError, ex) + + if m['crc'] is not None: + m['crc'] = Header.bytes_to_int(base64.b64decode(m['crc'].encode())) + if Armorable.crc24(m['body']) != m['crc']: + warnings.warn('Incorrect crc24', stacklevel=3) + + return m + + @staticmethod + def crc24(data): + # CRC24 computation, as described in the RFC 4880 section on Radix-64 Conversions + # + # The checksum is a 24-bit Cyclic Redundancy Check (CRC) converted to + # four characters of radix-64 encoding by the same MIME base64 + # transformation, preceded by an equal sign (=). The CRC is computed + # by using the generator 0x864CFB and an initialization of 0xB704CE. + # The accumulation is done on the data before it is converted to + # radix-64, rather than on the converted data. + crc = Armorable.__crc24_init__ + + if not isinstance(data, bytearray): + data = six.iterbytes(data) + + for b in data: + crc ^= b << 16 + + for i in range(8): + crc <<= 1 + if crc & 0x1000000: + crc ^= Armorable.__crc24_poly__ + + return crc & 0xFFFFFF + + @abc.abstractproperty + def magic(self): + """The magic string identifier for the current PGP type""" + + @classmethod + def from_file(cls, filename): + with open(filename, 'rb') as file: + obj = cls() + data = bytearray(os.path.getsize(filename)) + file.readinto(data) + + po = obj.parse(data) + + if po is not None: + return (obj, po) + + return obj # pragma: no cover + + @classmethod + def from_blob(cls, blob): + obj = cls() + if (not isinstance(blob, six.binary_type)) and (not isinstance(blob, bytearray)): + po = obj.parse(bytearray(blob, 'latin-1')) + + else: + po = obj.parse(bytearray(blob)) + + if po is not None: + return (obj, po) + + return obj # pragma: no cover + + def __init__(self): + super(Armorable, self).__init__() + self.ascii_headers = collections.OrderedDict() + self.ascii_headers['Version'] = 'PGPy v' + __version__ # Default value + + def __str__(self): + payload = base64.b64encode(self.__bytes__()).decode('latin-1') + payload = '\n'.join(payload[i:(i + 64)] for i in range(0, len(payload), 64)) + + return self.__armor_fmt__.format( + block_type=self.magic, + headers=''.join('{key}: {val}\n'.format(key=key, val=val) for key, val in self.ascii_headers.items()), + packet=payload, + crc=base64.b64encode(PGPObject.int_to_bytes(self.crc24(self.__bytes__()), 3)).decode('latin-1') + ) + + def __copy__(self): + obj = self.__class__() + obj.ascii_headers = self.ascii_headers.copy() + + return obj + + +class ParentRef(object): + # mixin class to handle weak-referencing a parent object + @property + def _parent(self): + if isinstance(self.__parent, weakref.ref): + return self.__parent() + return self.__parent + + @_parent.setter + def _parent(self, parent): + try: + self.__parent = weakref.ref(parent) + + except TypeError: + self.__parent = parent + + @property + def parent(self): + return self._parent + + def __init__(self): + super(ParentRef, self).__init__() + self._parent = None + + +class PGPObject(six.with_metaclass(abc.ABCMeta, object)): + __metaclass__ = abc.ABCMeta + + @staticmethod + def int_byte_len(i): + return (i.bit_length() + 7) // 8 + + @staticmethod + def bytes_to_int(b, order='big'): # pragma: no cover + """convert bytes to integer""" + if six.PY2: + # save the original type of b without having to copy any data + _b = b.__class__() + if order != 'little': + b = reversed(b) + + if not isinstance(_b, bytearray): + b = six.iterbytes(b) + + return sum(c << (i * 8) for i, c in enumerate(b)) + + return int.from_bytes(b, order) + + @staticmethod + def int_to_bytes(i, minlen=1, order='big'): # pragma: no cover + """convert integer to bytes""" + blen = max(minlen, PGPObject.int_byte_len(i), 1) + + if six.PY2: + r = iter(_ * 8 for _ in (range(blen) if order == 'little' else range(blen - 1, -1, -1))) + return bytes(bytearray((i >> c) & 0xff for c in r)) + + return i.to_bytes(blen, order) + + @staticmethod + def text_to_bytes(text): + if text is None: + return text + + # if we got bytes, just return it + if isinstance(text, (bytearray, six.binary_type)): + return text + + # if we were given a unicode string, or if we translated the string into utf-8, + # we know that Python already has it in utf-8 encoding, so we can now just encode it to bytes + return text.encode('utf-8') + + @staticmethod + def bytes_to_text(text): + if text is None or isinstance(text, six.text_type): + return text + + return text.decode('utf-8') + + @abc.abstractmethod + def parse(self, packet): + """this method is too abstract to understand""" + + @abc.abstractmethod + def __bytearray__(self): + """ + Returns the contents of concrete subclasses in a binary format that can be understood by other OpenPGP + implementations + """ + + def __bytes__(self): + """ + Return the contents of concrete subclasses in a binary format that can be understood by other OpenPGP + implementations + """ + # this is what all subclasses will do anyway, so doing this here we can reduce code duplication significantly + return bytes(self.__bytearray__()) + + +class Field(PGPObject): + @abc.abstractmethod + def __len__(self): + """Return the length of the output of __bytes__""" + + +class Header(Field): + @staticmethod + def encode_length(l, nhf=True, llen=1): + def _new_length(l): + if 192 > l: + return Header.int_to_bytes(l) + + elif 8384 > l: + elen = ((l & 0xFF00) + (192 << 8)) + ((l & 0xFF) - 192) + return Header.int_to_bytes(elen, 2) + + return b'\xFF' + Header.int_to_bytes(l, 4) + + def _old_length(l, llen): + return Header.int_to_bytes(l, llen) if llen > 0 else b'' + + return _new_length(l) if nhf else _old_length(l, llen) + + @sdproperty + def length(self): + return self._len + + @length.register(int) + def length_int(self, val): + self._len = val + + @length.register(six.binary_type) + @length.register(bytearray) + def length_bin(self, val): + def _new_len(b): + fo = b[0] + + if 192 > fo: + self._len = self.bytes_to_int(b[:1]) + del b[:1] + + elif 224 > fo: # >= 192 is implied + dlen = self.bytes_to_int(b[:2]) + self._len = ((dlen - (192 << 8)) & 0xFF00) + ((dlen & 0xFF) + 192) + del b[:2] + + elif 255 > fo: # pragma: no cover + # not testable until partial body lengths actually work + # >= 224 is implied + # this is a partial-length header + self._partial = True + self._len = 1 << (fo & 0x1f) + + elif 255 == fo: + self._len = self.bytes_to_int(b[1:5]) + del b[:5] + + else: # pragma: no cover + raise ValueError("Malformed length: 0x{:02x}".format(fo)) + + def _old_len(b): + if self.llen > 0: + self._len = self.bytes_to_int(b[:self.llen]) + del b[:self.llen] + + else: # pragma: no cover + self._len = 0 + + _new_len(val) if self._lenfmt == 1 else _old_len(val) + + @sdproperty + def llen(self): + l = self.length + lf = self._lenfmt + + if lf == 1: + # new-format length + if 192 > l: + return 1 + + elif 8384 > self.length: # >= 192 is implied + return 2 + + else: + return 5 + + else: + # old-format length + ##TODO: what if _llen needs to be (re)computed? + return self._llen + + @llen.register(int) + def llen_int(self, val): + if self._lenfmt == 0: + self._llen = {0: 1, 1: 2, 2: 4, 3: 0}[val] + + def __init__(self): + super(Header, self).__init__() + self._len = 1 + self._llen = 1 + self._lenfmt = 1 + self._partial = False + + +class MetaDispatchable(abc.ABCMeta): + """ + MetaDispatchable is a metaclass for objects that subclass Dispatchable + """ + + _roots = set() + """ + _roots is a set of all currently registered RootClass class objects + + A RootClass is successfully registered if the following things are true: + - it inherits (directly or indirectly) from Dispatchable + - __typeid__ == -1 + """ + _registry = {} + """ + _registry is the Dispatchable class registry. It uses the following format: + + { (RootClass, None): OpaqueClass }: + denotes the default ("opaque") for a given RootClass. + + An OpaqueClass is successfully registered as such provided the following conditions are met: + - it inherits directly from a RootClass + - __typeid__ is None + + { (RootClass, TypeID): SubClass }: + denotes the class that handles the type given in TypeID + + a SubClass is successfully registered as such provided the following conditions are met: + - it inherits (directly or indirectly) from a RootClass + - __typeid__ is a positive int + - the given typeid is not already registered + + { (RootClass, TypeID): VerSubClass }: + denotes that a given TypeID has multiple versions, and that this is class' subclasses handle those. + A VerSubClass is registered identically to a normal SubClass. + + { (RootClass, TypeID, Ver): VerSubClass }: + denotes the class that handles the type given in TypeID and the version of that type given in Ver + + a Versioned SubClass is successfully registered as such provided the following conditions are met: + - it inherits from a VerSubClass + - __ver__ > 0 + - the given typeid/ver combination is not already registered + """ + + def __new__(mcs, name, bases, attrs): # NOQA + ncls = super(MetaDispatchable, mcs).__new__(mcs, name, bases, attrs) + + if not hasattr(ncls.__typeid__, '__isabstractmethod__'): + if ncls.__typeid__ == -1 and not issubclass(ncls, tuple(MetaDispatchable._roots)): + # this is a root class + MetaDispatchable._roots.add(ncls) + + elif issubclass(ncls, tuple(MetaDispatchable._roots)) and ncls.__typeid__ != -1: + for rcls in [ root for root in MetaDispatchable._roots if issubclass(ncls, root) ]: + if (rcls, ncls.__typeid__) not in MetaDispatchable._registry: + MetaDispatchable._registry[(rcls, ncls.__typeid__)] = ncls + + if (ncls.__ver__ is not None and ncls.__ver__ > 0 and + (rcls, ncls.__typeid__, ncls.__ver__) not in MetaDispatchable._registry): + MetaDispatchable._registry[(rcls, ncls.__typeid__, ncls.__ver__)] = ncls + + # finally, return the new class object + return ncls + + def __call__(cls, packet=None): # NOQA + def _makeobj(cls): + obj = object.__new__(cls) + obj.__init__() + return obj + + if packet is not None: + if cls in MetaDispatchable._roots: + rcls = cls + + elif issubclass(cls, tuple(MetaDispatchable._roots)): # pragma: no cover + rcls = next(root for root in MetaDispatchable._roots if issubclass(cls, root)) + + ##TODO: else raise an exception of some kind, but this should never happen + + header = rcls.__headercls__() + header.parse(packet) + + ncls = None + if (rcls, header.typeid) in MetaDispatchable._registry: + ncls = MetaDispatchable._registry[(rcls, header.typeid)] + + if ncls.__ver__ == 0: + if header.__class__ != ncls.__headercls__: + nh = ncls.__headercls__() + nh.__dict__.update(header.__dict__) + try: + nh.parse(packet) + + except Exception as ex: + six.raise_from(PGPError, ex) + + header = nh + + if (rcls, header.typeid, header.version) in MetaDispatchable._registry: + ncls = MetaDispatchable._registry[(rcls, header.typeid, header.version)] + + else: # pragma: no cover + ncls = None + + if ncls is None: + ncls = MetaDispatchable._registry[(rcls, None)] + + obj = _makeobj(ncls) + obj.header = header + + try: + obj.parse(packet) + + except Exception as ex: + six.raise_from(PGPError, ex) + + else: + obj = _makeobj(cls) + + return obj + + +class Dispatchable(six.with_metaclass(MetaDispatchable, PGPObject)): + __metaclass__ = MetaDispatchable + + @abc.abstractproperty + def __headercls__(self): # pragma: no cover + return False + + @abc.abstractproperty + def __typeid__(self): # pragma: no cover + return False + + __ver__ = None + + +class SignatureVerification(object): + _sigsubj = collections.namedtuple('sigsubj', ['verified', 'by', 'signature', 'subject']) + + @property + def good_signatures(self): + """ + A generator yielding namedtuples of all signatures that were successfully verified + in the operation that returned this instance. The namedtuple has the following attributes: + + ``sigsubj.verified`` - ``bool`` of whether the signature verified successfully or not. + + ``sigsubj.by`` - the :py:obj:`~pgpy.PGPKey` that was used in this verify operation. + + ``sigsubj.signature`` - the :py:obj:`~pgpy.PGPSignature` that was verified. + + ``sigsubj.subject`` - the subject that was verified using the signature. + """ + for s in [ i for i in self._subjects if i.verified ]: + yield s + + @property + def bad_signatures(self): # pragma: no cover + """ + A generator yielding namedtuples of all signatures that were not verified + in the operation that returned this instance. The namedtuple has the following attributes: + + ``sigsubj.verified`` - ``bool`` of whether the signature verified successfully or not. + + ``sigsubj.by`` - the :py:obj:`~pgpy.PGPKey` that was used in this verify operation. + + ``sigsubj.signature`` - the :py:obj:`~pgpy.PGPSignature` that was verified. + + ``sigsubj.subject`` - the subject that was verified using the signature. + """ + for s in [ i for i in self._subjects if not i.verified ]: + yield s + + def __init__(self): + """ + Returned by :py:meth:`PGPKey.verify` + + Can be compared directly as a boolean to determine whether or not the specified signature verified. + """ + super(SignatureVerification, self).__init__() + self._subjects = [] + + def __contains__(self, item): + return item in {ii for i in self._subjects for ii in [i.signature, i.subject]} + + def __len__(self): + return len(self._subjects) + + def __bool__(self): + return all(s.verified for s in self._subjects) + + def __nonzero__(self): + return self.__bool__() + + def __and__(self, other): + if not isinstance(other, SignatureVerification): + raise TypeError(type(other)) + + self._subjects += other._subjects + return self + + def __repr__(self): + return "<SignatureVerification({verified})>".format(verified=str(bool(self))) + + def add_sigsubj(self, signature, by, subject=None, verified=False): + self._subjects.append(self._sigsubj(verified, by, signature, subject)) + + +class FlagEnumMeta(EnumMeta): + def __and__(self, other): + return { f for f in iter(self) if f.value & other } + + def __rand__(self, other): # pragma: no cover + return self & other + + +if six.PY2: + class FlagEnum(IntEnum): + __metaclass__ = FlagEnumMeta + +else: + namespace = FlagEnumMeta.__prepare__('FlagEnum', (IntEnum,)) + FlagEnum = FlagEnumMeta('FlagEnum', (IntEnum,), namespace) + + +class Fingerprint(str): + """ + A subclass of ``str``. Can be compared using == and != to ``str``, ``unicode``, and other :py:obj:`Fingerprint` instances. + + Primarily used as a key for internal dictionaries, so it ignores spaces when comparing and hashing + """ + @property + def keyid(self): + return str(self).replace(' ', '')[-16:] + + @property + def shortid(self): + return str(self).replace(' ', '')[-8:] + + def __new__(cls, content): + if isinstance(content, Fingerprint): + return content + + # validate input before continuing: this should be a string of 40 hex digits + content = content.upper().replace(' ', '') + if not bool(re.match(r'^[A-F0-9]{40}$', content)): + raise ValueError("Expected: String of 40 hex digits") + + # store in the format: "AAAA BBBB CCCC DDDD EEEE FFFF 0000 1111 2222 3333" + # ^^ note 2 spaces here + spaces = [ ' ' if i != 4 else ' ' for i in range(10) ] + chunks = [ ''.join(g) for g in six.moves.zip_longest(*[iter(content)] * 4) ] + content = ''.join(j for i in six.moves.zip_longest(chunks, spaces, fillvalue='') for j in i).strip() + + return str.__new__(cls, content) + + def __eq__(self, other): + if isinstance(other, Fingerprint): + return str(self) == str(other) + + if isinstance(other, (six.text_type, bytes, bytearray)): + if isinstance(other, (bytes, bytearray)): # pragma: no cover + other = other.decode('latin-1') + + other = str(other).replace(' ', '') + return any([self.replace(' ', '') == other, + self.keyid == other, + self.shortid == other]) + + return False # pragma: no cover + + def __ne__(self, other): + return not (self == other) + + def __hash__(self): + return hash(str(self.replace(' ', ''))) + + def __bytes__(self): + return binascii.unhexlify(six.b(self.replace(' ', ''))) + + +class SorteDeque(collections.deque): + """A deque subclass that tries to maintain sorted ordering using bisect""" + def insort(self, item): + i = bisect.bisect_left(self, item) + self.rotate(- i) + self.appendleft(item) + self.rotate(i) + + def resort(self, item): # pragma: no cover + if item in self: + # if item is already in self, see if it is still in sorted order. + # if not, re-sort it by removing it and then inserting it into its sorted order + i = bisect.bisect_left(self, item) + if i == len(self) or self[i] is not item: + self.remove(item) + self.insort(item) + + else: + # if item is not in self, just insert it in sorted order + self.insort(item) + + def check(self): # pragma: no cover + """re-sort any items in self that are not sorted""" + for unsorted in iter(self[i] for i in range(len(self) - 2) if not operator.le(self[i], self[i + 1])): + self.resort(unsorted) |