diff options
Diffstat (limited to 'gnupg/_parsers.py')
-rw-r--r-- | gnupg/_parsers.py | 276 |
1 files changed, 221 insertions, 55 deletions
diff --git a/gnupg/_parsers.py b/gnupg/_parsers.py index 2e1767e..9de57d2 100644 --- a/gnupg/_parsers.py +++ b/gnupg/_parsers.py @@ -367,7 +367,7 @@ def _sanitise(*args): checked += (val + " ") log.debug("_check_option(): No checks for %s" % val) - return checked + return checked.rstrip(' ') is_flag = lambda x: x.startswith('--') @@ -475,6 +475,8 @@ def _get_options_group(group=None): '--export-secret-subkeys', '--fingerprint', '--gen-revoke', + '--hidden-encrypt-to', + '--hidden-recipient', '--list-key', '--list-keys', '--list-public-keys', @@ -514,6 +516,7 @@ def _get_options_group(group=None): '--import', '--verify', '--verify-files', + '--output', ]) #: These options expect a string. see :func:`_check_preferences`. pref_options = frozenset(['--digest-algo', @@ -555,6 +558,9 @@ def _get_options_group(group=None): '--list-public-keys', '--list-secret-keys', '--list-sigs', + '--lock-multiple', + '--lock-never', + '--lock-once', '--no-default-keyring', '--no-default-recipient', '--no-emit-version', @@ -566,6 +572,7 @@ def _get_options_group(group=None): '--quiet', '--sign', '--symmetric', + '--throw-keyids', '--use-agent', '--verbose', '--version', @@ -905,6 +912,7 @@ class Sign(object): timestamp = None #: xxx fill me in what = None + status = None def __init__(self, gpg): self._gpg = gpg @@ -927,9 +935,9 @@ class Sign(object): :raises: :exc:`~exceptions.ValueError` if the status message is unknown. """ if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE", - "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", - "INV_SGNR", "SIGEXPIRED"): - pass + "GOOD_PASSPHRASE", "MISSING_PASSPHRASE", "PINENTRY_LAUNCHED", + "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR", "SIGEXPIRED"): + self.status = key.replace("_", " ").lower() elif key == "SIG_CREATED": (self.sig_type, self.sig_algo, self.sig_hash_algo, self.what, self.timestamp, self.fingerprint) = value.split() @@ -946,6 +954,7 @@ class Sign(object): else: raise ValueError("Unknown status message: %r" % key) + class ListKeys(list): """Handle status messages for --list-keys. @@ -956,7 +965,6 @@ class ListKeys(list): | crs = X.509 certificate and private key available | ssb = secret subkey (secondary key) | uat = user attribute (same as user id except for field 10). - | sig = signature | rev = revocation signature | pkd = public key data (special field format, see below) | grp = reserved for gpgsm @@ -967,8 +975,10 @@ class ListKeys(list): super(ListKeys, self).__init__() self._gpg = gpg self.curkey = None + self.curuid = None self.fingerprints = [] self.uids = [] + self.sigs = {} def key(self, args): vars = (""" @@ -978,8 +988,12 @@ class ListKeys(list): for i in range(len(vars)): self.curkey[vars[i]] = args[i] self.curkey['uids'] = [] + self.curkey['sigs'] = {} if self.curkey['uid']: - self.curkey['uids'].append(self.curkey['uid']) + self.curuid = self.curkey['uid'] + self.curkey['uids'].append(self.curuid) + self.sigs[self.curuid] = set() + self.curkey['sigs'][self.curuid] = [] del self.curkey['uid'] self.curkey['subkeys'] = [] self.append(self.curkey) @@ -994,8 +1008,21 @@ class ListKeys(list): uid = args[9] uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid) self.curkey['uids'].append(uid) + self.curuid = uid + self.curkey['sigs'][uid] = [] + self.sigs[uid] = set() self.uids.append(uid) + def sig(self, args): + vars = (""" + type trust length algo keyid date expires dummy ownertrust uid + """).split() + sig = {} + for i in range(len(vars)): + sig[vars[i]] = args[i] + self.curkey['sigs'][self.curuid].append(sig) + self.sigs[self.curuid].add(sig['keyid']) + def sub(self, args): subkey = [args[4], args[11]] self.curkey['subkeys'].append(subkey) @@ -1005,42 +1032,52 @@ class ListKeys(list): class ImportResult(object): - """Parse GnuPG status messages for key import operations. - - :type gpg: :class:`gnupg.GPG` - :param gpg: An instance of :class:`gnupg.GPG`. - """ - _ok_reason = {'0': 'Not actually changed', - '1': 'Entirely new key', - '2': 'New user IDs', - '4': 'New signatures', - '8': 'New subkeys', - '16': 'Contains private key', - '17': 'Contains private key',} - - _problem_reason = { '0': 'No specific reason given', - '1': 'Invalid Certificate', - '2': 'Issuer Certificate missing', - '3': 'Certificate Chain too long', - '4': 'Error storing certificate', } - - _fields = '''count no_user_id imported imported_rsa unchanged - n_uids n_subk n_sigs n_revoc sec_read sec_imported sec_dups - not_imported'''.split() - _counts = OrderedDict( - zip(_fields, [int(0) for x in range(len(_fields))]) ) - - #: A list of strings containing the fingerprints of the GnuPG keyIDs - #: imported. - fingerprints = list() - - #: A list containing dictionaries with information gathered on keys - #: imported. - results = list() + """Parse GnuPG status messages for key import operations.""" def __init__(self, gpg): + """Start parsing the results of a key import operation. + + :type gpg: :class:`gnupg.GPG` + :param gpg: An instance of :class:`gnupg.GPG`. + """ self._gpg = gpg - self.counts = self._counts + + #: A map from GnuPG codes shown with the ``IMPORT_OK`` status message + #: to their human-meaningful English equivalents. + self._ok_reason = {'0': 'Not actually changed', + '1': 'Entirely new key', + '2': 'New user IDs', + '4': 'New signatures', + '8': 'New subkeys', + '16': 'Contains private key', + '17': 'Contains private key',} + + #: A map from GnuPG codes shown with the ``IMPORT_PROBLEM`` status + #: message to their human-meaningful English equivalents. + self._problem_reason = { '0': 'No specific reason given', + '1': 'Invalid Certificate', + '2': 'Issuer Certificate missing', + '3': 'Certificate Chain too long', + '4': 'Error storing certificate', } + + #: All the possible status messages pertaining to actions taken while + #: importing a key. + self._fields = '''count no_user_id imported imported_rsa unchanged + n_uids n_subk n_sigs n_revoc sec_read sec_imported sec_dups + not_imported'''.split() + + #: Counts of all the status message results, :data:`_fields` which + #: have appeared. + self.counts = OrderedDict( + zip(self._fields, [int(0) for x in range(len(self._fields))])) + + #: A list of strings containing the fingerprints of the GnuPG keyIDs + #: imported. + self.fingerprints = list() + + #: A list containing dictionaries with information gathered on keys + #: imported. + self.results = list() def __nonzero__(self): """Override the determination for truthfulness evaluation. @@ -1048,7 +1085,7 @@ class ImportResult(object): :rtype: bool :returns: True if we have immport some keys, False otherwise. """ - if self.counts.not_imported > 0: return False + if self.counts['not_imported'] > 0: return False if len(self.fingerprints) == 0: return False return True __bool__ = __nonzero__ @@ -1056,7 +1093,7 @@ class ImportResult(object): def _handle_status(self, key, value): """Parse a status code from the attached GnuPG process. - :raises: :exc:`~exceptions.ValueError` if the status message is unknown. + :raises ValueError: if the status message is unknown. """ if key == "IMPORTED": # this duplicates info we already see in import_ok & import_problem @@ -1189,6 +1226,37 @@ class Verify(object): self.trust_level = None #: The string corresponding to the ``trust_level`` number. self.trust_text = None + #: The subpackets. These are stored as a dictionary, in the following + #: form: + #: Verify.subpackets = {'SUBPACKET_NUMBER': {'flags': FLAGS, + #: 'length': LENGTH, + #: 'data': DATA}, + #: 'ANOTHER_SUBPACKET_NUMBER': {...}} + self.subpackets = {} + #: The signature or key notations. These are also stored as a + #: dictionary, in the following form: + #: + #: Verify.notations = {NOTATION_NAME: NOTATION_DATA} + #: + #: For example, the Bitcoin core developer, Peter Todd, encodes in + #: every signature the header of the latest block on the Bitcoin + #: blockchain (to prove that a GnuPG signature that Peter made was made + #: *after* a specific point in time). These look like: + #: + #: gpg: Signature notation: blockhash@bitcoin.org=000000000000000006f793d4461ee3e756ff04cc62581c96a42ed67dc233da3a + #: + #: Which python-gnupg would store as: + #: + #: Verify.notations['blockhash@bitcoin.org'] = '000000000000000006f793d4461ee3e756ff04cc62581c96a42ed67dc233da3a' + self.notations = {} + + #: This will be a str or None. If not None, it is the last + #: ``NOTATION_NAME`` we stored in the ``notations`` dict. Because we're + #: not assured that a ``NOTATION_DATA`` status will arrive *immediately* + #: after its corresponding ``NOTATION_NAME``, we store the latest + #: ``NOTATION_NAME`` here until we get its corresponding + #: ``NOTATION_DATA``. + self._last_notation_name = None def __nonzero__(self): """Override the determination for truthfulness evaluation. @@ -1209,7 +1277,8 @@ class Verify(object): self.trust_level = self.TRUST_LEVELS[key] elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT", "PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO", - "DECRYPTION_OKAY", "INV_SGNR"): + "DECRYPTION_OKAY", "INV_SGNR", "PROGRESS", + "PINENTRY_LAUNCHED"): pass elif key == "BADSIG": self.valid = False @@ -1220,6 +1289,7 @@ class Verify(object): self.status = 'signature good' self.key_id, self.username = value.split(None, 1) elif key == "VALIDSIG": + self.valid = True (self.fingerprint, self.creation_date, self.sig_timestamp, @@ -1245,17 +1315,106 @@ class Verify(object): self.valid = False self.key_id = value self.status = 'no public key' + # These are useless in Verify, since they are spit out for any + # pub/subkeys on the key, not just the one doing the signing. + # if we want to check for signatures make with expired key, + # the relevant flags are REVKEYSIG and KEYREVOKED. elif key in ("KEYEXPIRED", "SIGEXPIRED"): - # these are useless in verify, since they are spit out for any - # pub/subkeys on the key, not just the one doing the signing. - # if we want to check for signatures with expired key, - # the relevant flag is EXPKEYSIG. pass + # The signature has an expiration date which has already passed + # (EXPKEYSIG), or the signature has been revoked (REVKEYSIG): elif key in ("EXPKEYSIG", "REVKEYSIG"): - # signed with expired or revoked key self.valid = False self.key_id = value.split()[0] self.status = (('%s %s') % (key[:3], key[3:])).lower() + # This is super annoying, and bad design on the part of GnuPG, in my + # opinion. + # + # This flag can get triggered if a valid signature is made, and then + # later the key (or subkey) which created the signature is + # revoked. When this happens, GnuPG will output: + # + # REVKEYSIG 075BFD18B365D34C Test Expired Key <test@python-gnupg.git> + # VALIDSIG DAB69B05F591640B7F4DCBEA075BFD18B365D34C 2014-09-26 1411700539 0 4 0 1 2 00 4BA800F77452A6C29447FF20F4AF76ACBBE22CE2 + # KEYREVOKED + # + # Meaning that we have a timestamp for when the signature was created, + # and we know that the signature is valid, but since GnuPG gives us no + # timestamp for when the key was revoked... we have no ability to + # determine if the valid signature was made *before* the signing key + # was revoked or *after*. Meaning that if you are like me and you sign + # all your software releases and git commits, and you also practice + # good opsec by doing regular key rotations, your old signatures made + # by your expired/revoked keys (even though they were created when the + # key was still good) are considered bad because GnuPG is a + # braindamaged piece of shit. + # + # Software engineering, motherfuckers, DO YOU SPEAK IT? + # + # The signing key which created the signature has since been revoked + # (KEYREVOKED), and we're going to ignore it (but add something to the + # status message): + elif key in ("KEYREVOKED"): + self.status = '\n'.join([self.status, "key revoked"]) + # SIG_SUBPACKET <type> <flags> <len> <data> + # This indicates that a signature subpacket was seen. The format is + # the same as the "spk" record above. + # + # [...] + # + # SPK - Signature subpacket records + # + # - Field 2 :: Subpacket number as per RFC-4880 and later. + # - Field 3 :: Flags in hex. Currently the only two bits assigned + # are 1, to indicate that the subpacket came from the + # hashed part of the signature, and 2, to indicate the + # subpacket was marked critical. + # - Field 4 :: Length of the subpacket. Note that this is the + # length of the subpacket, and not the length of field + # 5 below. Due to the need for %-encoding, the length + # of field 5 may be up to 3x this value. + # - Field 5 :: The subpacket data. Printable ASCII is shown as + # ASCII, but other values are rendered as %XX where XX + # is the hex value for the byte. + elif key in ("SIG_SUBPACKET"): + fields = value.split() + try: + subpacket_number = fields[0] + self.subpackets[subpacket_number] = {'flags': None, + 'length': None, + 'data': None} + except IndexError: + # We couldn't parse the subpacket type (an RFC4880 + # identifier), so we shouldn't continue parsing. + pass + else: + # Pull as much data as we can parse out of the subpacket: + try: + self.subpackets[subpacket_number]['flags'] = fields[1] + self.subpackets[subpacket_number]['length'] = fields[2] + self.subpackets[subpacket_number]['data'] = fields[3] + except IndexError: + pass + # NOTATION_ + # There are actually two related status codes to convey notation + # data: + # + # - NOTATION_NAME <name> + # - NOTATION_DATA <string> + # + # <name> and <string> are %XX escaped; the data may be split among + # several NOTATION_DATA lines. + elif key.startswith("NOTATION_"): + if key.endswith("NAME"): + self.notations[value] = str() + self._last_notation_name = value + elif key.endswith("DATA"): + if self._last_notation_name is not None: + # Append the NOTATION_DATA to any previous data we + # received for that NOTATION_NAME: + self.notations[self._last_notation_name] += value + else: + pass else: raise ValueError("Unknown status message: %r" % key) @@ -1360,26 +1519,33 @@ class ListPackets(object): self.need_passphrase_sym = None #: The keyid and uid which this data is encrypted to. self.userid_hint = None + #: The first key that we detected that a message was encrypted + #: to. This is provided for backwards compatibility. As of Issue #77_, + #: the ``encrypted_to`` attribute should be used instead. + self.key = None + #: A list of keyid's that the message has been encrypted to. + self.encrypted_to = [] def _handle_status(self, key, value): """Parse a status code from the attached GnuPG process. :raises: :exc:`~exceptions.ValueError` if the status message is unknown. """ - if key == 'NODATA': + if key in ('NO_SECKEY', 'BEGIN_DECRYPTION', 'DECRYPTION_FAILED', + 'END_DECRYPTION', 'GOOD_PASSPHRASE', 'BAD_PASSPHRASE'): + pass + elif key == 'NODATA': self.status = nodata(value) elif key == 'ENC_TO': - # This will only capture keys in our keyring. In the future we - # may want to include multiple unknown keys in this list. - self.key, _, _ = value.split() - elif key == 'NEED_PASSPHRASE': + key, _, _ = value.split() + if not self.key: + self.key = key + self.encrypted_to.append(key) + elif key == ('NEED_PASSPHRASE', 'MISSING_PASSPHRASE'): self.need_passphrase = True elif key == 'NEED_PASSPHRASE_SYM': self.need_passphrase_sym = True elif key == 'USERID_HINT': self.userid_hint = value.strip().split() - elif key in ('NO_SECKEY', 'BEGIN_DECRYPTION', 'DECRYPTION_FAILED', - 'END_DECRYPTION'): - pass else: raise ValueError("Unknown status message: %r" % key) |