summaryrefslogtreecommitdiff
path: root/gnupg/_parsers.py
diff options
context:
space:
mode:
Diffstat (limited to 'gnupg/_parsers.py')
-rw-r--r--gnupg/_parsers.py276
1 files changed, 221 insertions, 55 deletions
diff --git a/gnupg/_parsers.py b/gnupg/_parsers.py
index 2e1767e..9de57d2 100644
--- a/gnupg/_parsers.py
+++ b/gnupg/_parsers.py
@@ -367,7 +367,7 @@ def _sanitise(*args):
checked += (val + " ")
log.debug("_check_option(): No checks for %s" % val)
- return checked
+ return checked.rstrip(' ')
is_flag = lambda x: x.startswith('--')
@@ -475,6 +475,8 @@ def _get_options_group(group=None):
'--export-secret-subkeys',
'--fingerprint',
'--gen-revoke',
+ '--hidden-encrypt-to',
+ '--hidden-recipient',
'--list-key',
'--list-keys',
'--list-public-keys',
@@ -514,6 +516,7 @@ def _get_options_group(group=None):
'--import',
'--verify',
'--verify-files',
+ '--output',
])
#: These options expect a string. see :func:`_check_preferences`.
pref_options = frozenset(['--digest-algo',
@@ -555,6 +558,9 @@ def _get_options_group(group=None):
'--list-public-keys',
'--list-secret-keys',
'--list-sigs',
+ '--lock-multiple',
+ '--lock-never',
+ '--lock-once',
'--no-default-keyring',
'--no-default-recipient',
'--no-emit-version',
@@ -566,6 +572,7 @@ def _get_options_group(group=None):
'--quiet',
'--sign',
'--symmetric',
+ '--throw-keyids',
'--use-agent',
'--verbose',
'--version',
@@ -905,6 +912,7 @@ class Sign(object):
timestamp = None
#: xxx fill me in
what = None
+ status = None
def __init__(self, gpg):
self._gpg = gpg
@@ -927,9 +935,9 @@ class Sign(object):
:raises: :exc:`~exceptions.ValueError` if the status message is unknown.
"""
if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE",
- "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL",
- "INV_SGNR", "SIGEXPIRED"):
- pass
+ "GOOD_PASSPHRASE", "MISSING_PASSPHRASE", "PINENTRY_LAUNCHED",
+ "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR", "SIGEXPIRED"):
+ self.status = key.replace("_", " ").lower()
elif key == "SIG_CREATED":
(self.sig_type, self.sig_algo, self.sig_hash_algo,
self.what, self.timestamp, self.fingerprint) = value.split()
@@ -946,6 +954,7 @@ class Sign(object):
else:
raise ValueError("Unknown status message: %r" % key)
+
class ListKeys(list):
"""Handle status messages for --list-keys.
@@ -956,7 +965,6 @@ class ListKeys(list):
| crs = X.509 certificate and private key available
| ssb = secret subkey (secondary key)
| uat = user attribute (same as user id except for field 10).
- | sig = signature
| rev = revocation signature
| pkd = public key data (special field format, see below)
| grp = reserved for gpgsm
@@ -967,8 +975,10 @@ class ListKeys(list):
super(ListKeys, self).__init__()
self._gpg = gpg
self.curkey = None
+ self.curuid = None
self.fingerprints = []
self.uids = []
+ self.sigs = {}
def key(self, args):
vars = ("""
@@ -978,8 +988,12 @@ class ListKeys(list):
for i in range(len(vars)):
self.curkey[vars[i]] = args[i]
self.curkey['uids'] = []
+ self.curkey['sigs'] = {}
if self.curkey['uid']:
- self.curkey['uids'].append(self.curkey['uid'])
+ self.curuid = self.curkey['uid']
+ self.curkey['uids'].append(self.curuid)
+ self.sigs[self.curuid] = set()
+ self.curkey['sigs'][self.curuid] = []
del self.curkey['uid']
self.curkey['subkeys'] = []
self.append(self.curkey)
@@ -994,8 +1008,21 @@ class ListKeys(list):
uid = args[9]
uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid)
self.curkey['uids'].append(uid)
+ self.curuid = uid
+ self.curkey['sigs'][uid] = []
+ self.sigs[uid] = set()
self.uids.append(uid)
+ def sig(self, args):
+ vars = ("""
+ type trust length algo keyid date expires dummy ownertrust uid
+ """).split()
+ sig = {}
+ for i in range(len(vars)):
+ sig[vars[i]] = args[i]
+ self.curkey['sigs'][self.curuid].append(sig)
+ self.sigs[self.curuid].add(sig['keyid'])
+
def sub(self, args):
subkey = [args[4], args[11]]
self.curkey['subkeys'].append(subkey)
@@ -1005,42 +1032,52 @@ class ListKeys(list):
class ImportResult(object):
- """Parse GnuPG status messages for key import operations.
-
- :type gpg: :class:`gnupg.GPG`
- :param gpg: An instance of :class:`gnupg.GPG`.
- """
- _ok_reason = {'0': 'Not actually changed',
- '1': 'Entirely new key',
- '2': 'New user IDs',
- '4': 'New signatures',
- '8': 'New subkeys',
- '16': 'Contains private key',
- '17': 'Contains private key',}
-
- _problem_reason = { '0': 'No specific reason given',
- '1': 'Invalid Certificate',
- '2': 'Issuer Certificate missing',
- '3': 'Certificate Chain too long',
- '4': 'Error storing certificate', }
-
- _fields = '''count no_user_id imported imported_rsa unchanged
- n_uids n_subk n_sigs n_revoc sec_read sec_imported sec_dups
- not_imported'''.split()
- _counts = OrderedDict(
- zip(_fields, [int(0) for x in range(len(_fields))]) )
-
- #: A list of strings containing the fingerprints of the GnuPG keyIDs
- #: imported.
- fingerprints = list()
-
- #: A list containing dictionaries with information gathered on keys
- #: imported.
- results = list()
+ """Parse GnuPG status messages for key import operations."""
def __init__(self, gpg):
+ """Start parsing the results of a key import operation.
+
+ :type gpg: :class:`gnupg.GPG`
+ :param gpg: An instance of :class:`gnupg.GPG`.
+ """
self._gpg = gpg
- self.counts = self._counts
+
+ #: A map from GnuPG codes shown with the ``IMPORT_OK`` status message
+ #: to their human-meaningful English equivalents.
+ self._ok_reason = {'0': 'Not actually changed',
+ '1': 'Entirely new key',
+ '2': 'New user IDs',
+ '4': 'New signatures',
+ '8': 'New subkeys',
+ '16': 'Contains private key',
+ '17': 'Contains private key',}
+
+ #: A map from GnuPG codes shown with the ``IMPORT_PROBLEM`` status
+ #: message to their human-meaningful English equivalents.
+ self._problem_reason = { '0': 'No specific reason given',
+ '1': 'Invalid Certificate',
+ '2': 'Issuer Certificate missing',
+ '3': 'Certificate Chain too long',
+ '4': 'Error storing certificate', }
+
+ #: All the possible status messages pertaining to actions taken while
+ #: importing a key.
+ self._fields = '''count no_user_id imported imported_rsa unchanged
+ n_uids n_subk n_sigs n_revoc sec_read sec_imported sec_dups
+ not_imported'''.split()
+
+ #: Counts of all the status message results, :data:`_fields` which
+ #: have appeared.
+ self.counts = OrderedDict(
+ zip(self._fields, [int(0) for x in range(len(self._fields))]))
+
+ #: A list of strings containing the fingerprints of the GnuPG keyIDs
+ #: imported.
+ self.fingerprints = list()
+
+ #: A list containing dictionaries with information gathered on keys
+ #: imported.
+ self.results = list()
def __nonzero__(self):
"""Override the determination for truthfulness evaluation.
@@ -1048,7 +1085,7 @@ class ImportResult(object):
:rtype: bool
:returns: True if we have immport some keys, False otherwise.
"""
- if self.counts.not_imported > 0: return False
+ if self.counts['not_imported'] > 0: return False
if len(self.fingerprints) == 0: return False
return True
__bool__ = __nonzero__
@@ -1056,7 +1093,7 @@ class ImportResult(object):
def _handle_status(self, key, value):
"""Parse a status code from the attached GnuPG process.
- :raises: :exc:`~exceptions.ValueError` if the status message is unknown.
+ :raises ValueError: if the status message is unknown.
"""
if key == "IMPORTED":
# this duplicates info we already see in import_ok & import_problem
@@ -1189,6 +1226,37 @@ class Verify(object):
self.trust_level = None
#: The string corresponding to the ``trust_level`` number.
self.trust_text = None
+ #: The subpackets. These are stored as a dictionary, in the following
+ #: form:
+ #: Verify.subpackets = {'SUBPACKET_NUMBER': {'flags': FLAGS,
+ #: 'length': LENGTH,
+ #: 'data': DATA},
+ #: 'ANOTHER_SUBPACKET_NUMBER': {...}}
+ self.subpackets = {}
+ #: The signature or key notations. These are also stored as a
+ #: dictionary, in the following form:
+ #:
+ #: Verify.notations = {NOTATION_NAME: NOTATION_DATA}
+ #:
+ #: For example, the Bitcoin core developer, Peter Todd, encodes in
+ #: every signature the header of the latest block on the Bitcoin
+ #: blockchain (to prove that a GnuPG signature that Peter made was made
+ #: *after* a specific point in time). These look like:
+ #:
+ #: gpg: Signature notation: blockhash@bitcoin.org=000000000000000006f793d4461ee3e756ff04cc62581c96a42ed67dc233da3a
+ #:
+ #: Which python-gnupg would store as:
+ #:
+ #: Verify.notations['blockhash@bitcoin.org'] = '000000000000000006f793d4461ee3e756ff04cc62581c96a42ed67dc233da3a'
+ self.notations = {}
+
+ #: This will be a str or None. If not None, it is the last
+ #: ``NOTATION_NAME`` we stored in the ``notations`` dict. Because we're
+ #: not assured that a ``NOTATION_DATA`` status will arrive *immediately*
+ #: after its corresponding ``NOTATION_NAME``, we store the latest
+ #: ``NOTATION_NAME`` here until we get its corresponding
+ #: ``NOTATION_DATA``.
+ self._last_notation_name = None
def __nonzero__(self):
"""Override the determination for truthfulness evaluation.
@@ -1209,7 +1277,8 @@ class Verify(object):
self.trust_level = self.TRUST_LEVELS[key]
elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT",
"PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO",
- "DECRYPTION_OKAY", "INV_SGNR"):
+ "DECRYPTION_OKAY", "INV_SGNR", "PROGRESS",
+ "PINENTRY_LAUNCHED"):
pass
elif key == "BADSIG":
self.valid = False
@@ -1220,6 +1289,7 @@ class Verify(object):
self.status = 'signature good'
self.key_id, self.username = value.split(None, 1)
elif key == "VALIDSIG":
+ self.valid = True
(self.fingerprint,
self.creation_date,
self.sig_timestamp,
@@ -1245,17 +1315,106 @@ class Verify(object):
self.valid = False
self.key_id = value
self.status = 'no public key'
+ # These are useless in Verify, since they are spit out for any
+ # pub/subkeys on the key, not just the one doing the signing.
+ # if we want to check for signatures make with expired key,
+ # the relevant flags are REVKEYSIG and KEYREVOKED.
elif key in ("KEYEXPIRED", "SIGEXPIRED"):
- # these are useless in verify, since they are spit out for any
- # pub/subkeys on the key, not just the one doing the signing.
- # if we want to check for signatures with expired key,
- # the relevant flag is EXPKEYSIG.
pass
+ # The signature has an expiration date which has already passed
+ # (EXPKEYSIG), or the signature has been revoked (REVKEYSIG):
elif key in ("EXPKEYSIG", "REVKEYSIG"):
- # signed with expired or revoked key
self.valid = False
self.key_id = value.split()[0]
self.status = (('%s %s') % (key[:3], key[3:])).lower()
+ # This is super annoying, and bad design on the part of GnuPG, in my
+ # opinion.
+ #
+ # This flag can get triggered if a valid signature is made, and then
+ # later the key (or subkey) which created the signature is
+ # revoked. When this happens, GnuPG will output:
+ #
+ # REVKEYSIG 075BFD18B365D34C Test Expired Key <test@python-gnupg.git>
+ # VALIDSIG DAB69B05F591640B7F4DCBEA075BFD18B365D34C 2014-09-26 1411700539 0 4 0 1 2 00 4BA800F77452A6C29447FF20F4AF76ACBBE22CE2
+ # KEYREVOKED
+ #
+ # Meaning that we have a timestamp for when the signature was created,
+ # and we know that the signature is valid, but since GnuPG gives us no
+ # timestamp for when the key was revoked... we have no ability to
+ # determine if the valid signature was made *before* the signing key
+ # was revoked or *after*. Meaning that if you are like me and you sign
+ # all your software releases and git commits, and you also practice
+ # good opsec by doing regular key rotations, your old signatures made
+ # by your expired/revoked keys (even though they were created when the
+ # key was still good) are considered bad because GnuPG is a
+ # braindamaged piece of shit.
+ #
+ # Software engineering, motherfuckers, DO YOU SPEAK IT?
+ #
+ # The signing key which created the signature has since been revoked
+ # (KEYREVOKED), and we're going to ignore it (but add something to the
+ # status message):
+ elif key in ("KEYREVOKED"):
+ self.status = '\n'.join([self.status, "key revoked"])
+ # SIG_SUBPACKET <type> <flags> <len> <data>
+ # This indicates that a signature subpacket was seen. The format is
+ # the same as the "spk" record above.
+ #
+ # [...]
+ #
+ # SPK - Signature subpacket records
+ #
+ # - Field 2 :: Subpacket number as per RFC-4880 and later.
+ # - Field 3 :: Flags in hex. Currently the only two bits assigned
+ # are 1, to indicate that the subpacket came from the
+ # hashed part of the signature, and 2, to indicate the
+ # subpacket was marked critical.
+ # - Field 4 :: Length of the subpacket. Note that this is the
+ # length of the subpacket, and not the length of field
+ # 5 below. Due to the need for %-encoding, the length
+ # of field 5 may be up to 3x this value.
+ # - Field 5 :: The subpacket data. Printable ASCII is shown as
+ # ASCII, but other values are rendered as %XX where XX
+ # is the hex value for the byte.
+ elif key in ("SIG_SUBPACKET"):
+ fields = value.split()
+ try:
+ subpacket_number = fields[0]
+ self.subpackets[subpacket_number] = {'flags': None,
+ 'length': None,
+ 'data': None}
+ except IndexError:
+ # We couldn't parse the subpacket type (an RFC4880
+ # identifier), so we shouldn't continue parsing.
+ pass
+ else:
+ # Pull as much data as we can parse out of the subpacket:
+ try:
+ self.subpackets[subpacket_number]['flags'] = fields[1]
+ self.subpackets[subpacket_number]['length'] = fields[2]
+ self.subpackets[subpacket_number]['data'] = fields[3]
+ except IndexError:
+ pass
+ # NOTATION_
+ # There are actually two related status codes to convey notation
+ # data:
+ #
+ # - NOTATION_NAME <name>
+ # - NOTATION_DATA <string>
+ #
+ # <name> and <string> are %XX escaped; the data may be split among
+ # several NOTATION_DATA lines.
+ elif key.startswith("NOTATION_"):
+ if key.endswith("NAME"):
+ self.notations[value] = str()
+ self._last_notation_name = value
+ elif key.endswith("DATA"):
+ if self._last_notation_name is not None:
+ # Append the NOTATION_DATA to any previous data we
+ # received for that NOTATION_NAME:
+ self.notations[self._last_notation_name] += value
+ else:
+ pass
else:
raise ValueError("Unknown status message: %r" % key)
@@ -1360,26 +1519,33 @@ class ListPackets(object):
self.need_passphrase_sym = None
#: The keyid and uid which this data is encrypted to.
self.userid_hint = None
+ #: The first key that we detected that a message was encrypted
+ #: to. This is provided for backwards compatibility. As of Issue #77_,
+ #: the ``encrypted_to`` attribute should be used instead.
+ self.key = None
+ #: A list of keyid's that the message has been encrypted to.
+ self.encrypted_to = []
def _handle_status(self, key, value):
"""Parse a status code from the attached GnuPG process.
:raises: :exc:`~exceptions.ValueError` if the status message is unknown.
"""
- if key == 'NODATA':
+ if key in ('NO_SECKEY', 'BEGIN_DECRYPTION', 'DECRYPTION_FAILED',
+ 'END_DECRYPTION', 'GOOD_PASSPHRASE', 'BAD_PASSPHRASE'):
+ pass
+ elif key == 'NODATA':
self.status = nodata(value)
elif key == 'ENC_TO':
- # This will only capture keys in our keyring. In the future we
- # may want to include multiple unknown keys in this list.
- self.key, _, _ = value.split()
- elif key == 'NEED_PASSPHRASE':
+ key, _, _ = value.split()
+ if not self.key:
+ self.key = key
+ self.encrypted_to.append(key)
+ elif key == ('NEED_PASSPHRASE', 'MISSING_PASSPHRASE'):
self.need_passphrase = True
elif key == 'NEED_PASSPHRASE_SYM':
self.need_passphrase_sym = True
elif key == 'USERID_HINT':
self.userid_hint = value.strip().split()
- elif key in ('NO_SECKEY', 'BEGIN_DECRYPTION', 'DECRYPTION_FAILED',
- 'END_DECRYPTION'):
- pass
else:
raise ValueError("Unknown status message: %r" % key)