From d89c1849d551de68d26a1f56798ee5084dca6556 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 7 Mar 2013 17:23:05 -0300 Subject: Move source files to proper subdirectory. --- src/leap/soledad/util.py | 187 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 187 insertions(+) create mode 100644 src/leap/soledad/util.py (limited to 'src/leap/soledad/util.py') diff --git a/src/leap/soledad/util.py b/src/leap/soledad/util.py new file mode 100644 index 00000000..c64d4c5f --- /dev/null +++ b/src/leap/soledad/util.py @@ -0,0 +1,187 @@ +""" +Utilities for Soledad. +""" + +import os +import gnupg +import re +from gnupg import ( + logger, + _is_sequence, + _make_binary_stream, +) + + +class ListPackets(): + """ + Handle status messages for --list-packets. + """ + + def __init__(self, gpg): + self.gpg = gpg + self.nodata = None + self.key = None + self.need_passphrase = None + self.need_passphrase_sym = None + self.userid_hint = None + + def handle_status(self, key, value): + # TODO: write tests for handle_status + if key == 'NODATA': + self.nodata = True + if key == 'ENC_TO': + # This will only capture keys in our keyring. In the future we + # may want to include multiple unknown keys in this list. + self.key, _, _ = value.split() + if key == 'NEED_PASSPHRASE': + self.need_passphrase = True + if key == 'NEED_PASSPHRASE_SYM': + self.need_passphrase_sym = True + if key == 'USERID_HINT': + self.userid_hint = value.strip().split() + + +class GPGWrapper(gnupg.GPG): + """ + This is a temporary class for handling GPG requests, and should be + replaced by a more general class used throughout the project. + """ + + GNUPG_HOME = os.environ['HOME'] + "/.config/leap/gnupg" + GNUPG_BINARY = "/usr/bin/gpg" # this has to be changed based on OS + + def __init__(self, gpgbinary=GNUPG_BINARY, gnupghome=GNUPG_HOME, + verbose=False, use_agent=False, keyring=None, options=None): + super(GPGWrapper, self).__init__(gnupghome=gnupghome, + gpgbinary=gpgbinary, + verbose=verbose, + use_agent=use_agent, + keyring=keyring, + options=options) + self.result_map['list-packets'] = ListPackets + + def find_key_by_email(self, email, secret=False): + """ + Find user's key based on their email. + """ + for key in self.list_keys(secret=secret): + for uid in key['uids']: + if re.search(email, uid): + return key + raise LookupError("GnuPG public key for email %s not found!" % email) + + def find_key_by_subkey(self, subkey): + for key in self.list_keys(): + for sub in key['subkeys']: + if sub[0] == subkey: + return key + raise LookupError( + "GnuPG public key for subkey %s not found!" % subkey) + + def find_key_by_keyid(self, keyid): + for key in self.list_keys(): + if keyid == key['keyid']: + return key + raise LookupError( + "GnuPG public key for subkey %s not found!" % subkey) + + def encrypt(self, data, recipient, sign=None, always_trust=True, + passphrase=None, symmetric=False): + """ + Encrypt data using GPG. + """ + # TODO: devise a way so we don't need to "always trust". + return super(GPGWrapper, self).encrypt(data, recipient, sign=sign, + always_trust=always_trust, + passphrase=passphrase, + symmetric=symmetric, + cipher_algo='AES256') + + def decrypt(self, data, always_trust=True, passphrase=None): + """ + Decrypt data using GPG. + """ + # TODO: devise a way so we don't need to "always trust". + return super(GPGWrapper, self).decrypt(data, + always_trust=always_trust, + passphrase=passphrase) + + def send_keys(self, keyserver, *keyids): + """ + Send keys to a keyserver + """ + result = self.result_map['list'](self) + gnupg.logger.debug('send_keys: %r', keyids) + data = gnupg._make_binary_stream("", self.encoding) + args = ['--keyserver', keyserver, '--send-keys'] + args.extend(keyids) + self._handle_io(args, data, result, binary=True) + gnupg.logger.debug('send_keys result: %r', result.__dict__) + data.close() + return result + + def encrypt_file(self, file, recipients, sign=None, + always_trust=False, passphrase=None, + armor=True, output=None, symmetric=False, + cipher_algo=None): + "Encrypt the message read from the file-like object 'file'" + args = ['--encrypt'] + if symmetric: + args = ['--symmetric'] + if cipher_algo: + args.append('--cipher-algo %s' % cipher_algo) + else: + args = ['--encrypt'] + if not _is_sequence(recipients): + recipients = (recipients,) + for recipient in recipients: + args.append('--recipient "%s"' % recipient) + if armor: # create ascii-armored output - set to False for binary + args.append('--armor') + if output: # write the output to a file with the specified name + if os.path.exists(output): + os.remove(output) # to avoid overwrite confirmation message + args.append('--output "%s"' % output) + if sign: + args.append('--sign --default-key "%s"' % sign) + if always_trust: + args.append("--always-trust") + result = self.result_map['crypt'](self) + self._handle_io(args, file, result, passphrase=passphrase, binary=True) + logger.debug('encrypt result: %r', result.data) + return result + + def list_packets(self, raw_data): + args = ["--list-packets"] + result = self.result_map['list-packets'](self) + self._handle_io( + args, + _make_binary_stream(raw_data, self.encoding), + result, + ) + return result + + def encrypted_to(self, raw_data): + """ + Return the key to which raw_data is encrypted to. + """ + # TODO: make this support multiple keys. + result = self.list_packets(raw_data) + if not result.key: + raise LookupError( + "Content is not encrypted to a GnuPG key!") + try: + return self.find_key_by_keyid(result.key) + except: + return self.find_key_by_subkey(result.key) + + def is_encrypted_sym(self, raw_data): + result = self.list_packets(raw_data) + return bool(result.need_passphrase_sym) + + def is_encrypted_asym(self, raw_data): + result = self.list_packets(raw_data) + return bool(result.key) + + def is_encrypted(self, raw_data): + self.is_encrypted_asym() or self.is_encrypted_sym() -- cgit v1.2.3